若是你对 使用Logstash保持Elasticsearch与数据库同步 方案还不是很熟悉,建议先花点时间精读它。
上面的文章以单表同步场景为例,清楚讲述了如何经过JDBC同步数据至ES,而对于实际开发中常常出现的多表关联同步并未说起,如下是我针对多表关联同步的趟坑过程但愿对你有所帮助。
同步单表时咱们对于表字段的约定:java
多表关联的状况下咱们须要JOIN其余表查询获得结果,这个结果就是ES须要的打平后的宽表。ES新的版本中也增长了join操做,但这事不是ES擅长的,咱们选择交给更擅长的数据库处理,让ES只存储打平后的单层索引。mysql
若是你理解单表同步而困惑多表关联同步的话,试着将关联查询的复杂SQL想象(定义)为视图,是否是后续操做就跟单表没区别了!git
咱们来逐个看下多表关联的同步问题 (假设表a多对多关联表b):github
单表的id字段绑定到ES document的_id,能够实现ES索引幂等性,不会出现job缘由致使索引文档重复。那对于多表关联的状况呢,可使用各表id的组合做为document的_id。如SELECT:sql
concat(a.id, '_', b.id) AS docid
(若是你不关注幂等,也能够用_id默认生成策略。)数据库
单表基于modification_time就能够识别出自上次轮询后新的变化数据,多表关联的状况呢也相似:segmentfault
(CASE WHEN a.modification_time > b.modification_time THEN a.modification_time ELSE b.modification_time END) AS modification_time
同理软删除字段is_deleted的处理逻辑:ruby
(CASE WHEN a.is_deleted=0 AND b.is_deleted=0 THEN 0 ELSE 1 END) AS is_deleted
这样不管表a仍是表b发生变动,均可以被logstash识别出来采集到。框架
如此咱们就能够写出多表关联同步的SQL了,为了方便更新维护SQL及保持logstash-jdbc端conf配置文件的简洁,你能够把SQL定义成一张视图,conf文件中的SQL statement能够像写单表处理同样了。elasticsearch
示例conf:
input { jdbc { jdbc_driver_library => "../drivers/mysql-connector-java-8.0.16.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/es_db?serverTimezone=UTC" jdbc_user => "usr" jdbc_password => "pwd" jdbc_paging_enabled => true tracking_column => "unix_ts_in_secs" use_column_value => true tracking_column_type => "numeric" schedule => "*/5 * * * * *" statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM esview WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC" } } filter { mutate { copy => { "docid" => "[@metadata][_id]"} remove_field => ["docid", "@version", "unix_ts_in_secs"] } } output { elasticsearch { index => "test_idx" document_id => "%{[@metadata][_id]}" } }