美文网首页
Logstash 同步 mysql 数据到 ES

Logstash 同步 mysql 数据到 ES

作者: Kare | 来源:发表于2019-08-20 14:01 被阅读0次

以下是我的 logstash-mysql.conf 文件

input {
  jdbc {
  type => "enterprises_info"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_app_feiyuown"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/enterprises/enterprises.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from app_enterprises"
  schedule => "* * * * *"
  }
  
  jdbc {
  type => "enterprise_addresses"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_app_feiyuown"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/enterprises/enterprise_addresses.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from app_enterprise_addresses"
  schedule => "* * * * *"
  }
  
  jdbc {
  type => "enterprise_informations"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_app_feiyuown"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/enterprises/enterprise_informations.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from app_enterprise_informations"
  schedule => "* * * * *"
  }
  
  jdbc {
  type => "enterprise_members"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_app_feiyuown"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/enterprises/enterprise_members.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from app_enterprise_members"
  schedule => "* * * * *"
  }
  
  jdbc {
  type => "enterprise_order_products"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_app_feiyuown"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/enterprise_order_products.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from app_enterprise_order_products"
  schedule => "* * * * *"
  }
  
  jdbc {
  type => "enterprise_orders"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_app_feiyuown"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/enterprises/enterprise_orders.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from app_enterprise_orders"
  schedule => "* * * * *"
  }
  
  jdbc {
  type => "enterprise_reviews"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_app_feiyuown"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/enterprises/enterprise_reviews.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from app_enterprise_reviews"
  schedule => "* * * * *"
  }
  
  jdbc {
  type => "enterprise_tags"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_app_feiyuown"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/enterprise_tags.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from app_enterprise_tags"
  schedule => "* * * * *"
  }
  
  jdbc {
  type => "enterprise_types"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_app_feiyuown"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/enterprises/app_enterprise_types.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from app_enterprise_types"
  schedule => "* * * * *"
  }

  jdbc {
  type => "resume_info"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_base_resume"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/resume/resume_info.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from resume_base"
  schedule => "* * * * *"
  }
  
  jdbc {
  type => "resume_education_info"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_base_resume"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "own_id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/resume/resume_education_info.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from r_education"
  schedule => "* * * * *"
  }
  
  jdbc {
  type => "resume_experience_info"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_base_resume"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "own_id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/resume/resume_experience_info.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from r_experience"
  schedule => "* * * * *"
  }

  jdbc {
  type => "resume_project_info"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_base_resume"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "own_id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/resume/resume_project_info.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from r_project"
  schedule => "* * * * *"
  }

  jdbc {
  type => "resume_skill_info"
  jdbc_connection_string => "jdbc:mysql://192.168.110.253:3300/apollo_base_resume"
  jdbc_user => "work"
  jdbc_password => "work123"
  tracking_column => "own_id"
  record_last_run => "true"
  use_column_value => "true"
  last_run_metadata_path => "/data/application/logstash/config/resume/resume_skill_info.txt"
  clean_run => "false"
  jdbc_driver_library => "/data/application/logstash/mysql-connector-java-5.1.39.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  statement => "select * from r_skill"
  schedule => "* * * * *"
  }
}

filter {
mutate {
  convert => [ "publish_time", "string" ]
 }

date {
  timezone => "Europe/Berlin"
  match => ["publish_time" , "ISO8601", "yyyy-MM-dd HH:mm:ss"]
}

json {
  source => "message"
  remove_field => ["message"]
  }
}

output {
  stdout {
       codec => json_lines
   }
  
  if [type]=="enterprises_info" {
    elasticsearch {
    hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
    index => "apollo_%{type}"
    document_id => "%{id}"
    document_type => "%{type}"
    }
  }
  
  if [type]=="enterprise_addresses" {
    elasticsearch {
    hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
    index => "apollo_%{type}"
    document_id => "%{id}"
    document_type => "%{type}"
    }
  }

  if [type]=="enterprise_informations" {
    elasticsearch {
    hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
    index => "apollo_%{type}"
    document_id => "%{id}"
    document_type => "%{type}"
    }
  }
  
  if [type]=="enterprise_members" {
    elasticsearch {
    hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
    index => "apollo_%{type}"
    document_id => "%{id}"
    document_type => "%{type}"
    }
  }
  
  if [type]=="enterprise_order_products" {
    elasticsearch {
    hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
    index => "apollo_%{type}"
    document_id => "%{id}"
    document_type => "%{type}"
    }
  }
  
  if [type]=="enterprise_orders" {
    elasticsearch {
    hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
    index => "apollo_%{type}"
    document_id => "%{id}"
    document_type => "%{type}"
    }
  }
  
  if [type]=="enterprise_reviews" {
    elasticsearch {
    hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
    index => "apollo_%{type}"
    document_id => "%{id}"
    document_type => "%{type}"
    }
  }
  
  if [type]=="enterprise_tags" {
    elasticsearch {
    hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
    index => "apollo_%{type}"
    document_id => "%{id}"
    document_type => "%{type}"
    }
  }
  
  if [type]=="enterprise_types" {
    elasticsearch {
    hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
    index => "apollo_%{type}"
    document_id => "%{id}"
    document_type => "%{type}"
    }
  }

  if [type]=="resume_info" {
  elasticsearch {
  hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
  index => "apollo_%{type}"
  document_id => "%{id}"
  document_type => "%{type}"
  }
}

if [type]=="resume_education_info" {
  elasticsearch {
  hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
  index => "apollo_%{type}"
  document_id => "%{own_id}"
  document_type => "%{type}"
  }
}

if [type]=="resume_experience_info" {
  elasticsearch {
  hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
  index => "apollo_%{type}"
  document_id => "%{own_id}"
  document_type => "%{type}"
  }
}


if [type]=="resume_project_info" {
  elasticsearch {
  hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
  index => "apollo_%{type}"
  document_id => "%{own_id}"
  document_type => "%{type}"
  }
}

if [type]=="resume_skill_info" {
  elasticsearch {
  hosts => ["192.168.110.247:9200", "192.168.110.253:9200"]
  index => "apollo_666677"
  document_id => "%{own_id}"
  }
}

}

前台启动: bin/logstash -f config/logstash-mysql.conf
后台运行: nohup bin/logstash -f config/logstash-mysql.conf > /dev/null 2>&1 &

遇到的问题

  • 在实际过程中,如果数据库没有数据,在同步时,logstash 不会向 es 注入索引,所以即使脚本在跑,但是数据库没数据,es 里面也不会有东西。

  • 开始测试是在本地,版本是7.1.1,后来上到局域网服务器是 6.5.4版本,但是会一直报错。

An unexpected error occurred! {:error=>#<SystemCallError: Unknown error (SystemCallError) - <STDOUT>>, :backtrace=>["org/jruby/RubyIO.java:1457:in write'", "org/jruby/RubyIO.java:1428:inwrite'", "/data/application/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-stdout-3.1.4/lib/logstash/outputs/stdout.rb:43:in block in multi_receive_encoded'", "org/jruby/RubyArray.java:1734:ineach'", "/data/application/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-stdout-3.1.4/lib/logstash/outputs/stdout.rb:42:in multi_receive_encoded'", "/data/application/logstash/logstash-core/lib/logstash/outputs/base.rb:87:inmulti_receive'", "org/logstash/config/ir/compiler/OutputStrategyExt.java:114:in multi_receive'", "org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:97:inmulti_receive'", "/data/application/logstash/logstash-core/lib/logstash/pipeline.rb:373:in block in output_batch'", "org/jruby/RubyHash.java:1343:ineach'", "/data/application/logstash/logstash-core/lib/logstash/pipeline.rb:372:in output_batch'", "/data/application/logstash/logstash-core/lib/logstash/pipeline.rb:324:inworker_loop'", "/data/application/logstash/logstash-core/lib/logstash/pipeline.rb:286:in `block in start_workers'"]}
[2019-08-19T12:37:00,627][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

后来,我把 config 中的logstash.yml 中的配置项全部注释了,就好了。仅此记录,如果需要配置的同学,请自行更改配置。

chrome 插件,查看 ES 数据工具
ES Header

相关文章

网友评论

      本文标题:Logstash 同步 mysql 数据到 ES

      本文链接:https://www.haomeiwen.com/subject/teqpsctx.html