实际业务场景中,会遇到基础数据存在Mysql中,实时写入数据量比较大的情景。
迁移至kafka是一种比较好的业务选型方案。
而mysql写入kafka的选型方案有:
方案一:logstash_output_kafka 插件。
方案二:kafka_connector。
方案三:debezium 插件。
方案四:flume。
方案五:其余相似方案。java
其中:debezium和flume是基于mysql binlog
实现的。
若是须要同步历史全量数据+实时更新数据,建议使用logstash。mysql
经常使用的logstash的插件是:logstash_input_jdbc实现关系型数据库到Elasticsearch等的同步。
实际上,核心logstash的同步原理的掌握,有助于你们理解相似的各类库之间的同步。
logstash核心原理
:输入生成事件,过滤器修改它们,输出将它们发送到其余地方。
logstash核心三部分组成:input、filter、output。
git
input { } filter { } output { }
##1.1 input输入
包含但远不限于:github
过滤器是Logstash管道中的中间处理设备。您能够将过滤器与条件组合,以便在事件知足特定条件时对其执行操做。
能够把它比做数据处理的ETL
环节。
一些有用的过滤包括:web
Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方式
。有了内置于Logstash的120种模式,您极可能会找到知足您需求的模式!输出是Logstash管道的最后阶段。一些经常使用的输出包括:redis
详细的filter demo参考:https://github.com/hellosign/logstash-fundamentals/blob/master/examples/complex_logstash.mdsql
input { jdbc { jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/news_base" jdbc_user => "root" jdbc_password => "xxxxxxx" jdbc_driver_library => "/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" #schedule => "* * * * *" statement => "SELECT * from news_info WHERE id > :sql_last_value order by id" use_column_value => true tracking_column => "id" tracking_column_type => "numeric" record_last_run => true last_run_metadata_path => "/home/logstash-6.4.0/sync_data/news_last_run" } } filter { ruby{ code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)" } ruby{ code => "event.set('publish_time_unix',event.get('publish_time').to_i*1000)" } mutate { remove_field => [ "@version" ] remove_field => [ "@timestamp" ] remove_field => [ "gather_time" ] remove_field => [ "publish_time" ] } } output { kafka { bootstrap_servers => "192.168.1.13:9092" codec => json_lines topic_id => "mytopic" } file { codec => json_lines path => "/tmp/output_a.log" } }
以上内容不复杂,不作细讲。
注意:
Mysql借助logstash同步后,日期类型格式:“2019-04-20 13:55:53”已经被识别为日期格式。数据库
code =>
“event.set(‘gather_time_unix’,event.get(‘gather_time’).to_i*1000)”,json
是将Mysql中的时间格式转化为时间戳格式。bootstrap
from星友:使用logstash同步mysql数据的,由于在jdbc.conf里面没有添加 lowercase_column_names
=> “false” 这个属性,因此logstash默认把查询结果的列明改成了小写,同步进了es,因此就致使es里面看到的字段名称全是小写。
最后总结:es是支持大写字段名称的,问题出在logstash没用好,须要在同步配置中加上 lowercase_column_names => “false” 。记录下来但愿能够帮到更多人,哈哈。
想将关系数据库的数据同步至ES中,若是在集群的多台服务器上同时启动logstash。
解读:实际项目中就是没用随机id 使用指定id做为es的_id ,指定id能够是url的md5.这样相同数据就会走更新覆盖之前数据
解读:高版本基于时间增量有优化。
tracking_column_type => "timestamp"
应该是须要指定标识为时间类型,默认为数字类型numeric
解读:能够logstash同步mysql的时候sql查询阶段处理,如:select a_value as avalue***
。
或者filter阶段处理,mutate rename
处理。
mutate { rename => ["shortHostname", "hostname" ] }
或者kafka阶段借助kafka stream处理。
新的实现:https://debezium.io/blog/2018/01/17/streaming-to-elasticsearch/
mysql2mysql:https://my.oschina.net/u/2601303/blog/1503835
推荐开源实现:https://github.com/Lunatictwo/DataX
推荐阅读:
一、实战 | canal 实现Mysql到Elasticsearch实时增量同步
二、干货 | Debezium实现Mysql到Elasticsearch高效实时同步
三、一张图理清楚关系型/非关系型数据库与Elasticsearch同步
铭毅天下——Elasticsearch基础、进阶、实战第一公众号