使用logstash同步至ES的几个坑

1.前言

记录使用logstash从sqlserver同步数据到ES中遇到的几点问题。使用的版本是es6.8.3+logstash6.8.3html

2.logstash配置文件

2.1input

input {
    jdbc {
        jdbc_driver_library => "/usr/local/logstash-6.8.3/logstashconfs/sqljdbc4.jar"#sqlserver的驱动jar包
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://192.168.1.101:1433;databaseName=test;"
        jdbc_user => "sa"
        jdbc_password => "123456"
        jdbc_default_timezone => "Asia/Shanghai"
		jdbc_paging_enabled => "true"#分页
		record_last_run => true#记录上一次运行的值
		use_column_value => true#使用数据库中的字段追踪
		tracking_column => "update_time"#追踪的字段名称
		tracking_column_type => "timestamp"#追踪的字段类型
		last_run_metadata_path => "/usr/local/logstash-6.8.3/logstashconfs/sync-logs/consumer_statistics_update_time"#上一次运行的值存储的文件地址
		clean_run => false#使用数据库中的字段追踪
		statement => "SELECT * FROM v_test WHERE update_time>:sql_last_value and update_time<GETDATE() "#sql语句
		schedule => "*/5 * * * * *"#每5s执行一次
    }
}
  • statement

因为要查的数据是表关联的数据,一开始想的是创建多个jdbc,把数据存到es的不一样的索引中,利用父子文档进行关联查询,git

后来发现这种办法效率差,并且影响ES的性能,因此解决办法就是在sqlserver中创建好多表联查好的视图,es6

这里的statement 中的v_test就是建立好的视图.sql

因为设置了Logstash 增量更新, 必需要使用 update_time>:sql_last_value and update_time<GETDATE()这种限制条件,这样才能够保证数据不丢失也不重复数据库

具体缘由见:如何使用 Logstash 实现关系型数据库与 ElasticSearch 之间的数据同步json

  • schedule

网上的不少教程都说最小间隔是1min,其实是能够作到秒级的.ruby

schedule => "*/5 * * * * *"只要在前面再加一个* 单位就是秒,这里就是每5s执行一次app

2.2filter

filter {
	if ![test]{ruby{code =>'event.set("test","")'}}	
	mutate{
		convert => { "id" => "integer" }
		remove_field => ["@timestamp"]
		remove_field => ["@version"]
	}
}

这里主要是对从sqlserver数据库查出来的数据进行一些处理,我这里删去了大多数的内容,仅保留一些表明性的.curl

  • if ![test]{ruby{code =>'event.set("test","")'}}

这个的意思是 test字段为null时,使用ruby这个语言进行处理,code =>'' 这里面就是写代码的elasticsearch

event.set("test","")意思就是 设置test字段的内容为""

固然咱们也能够先event.get("test"),获取test字段的内容,而后在进行一系列处理后,再event.set,这样就能够保存处理后的字段的值

ruby语言的具体语法能够参考这个:Ruby教程

  • convert => { "id" => "integer" }

这个的意思就是将id字段的类型转化为integer,若是某个字段是时间类型能够转化为timestamp类型

2.3output

output {
		elasticsearch {
			hosts => ["htkj101:9200","htkj102:9200","htkj103:9200"]
			index => "consumer_statistics"#索引名称
			document_id => "%{id}"#索引的id
			document_type => "consumer_statistics"#索引的type,这个在6.x版本之后就已经被废弃,能够忽略这个
			template_name => "consumer_statistics"#索引模板的名称
		}
}
  • document_id => "%{id}"

文档的id就是导入数据的id,这样设置能够实现幂等性

  • template_name => "consumer_statistics"

索引模板的名称consumer_statistics,ES会调用模板名称为consumer_statistics建立索引.

固然前提是你得先建立好这个模板

3.索引模板的建立

  • 指令

curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d '在这里输入你建立的模板'
  • 模板

{
	"template": "consumer_statistics",
	"order": 2,
	"version": 60001,
	"index_patterns": ["consumer_statistics"],
	"settings": {
		"index": {
			"refresh_interval": "5s",
			"max_result_window": "2147483647"#设置from+size的最大值
		}
	},
	"mappings": {
		"_default_": {
			"dynamic_templates": [{
				"message_field": {
					"path_match": "message",
					"mapping": {
						"norms": false,
						"type": "text"
					},
					"match_mapping_type": "string"
				}
			}, {
				"string_fields": {
					"mapping": {
						"norms": false,
						"type": "text",
						"fields": {
							"keyword": {
								"ignore_above": 1024,#设置不被索引的字段长度
								"type": "keyword"
							}
						}
					},
					"match_mapping_type": "string",
					"match": "*"
				}
			}],
			"properties": {
				"@timestamp": {
					"type": "date"
				},
				"geoip": {
					"dynamic": true,
					"properties": {
						"ip": {
							"type": "ip"
						},
						"latitude": {
							"type": "half_float"
						},
						"location": {
							"type": "geo_point"
						},
						"longitude": {
							"type": "half_float"
						}
					}
				},
				"@version": {
					"type": "keyword"
				}
			}
		}
	},
	"aliases": {}
}
  • "max_result_window": "2147483647"

在业务处理的过程当中每每须要分页,ES的JAVA-API是经过from,size来设置分页数量和每页的数量,

在默认的状况下from+size必需要小于10000,可是若是实际需求大于10000,则必须在这里设置

我这里设置的是max_result_window的最大值,实际状况中不须要设置如此之大,

由于ES会在内存中进行排序,若是一次返回的结果过大,可能会致使服务宕机.

  • "ignore_above": 1024

这里默认是256,意思是若是某一个字段的内容超过256字节的话,那么将不会被索引.

也就是说从ES中是可以看到这条数据的存在,可是若是你指定查询条件,是查不出来的.

举个例子,如今ES中有id,test两个字段,一共100条数据

test字段中只有一条数据超过了256字节,如今我查询test字段中包含"1"的数据,

即便这个超过256字节的数据含有1,可是也不会被查询到.

为了可以让他被索引到,这里将256改为1024,即只有超过1024字节才会不被索引.

  • 完整命令
curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d '
{"template":"consumer_statistics","order":2,"version":60001,"index_patterns":["consumer_statistics"],"settings":{"index":{"refresh_interval":"5s","max_result_window":"2147483647"}},"mappings":{"_default_":{"dynamic_templates":[{"message_field":{"path_match":"message","mapping":{"norms":false,"type":"text"},"match_mapping_type":"string"}},{"string_fields":{"mapping":{"norms":false,"type":"text","fields":{"keyword":{"ignore_above":1024,"type":"keyword"}}},"match_mapping_type":"string","match":"*"}}],"properties":{"@timestamp":{"type":"date"},"geoip":{"dynamic":true,"properties":{"ip":{"type":"ip"},"latitude":{"type":"half_float"},"location":{"type":"geo_point"},"longitude":{"type":"half_float"}}},"@version":{"type":"keyword"}}}},"aliases":{}}'

在建立模板的过程当中,发现老是建立失败,后来发现弄成这样的两行,就不会出错了.

相关文章
相关标签/搜索