以前公司小王在工做中清理elasticSearch 索引,不当心使用脚本清空了最近使用的重要索引,致使开发没法准确的进行定位生产问题,形成了很大困扰。git
当时咱们的生产环境中是这样配置日志系统的:服务器 -> filebeat -> kakfa -> logstash -> elasticsearch -> kibana 页面。其中在缓存配置中咱们配置了kafka集群。github
es 数据丢失以后,咱们考虑使用拉去kafka 集群中的存储的消息,让它从新消费一边,更换消费组,从头开始进行消费 配置 auto_offset_reset => "earliest" 设定。json
具体实践以下:bootstrap
以下配置是我测试经过的缓存
input { kafka { bootstrap_servers => ["192.168.39.23:9082,192.168.39.24:9082,192.168.39.25:9082"] topics_pattern => ["MY_REPORT_TOPIC_.*"] client_id => "logstash-1" group_id => "logstash_to_elastic2" # 这里的组要更新一下,原先是logstash_to_elastic auto_offset_reset => "earliest" # 这里设定消费模式为从头开始消费,也就是从最初开始消费 consumer_threads => 1 decorate_events => true codec => "json" } } output { elasticsearch { hosts => "http://192.168.39.27:9200" index => "logtype-%{[logType]}" document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}" } }
重点说下读取kafka的配置:服务器
bootstrap_servers:kafka集群地址
topics_pattern:我这里是通配的前缀为”MY_REPORT_TOPIC_”的topic,注意后面跟的是.*,不要忘了这个点,正则的语法,通配全部字符用的
client_id:自定义的客户端id
group_id:自定义组id,两个消费者同时消费同一组会形成其中一个没法消费,可是只要建立多个partition就能够了,即组内消费者数量和分区数相同就均可以消费了,这样一个组内全部消费者可以分担负载
auto_offset_reset:
因为我有些历史数据要消费,因此设置为earliest从头读的网络
consumer_threads:通常最佳配置是同一个组内consumer个数(或线程数)等于topic的分区数,若是大于分区数会造logstash进程闲置,不然一个进程访问多个分区。若是有多个Logstash实例,那就让实例个数 * consumer_threads等于分区数便可
数据写入到Kafka时没有重复,但后续流程可能由于网络抖动、传输失败等致使重试形成数据重复。如何解决呢?不须要依赖业务数据就能够去重。去重的原理也很简单,利用es document id便可。elasticsearch
对于es,若是写入数据时没有指定document id,就会随机生成一个uuid,若是指定了,就使用指定的值。对于须要去重的场景,咱们指定document id便可。测试
在output elasticsearch中能够经过document_id字段指定document id,咱们须要构造出一个”uuid”能唯一标识kafka中的一条数据,这个很是简单:<topic>+<partition>+<offset>
,这三个值的组合就能够惟一标识kafka集群中的一条数据。ui
input kafka插件也已经帮咱们把消息对应的元数据信息记录到了@metadata(Logstash的元数据字段,不会输出到output里面去)字段里面:
所以就有了上面的配置:
document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"
decorate_events:只有当decorate_events选项配置为true的时候,上面的[@metadata](https://github.com/metadata)才会记录那些元数据,不然不会记录。而该配置项的默认值是false,即不记录