1.前言
记录使用logstash从sqlserver同步数据到ES中遇到的几点问题。使用的版本是es6.8.3+logstash6.8.3html
2.logstash配置文件
2.1input
input { jdbc { jdbc_driver_library => "/usr/local/logstash-6.8.3/logstashconfs/sqljdbc4.jar"#sqlserver的驱动jar包 jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" jdbc_connection_string => "jdbc:sqlserver://192.168.1.101:1433;databaseName=test;" jdbc_user => "sa" jdbc_password => "123456" jdbc_default_timezone => "Asia/Shanghai" jdbc_paging_enabled => "true"#分页 record_last_run => true#记录上一次运行的值 use_column_value => true#使用数据库中的字段追踪 tracking_column => "update_time"#追踪的字段名称 tracking_column_type => "timestamp"#追踪的字段类型 last_run_metadata_path => "/usr/local/logstash-6.8.3/logstashconfs/sync-logs/consumer_statistics_update_time"#上一次运行的值存储的文件地址 clean_run => false#使用数据库中的字段追踪 statement => "SELECT * FROM v_test WHERE update_time>:sql_last_value and update_time<GETDATE() "#sql语句 schedule => "*/5 * * * * *"#每5s执行一次 } }
statement
因为要查的数据是表关联的数据,一开始想的是创建多个jdbc,把数据存到es的不一样的索引中,利用父子文档进行关联查询,git
后来发现这种办法效率差,并且影响ES的性能,因此解决办法就是在sqlserver中创建好多表联查好的视图,es6
这里的statement
中的v_test就是建立好的视图.sql
因为设置了Logstash 增量更新, 必需要使用 update_time>:sql_last_value and update_time<GETDATE()
这种限制条件,这样才能够保证数据不丢失也不重复数据库
具体缘由见:如何使用 Logstash 实现关系型数据库与 ElasticSearch 之间的数据同步json
schedule
网上的不少教程都说最小间隔是1min,其实是能够作到秒级的.ruby
schedule => "*/5 * * * * *"
只要在前面再加一个* 单位就是秒,这里就是每5s执行一次app
2.2filter
filter { if ![test]{ruby{code =>'event.set("test","")'}} mutate{ convert => { "id" => "integer" } remove_field => ["@timestamp"] remove_field => ["@version"] } }
这里主要是对从sqlserver数据库查出来的数据进行一些处理,我这里删去了大多数的内容,仅保留一些表明性的.curl
if ![test]{ruby{code =>'event.set("test","")'}}
这个的意思是 test字段为null时,使用ruby这个语言进行处理,code =>''
这里面就是写代码的elasticsearch
event.set("test","")
意思就是 设置test字段的内容为""
固然咱们也能够先event.get("test")
,获取test字段的内容,而后在进行一系列处理后,再event.set
,这样就能够保存处理后的字段的值
ruby语言的具体语法能够参考这个:Ruby教程
convert => { "id" => "integer" }
这个的意思就是将id字段的类型转化为integer,若是某个字段是时间类型能够转化为timestamp类型
2.3output
output { elasticsearch { hosts => ["htkj101:9200","htkj102:9200","htkj103:9200"] index => "consumer_statistics"#索引名称 document_id => "%{id}"#索引的id document_type => "consumer_statistics"#索引的type,这个在6.x版本之后就已经被废弃,能够忽略这个 template_name => "consumer_statistics"#索引模板的名称 } }
document_id => "%{id}"
文档的id就是导入数据的id,这样设置能够实现幂等性
template_name => "consumer_statistics"
索引模板的名称consumer_statistics
,ES会调用模板名称为consumer_statistics
建立索引.
固然前提是你得先建立好这个模板
3.索引模板的建立
-
指令
curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d '在这里输入你建立的模板'
-
模板
{ "template": "consumer_statistics", "order": 2, "version": 60001, "index_patterns": ["consumer_statistics"], "settings": { "index": { "refresh_interval": "5s", "max_result_window": "2147483647"#设置from+size的最大值 } }, "mappings": { "_default_": { "dynamic_templates": [{ "message_field": { "path_match": "message", "mapping": { "norms": false, "type": "text" }, "match_mapping_type": "string" } }, { "string_fields": { "mapping": { "norms": false, "type": "text", "fields": { "keyword": { "ignore_above": 1024,#设置不被索引的字段长度 "type": "keyword" } } }, "match_mapping_type": "string", "match": "*" } }], "properties": { "@timestamp": { "type": "date" }, "geoip": { "dynamic": true, "properties": { "ip": { "type": "ip" }, "latitude": { "type": "half_float" }, "location": { "type": "geo_point" }, "longitude": { "type": "half_float" } } }, "@version": { "type": "keyword" } } } }, "aliases": {} }
"max_result_window": "2147483647"
在业务处理的过程当中每每须要分页,ES的JAVA-API是经过from,size来设置分页数量和每页的数量,
在默认的状况下from+size必需要小于10000,可是若是实际需求大于10000,则必须在这里设置
我这里设置的是max_result_window
的最大值,实际状况中不须要设置如此之大,
由于ES会在内存中进行排序,若是一次返回的结果过大,可能会致使服务宕机.
"ignore_above": 1024
这里默认是256,意思是若是某一个字段的内容超过256字节的话,那么将不会被索引.
也就是说从ES中是可以看到这条数据的存在,可是若是你指定查询条件,是查不出来的.
举个例子,如今ES中有id,test两个字段,一共100条数据
test字段中只有一条数据超过了256字节,如今我查询test字段中包含"1"的数据,
即便这个超过256字节的数据含有1,可是也不会被查询到.
为了可以让他被索引到,这里将256改为1024,即只有超过1024字节才会不被索引.
- 完整命令
curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d ' {"template":"consumer_statistics","order":2,"version":60001,"index_patterns":["consumer_statistics"],"settings":{"index":{"refresh_interval":"5s","max_result_window":"2147483647"}},"mappings":{"_default_":{"dynamic_templates":[{"message_field":{"path_match":"message","mapping":{"norms":false,"type":"text"},"match_mapping_type":"string"}},{"string_fields":{"mapping":{"norms":false,"type":"text","fields":{"keyword":{"ignore_above":1024,"type":"keyword"}}},"match_mapping_type":"string","match":"*"}}],"properties":{"@timestamp":{"type":"date"},"geoip":{"dynamic":true,"properties":{"ip":{"type":"ip"},"latitude":{"type":"half_float"},"location":{"type":"geo_point"},"longitude":{"type":"half_float"}}},"@version":{"type":"keyword"}}}},"aliases":{}}'
在建立模板的过程当中,发现老是建立失败,后来发现弄成这样的两行,就不会出错了.