前面一篇已经把logstash和logstash-input-jdbc安装好了。html
下面就说下具体怎么配置。java
1.先在安装目录bin下面(通常都是在bin下面)新建两个文件jdbc.conf和jdbc.sqlmysql
2.配置jdbc.confsql
1 input { 2 stdin { 3 } 4 jdbc { 5 # 链接的数据库地址和哪个数据库,指定编码格式,禁用SSL协议,设定自动重连 6 jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/microstorage_backend?characterEncoding=UTF-8&useSSL=false&autoReconnect=true" 7 jdbc_user => "root" 8 jdbc_password => "123456" 9 # 下载链接数据库的驱动包,建议使用绝对地址 10 jdbc_driver_library => "/usr/local/Cellar/logstash/6.5.4/libexec/logstash-core/lib/jars/mysql-connector-java-5.1.42.jar" 11 12 jdbc_driver_class => "com.mysql.jdbc.Driver" 13 jdbc_paging_enabled => "true" 14 jdbc_page_size => "50000" 15 codec => plain { charset => "UTF-8"} 16 17 #使用其它字段追踪,而不是用时间 18 #use_column_value => true //这里若是是用时间追踪好比:数据的更新时间或建立时间等和时间有关的这里必定不能是true, 切记切记切记,我是用update_time来追踪的 19 #追踪的字段 20 tracking_column => update_time 21 record_last_run => true 22 #上一个sql_last_value值的存放文件路径, 必需要在文件中指定字段的初始值 这里说是必须指定初始值,我没指定默认是1970-01-01 08:00:00 23 last_run_metadata_path => "/usr/local/opt/logstash/lastrun/.logstash_jdbc_last_run" //这里的lastrun文件夹和.logstash_jdbc_last_run是本身建立的 24 25 jdbc_default_timezone => "Asia/Shanghai" //设置时区 26 #statement => SELECT * FROM goods WHERE update_time > :last_sql_value //这里要说明一下若是直接写sql语句,前面这种写法确定不对的 27 ,加上引号也试过也不对,因此我直接写在jdbc.sql文件中 28 statement_filepath => "/usr/local/Cellar/logstash/6.5.4/bin/jdbc.sql" 29 30 31 #是否清除 last_run_metadata_path 的记录,若是为真那么每次都至关于从头开始查询全部的数据库记录 32 clean_run => false 33 34 # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟 不设置就是1分钟执行一次 35 schedule => "* * * * *" 36 type => "std" 37 } 38 } 39 40 filter { 41 42 json { 43 44 source => "message" 45 46 remove_field => ["message"] 47 48 } 49 50 } 51 52 output { 53 54 elasticsearch { 55 56 # 要导入到的Elasticsearch所在的主机 57 58 hosts => "127.0.0.1:9200" 59 60 # 要导入到的Elasticsearch的索引的名称 61 62 index => "goods" 63 64 # 类型名称(相似数据库表名) 65 66 document_type => "spu" 67 68 # 主键名称(相似数据库主键) 69 70 document_id => "%{id}" 71 } 72 73 stdout { 74 75 # JSON格式输出 76 77 codec => json_lines 78 79 } 80 }
3.配置jdbc.sql数据库
1 select id,goods_name,goods_no,price,account_id,create_time,update_time from goods where update_time > :sql_last_value
4.咱们来看下 .logstash_jdbc_last_run文件中的内容(网上讲述该配置的时候都没讲到里面具体的内容写法,致使不少人很迷惑,其中我就是)json
前面的---具体什么意思,我也不太清楚。ruby
5.启动jdbc.conf配置,开始同步数据elasticsearch
第一次:由于时间是从1970年开始的因此会所有同步一遍。至关于全量同步了编码
从第二次开始,会从上次最新的一次时间同步,既新增和修改都会同步spa
遇到的问题:
1.ES中8小时时差的问题?
解决方法:从源头解决问题
在jdbc.conf配置文件中只要是有关时间的字段都手动+8小时
39 filter { 40 json { 41 source => "message" 42 remove_field => ["message"] 43 } 44 // date类型不能省略,否则会报错, 就是把当前字段+8小时后赋值给新的字段,而后再取新字段的值赋值给老的字段,再把新的字段删除 45 date { 46 match => ["message","UNIX_MS"] 47 target => "@timestamp" 48 } 49 ruby { 50 code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" 51 } 52 ruby{ 53 code => "event.set('@timestamp',event.get('timestamp'))" 54 } 55 mutate{ 56 remove_field => ["timestamp"] 57 } 58 59 date { 60 match => ["message","UNIX_MS"] 61 target => "create_time" 62 } 63 ruby { 64 code => "event.set('@create_time', event.get('create_time').time.localtime + 8*60*60)" 65 } 66 ruby { 67 code => "event.set('create_time',event.get('@create_time'))" 68 } 69 mutate { 70 remove_field => ["@create_time"] 71 } 72 73 date { 74 match => ["message","UNIX_MS"] 75 target => "update_time" 76 } 77 ruby { 78 code => "event.set('@update_time', event.get('update_time').time.localtime + 8*60*60)" 79 } 80 ruby { 81 code => "event.set('update_time',event.get('@update_time'))" 82 } 83 mutate { 84 remove_field => ["@update_time"] 85 }
86 }
总结:主要是配置,有什么问题,先检查配置文件。