写在前面css
从初次了解elastic产品到正式投入使用,拖拖拉拉的也有小半年了,刚接触的时候看到一些帖子都是安装教程,后来看到一些都是深刻教程,此篇文章较居中一点,总结了我在踩的一些坑和记录一些周边插件的使用方式、方法,便于本身后续回顾,也但愿能给新用户一些引导,少走一些弯路;核心实际上是想表达一下对rockybean和KennyW的爱,这期间很是感谢两位的协助,在非工做日深夜排查问题屡次,正文多处采用二位给予的讲解,万分感谢。Rsyslog配置(双打Kafka)
现有的版本是0.8,而刚开始测试的用logstash5.x须要kafka0.10(最终hangout替换logstash),因此新搭建了一组新的集群,Rsyslog向两个Kafka集群分别写数据,配置以下html
Module (load="imudp")
Module (load="omkafka")
Input (type="imudp" port="514")
Module (load="mmsequence")
$MaxMessageSize 4k
local5.none /var/log/messages
local5.none @log.domain.com:514
set $!newmsg = replace($msg,'\\x','\\u00')
template(name="kafka_topic" type="string" string="%programname%")
template(name="kafka_msg" type="string" string="%!newmsg%")
if ($syslogfacility-text == 'local5' and $syslogseverity-text == 'info') then{
action(type="omkafka" topic="kafka_topic" partitions.auto="on"
dynatopic="on" dynatopic.cachesize="1000"
confParam=["compression.codec=snappy"]
#kafka broker addr
broker=["10.10.10.1:9092","10.10.10.2:9092",]
template="kafka_msg"
errorfile="/var/log/omkafka/log_kafka_failures.log")
action(type="omkafka" topic="kafka_topic" partitions.auto="on"
dynatopic="on" dynatopic.cachesize="1000"
confParam=["compression.codec=snappy"]
#kafka broker addr
broker=["20.20.20.1:9092","20.20.20.2:9092",]
template="kafka_msg"
errorfile="/var/log/omkafka/log_kafka_failures.log")
stop
}
复制代码
配置Nginx JSON格式日志java
log_format json_format '{"@timestamp":"$time_iso8601",'
'"cookie_id":"$cookie_id",' #内部cookie_id
'"client_ip":"$remote_addr",'
'"remote_user":"$remote_user",'
'"request_method":"$request_method",'
'"domain":"$host",'
'"user_agent":"$http_user_agent",'
'"xff":"$http_x_forwarded_for",'
'"upstream_addr":"$upstream_addr",'
'"upstream_response_time":"$upstream_response_time",'
'"request_time":"$request_time",'
'"size":"$body_bytes_sent",'
'"idc_tag":"tjtx",'
'"cluster":"$host_pass",'
'"status":"$status",'
'"upstream_status":"$upstream_status",'
'"host":"$hostname",'
'"via":"$http_via",'
'"protocol":"$scheme",'
'"request_uri":"$request_uri",'
'"http_referer":"$http_referer"}';复制代码
Nginx内置syslog模块配置,而且引用刚刚定义的json日志格式node
access_log syslog:local5:info:127.0.0.1:514:nginx_aggregation_log json_format;
#nginx_aggregation_log 这是自定义的Topic复制代码
NginxSyslog模块介绍
linux
注:
1) UDP传输虽快,可是以太网(Ethernet)数据帧的长度必须在46-1500字节之间,UDP不能像TCP重组数据包,去除IP和UDP的数据包,最终可以使用只剩1472字节。若是传输大于这个长度的消息,并不会想UDP自己同样直接丢弃,只是会损坏源数据格式,截断超过限制字节之外的数据;
2) 对于Nginx日志来讲,只要不保留POST数据,基本一条消息不会超过限制字节,我在NginxSyslog介绍中没看到支持TCP,用lua脚本实现的TCP方式传输,可是看了不少帖子都不建议在Nginx中用TCP日志传输。就是由于TCP传输可靠,但像网络抖动、传输异常,可能会不停的重试屡次或等待,直接影响这条请求,也直接影响到了用户;
3) 消息超过了UDP传输限制怎么办,我这目前是保留一条消息的重要字段,如上述的json_format的格式,将 request_uri、http_referer等可能会较大的字段放到最后,若是真的发现消息不完整,直接丢弃http_referer,取request_uri问号前的内容;(在logstash或hangout中filters实现,具体配置详见下文Hangout-filters)ios
Kafka监控插件:kafka-monitor 和 kafka-managernginx
注:
目前咱们这kakfa集群是kafka_2.10-0.8.1.1版本,可是logstash5.x对kafka有版本要求>0.10版本。后来采用hangout,更换了几个jar包解决了此问题git
模仿logstash作的一个应用,功能没有logstash多,可是基本使用都有了,java编写,性能能够翻好几倍,用到的功能就是从kafka订阅消息,作一些简单的过滤,而后写入ES;目前hangout部署到2台服务器上,每一个进程开8G内存,CPU在60-70左右;github
inputs:
- Kafka:
topic:
nginx_aggregation_log: 32
codec:
json
consumer_settings:
group.id: es-nginx_aggregation_log
zookeeper.connect: "10.10.10.1:2181,10.10.10.2:2181"
auto.commit.interval.ms: "20000"
socket.receive.buffer.bytes: "1048576"
fetch.message.max.bytes: "1048576"
num.consumer.fetchers: "1"
filters:
- Filters:
if:
- '<#if message??>true</#if>'
#若是不是完整的JSON,会出现message,则走此逻辑
filters:
- Grok:
match:
- '(?<msg>{"@timestamp":.*"request_uri":([^\?]+)\?)'
#正则匹配@timestamp开始到request_uri后边的第一个?截止
- Gsub:
fields:
msg: ['$','"}']
#补全符号,完整新的JSON格式
- Json:
field: msg
remove_fields: ['message']
#干掉错误的数据
- Convert:
fields:
request_time:
to: float
remove_if_fail: true
upstream_response_time:
to: float
remove_if_fail: true
size:
to: integer
remove_if_fail: true
- GeoIP2:
source: client_ip
database: '/opt/soft/hangout/etc/other/GeoLite2-City.mmdb'
- Json:
field: geoip
- Remove:
fields:
- msg
- Add:
fields:
request_url: '<#assign a=request_uri?split("?")>${a[0]}'
#request_uri这个term的cardinality很高,因此?前用于聚合,原有的用于搜索
if:
- '<#if request_uri??>true</#if>'
outputs:
- Elasticsearch:
cluster: es-nginx
timezone: "Asia/Shanghai"
hosts: "10.10.10.1:9300,10.10.10.2:9300"
index: 'hangout-nginx_aggregation_log-%{+YYYY.MM.dd}'
复制代码
topic: nginx_aggregation_log: 32,不管是logstash仍是hangout都有这个概念,这个32表明须要创建多少子线程去kafka读取数据,数量最好与Partition相等,若是少于Partition,会一个线程同时去2个Partition读取消息,若大于Partition则会有不工做的进程web
CPU:32C,内存:128G ,硬盘:STAT 6T * 12,网卡:万兆
复制代码
【系统】: Centos7 内核3.10
【JDK】: 1.8.0_66/31G (听说此版本JDK有BUG,请安装最新JDK)
【系统参数修改1】: vm.swappiness=1 [下降对硬盘的缓存]
【系统参数修改2】: vm.max_map_count=262144 [Elasticsearch针对各类文件使用NioFS和MMapFS的混合。以便有足够的虚拟内存可用于mmapped文件] 复制代码
cluster.name: es-nginx
node.name: 10.10.10.1
#为后期冷热数据使用
node.attr.rack_id: hdd
path.data: /data
path.logs: /opt/logs/elasticsearch/
network.host: 0.0.0.0
http.port: 9200
#设置新节点被启动时可以发现的主节点列表
discovery.zen.ping.unicast.hosts: ["10.10.10.1","10.10.10.2","10.10.10.3"]
#防止脑裂(n/2+1)
discovery.zen.minimum_master_nodes: 2
node.master: true
node.data: false复制代码
刚刚开始测试ES的第一个版本是ES5.3,先搞了3台机器,每一个机器一个节点,配置是master和data共同提供服务,高可用架构集群搭建完成,可是写入性能特别差,cpu使用在20-30%,少许io.wait,下图是当时3w左右的性能图当时以为既然ES硬件很空闲必定是logstash出问题了,查看logstash确实有很严重的Full GC,开始从2台服务器扩至4台服务器,后来发现无果,期间各类调整ES的shard的数量都没效果,又怀疑kafka性能,从二、四、六、8...64分区依旧无果。当时这个坑可爬了一段时间,后来在Google的游荡中无心中看到帖子说,不要将master和data都启用,而后我照着作了改变,master单点,data两台,问题搞定,效果图找不到了,起码翻倍是有的;
[Master除了网卡,其余没什么消耗]
复制代码
因shard数量、字段类型、其余设置等都是都是在建立时生成,因此要提早建立好相应的模板,便于规范管理和引用,下面针对shard和aliases作的一些设置,以下:
{
"template": "agg-nginx-*",
"aliases": {
"agg-nginx": {}
},
"settings": {
"number_of_shards": 4,
"number_of_replicas": 1,
"index.routing.allocation.include.rack_id": "ssd"
}复制代码
经过上述配置PUT到 _template/ur_name下在分片上的定义已经成功,可是像agg-nginx-
和test-agg-test-这样的2个索引名字,即便你建立了另外一个"template": "agg-nginx-test-*"的模板依旧都匹配第一个,固然换名字最简单有效,在template的order的是专门解决这个问题的。默认建立"order": "0",值越高优先级越高,因此在想要先匹配的将order值调高便可
ES的mapping很是相似于静态语言中的数据类型:声明一个变量为int类型的变量, 之后这个变量都只能存储int类型的数据。一样的, 一个number类型的mapping字段只能存储number类型的数据。同语言的数据类型相比,mapping还有一些其余的含义,mapping不只告诉ES一个field中是什么类型的值, 它还告诉ES如何索引数据以及数据是否能被搜索到
下列是一个删减版的mapping复制代码
"mappings": {
"ngx_log": {
"_all": {
"enabled": false
},
"properties": {
"@timestamp": {
"type": "date"
},
"client_ip": {
"type": "ip"
},
"domain": {
"type": "keyword"
},
"geoip": {
"properties": {
"city_name": {
"type": "keyword"
},
"country_name": {
"type": "keyword"
},
"latitude": {
"type": "float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "float"
},
}
},
"request_time": {
"type": "float"
},
"request_url": {
"type": "keyword"
},
"status": {
"type": "keyword"
ype": "keyword" }, } } }复制代码
_all字段
该_all字段是一个特殊的catch-all字段,它将全部其余字段的值链接成一个大字符串,使用空格做为分隔符,而后对其进行分析和索引,但不存储。也就是说它能被查询,但不能被取回显示。由于Nginx每一个Key对应的value都是提早定义好的,因此不用全文查询,不须要开启_all字段,另外也节省了一半的存储空间
默认的text类型上边这英文有点多,其实简单理解就是不分词,你就最好别用text了,并且Text类型也会相应的多占用空间,依照上述,数据主要是日志分析,每条数据的格式已经很明确,主要用于日志分析,因此不须要分词。像一些全部引擎的业务更适合须要分词;
[摘取部分苏若年博客内容]
1)分片算法:
shard = hash(routing) % number_of_primary_shards
routing值是一个任意字符串,它默认是_id但也能够自定义,这个routing字符串经过哈希函数生成一个数字,而后除以主切片的数量获得一个余数(remainder),余数的范围永远是0到number_of_primary_shards - 1,这个数字就是特定文档所在的分片。
这也解释了为何主切片的数量只能在建立索引时定义且不能修改:若是主切片的数量在将来改变了,全部先前的路由值就失效了,文档也就永远找不到了。
全部的文档API(get、index、delete、bulk、update、mget)都接收一个routing参数,它用来自定义文档到分片的映射。自定义路由值能够确保全部相关文档.好比用户的文章,按照用户帐号路由,就能够实现属于同一用户的文档被保存在同一分片上。
2)分片与副本交互:
新建、索引和删除请求都是写(write)操做,它们必须在主分片上成功完成才能复制到相关的复制分片上,下面咱们罗列在主分片和复制分片上成功新建、索引或删除一个文档必要的顺序步骤:
一、客户端给Node 1发送新建、索引或删除请求。
二、节点使用文档的_id肯定文档属于分片0。它转发请求到Node 3,分片0位于这个节点上。
三、Node 3在主分片上执行请求,若是成功,它转发请求到相应的位于Node 1和Node 2的复制节点上。当全部的复制节点报告成功,Node 3报告成功到请求的节点,请求的节点再报告给客户端。
客户端接收到成功响应的时候,文档的修改已经被应用于主分片和全部的复制分片。你的修改生效了。
一个索引要分多少片?何时该扩容?
取决于硬件和你对响应速度的要求,通常来讲一个shard的数据量控制在一、2千万的级别,速度都还好,过亿会比较缓慢。 可是任何事物都有两面,shard划分比较小,必然数量就比较多。 在用户作一、2天数据搜索的时候可能还好,若是搜更长时间的数据,一次搜索须要并行搜索的shard就比较多。若是节点数量有限,就会比较吃力,须要扩充更多的节点
听说是优化之王道,常常拿城市举的例子,好比说我想看下网站的北京pv是多少,若是按照默认hash逻辑,必定要全shard扫描,而后聚合结果,可是若是提早设置好routing,好比说指定城市字段作hash计算,routing值同样的放到特定几个分片,这样查起来也不须要全shard扫了;这样弊端就是会形成shard大小不均,因此routing优化须要花一些功夫来观察、测试;目前kibana还不支持routing查询,因此目前在kibana上尚未使用routing,这是优化的重点因此先记录下来。后续个人想法是,像nginx日志的域名的字段都是英文字母,针对首字母作下routing,当想看某一个域名时不在全盘扫,查询优化会有明显效果,后续有结果在与你们分享;
另外hangout开始对routing的支持,后来在GitHub提了一个小issue,很快就加上了,点个赞;
Kibana是一个开源的分析与可视化平台,设计出来用于和Elasticsearch一块儿使用的。你能够用kibana搜索、查看、交互存放在Elasticsearch索引里的数据,使用各类不一样的图表、表格、地图等kibana可以很轻易地展现高级数据分析与可视化。
先介绍下几块功能:
(╯﹏╰)吐槽一下,用SF写到如今,Chrome都快无法用了,打完字一会才显示出来
上图中环比图是Timelion完成的,其余都是Visualize的功能
Timelion语法:
.es(index=index1,timefield=@timestamp).label('Today').title(QPS).color(#1E90FF),
.es(offset=-24h,index=index2,timefield=@timestamp).label('Yesterday').lines(fill=1,width=0.5).color(gray)复制代码
核心问题:查询慢、查询15Min数据都超时
mapping字段优化:
主要像上述的mapping介绍,作类型优化,不分词的keyword,数学计算的改为整型or浮点等
status这个字段的类型,value通常都是200、30一、30二、40四、502,最多估计也就几百个,像这样的字段就不适合作long,long类型的索引是为范围查找优化的,用的是二叉树这样的索引,适合值范围比较大的字段,好比body_size,可能最大值和最小值相差不少,而用keyword索引是倒排,只用存放一个全部status的词典,因此keyword更适合
shard数量调整:
开始主分片数量是每台机器1个,副本1(20shard x 1replicas),每一个shard已经达到将近200G,看这个量级已经超过官方建议值30~50G之间(具体要根据实际状况测试为准)。因而开始将数量翻倍40shard * 1replicas,调整后查询并无明显改善,对写入没有什么改变,继续double,依旧没效并且分片越多,写入的时候消耗的CPU就越高
若是过分分配shard,就增大了Lucene在合并分片查询结果时的复杂度,从而增大了耗时,在新建索引时,更是一笔大的开销。
通常来讲,刚开始的时候尽可能少分片为佳, 只有到一个分片上数据太多,单次查询太慢在考虑加分片。
加分片也有负面做用,提升了并发量,随之开销就会越大,更多的shard通常也伴随更多的segment文件。若是说节点数量没变,每一个节点上会有更多的小文件,搜索的时候并发量是高了,前提是在磁盘io和cpu都还能应付的过来的时候,速度才会快
拆分索引
更改shard已经得不到显著效果,因而从拆索引下手。数据都是Nginx日志,从源头拆分没什么好的方法,因而从hangout这层开始作处理,很简单就是将域名的第一个字母取出来,写入相应的索引时候带过去,例如:nginx-log-{首字母},这样一拆,一下建立26个索引(shard20 * 1replicas),CPU立马load 30+,负载直接上来,而后ES数据还跟不上,拒绝了不少内容,以下图,最终仍是无果而了结此方案
上述的索引拆分是比较傻瓜式,首先已知的问题就是可能A开头的域名很大,其余很小就是很不均匀
回归HDD
通过SSD的折腾,查到了kibanaBug问题,其实SSD并未发现其余的性能亮点,通过与KennyW的交流,他们本身有作过在SSD和多块HDD的盘对比,写入量并没有明显提高,因此在磁盘并非真正的瓶颈,查询方面SSD明显提升。可是SSD的高额付出换来那么几秒钟的意义不大。相对,对一些公司的搜索业务,数据量级小,还有像一些监控业务,要求实时性很是高,对SSD是很好的选择,速度快,容量不须要太多,也比较经济实惠
[KennyW经验指导]
当作复杂计算,bucket不少的时候主要消耗的是CPU和内存,磁盘并非瓶颈,hdd和ssd在咱们实际用下来,感受在大数据场景分别不大,ssd优点在大量的随机磁盘io,ES自己作了不少优化,让数据变成顺序的磁盘访问,并且搜索过的数据块,ES都能利用文件系统缓存加速,因此即便使用hdd,可能第一次搜索磁盘访问会带来额外的几秒耗时,但屡次执行同一个搜索时,后面几回几乎没什么磁盘io开销,速度会有明显提高
收尾
作到目前这些调整,如今一天小于1T的索引任何查询都没有问题,查询在10s左右返回全部数据(上图kibana展现),可是那个一天4-5T的索引仍是有问题,一查询IO就跑满,到30s直接超时了,先说下IO跑满的问题吧,问题是request_uri台过于分散,聚合出现问题如图:15分钟就出现了2000多万不一样值,若是长时间计算,不可想象的恐怖
【KennyW指导】
request_uri 会产生大量的磁盘IO。 ES作terms聚合的时候,为了节省内存,不会将全部term的内容直接读出来作bucket,由于有些term的内容可能很长,会比较耗费内存。 因此他会借助一种叫作oridinals的数据结构, 这种数据结构相似这样
1 abc
2 efg
3 hfa
.............一个该字段全部不一样值的顺序列表。 作分桶聚合的时候,只须要拿这个顺序数字作key就能够了,等聚合出结果,再经过这个key查ordinals表,获得实际的key值填充结果。 可是这个ordinals是随着segment 文件生成的,每一个segment文件的ordinals顺序可能不同。 所以在聚合的时候,须要一个全局的global ordinals。 这个数据结构是在聚合的时候,实时生成后缓存在内存里的。 若是某个字段的不一样值很是多,计算价值很是的昂贵,那么这个构造过程就很是的缓慢,除了大量的磁盘IO消耗,还会有很大的内存消耗。
下图是关掉这个有问题的visualize先后对比图,虽然不快,可是latency降了不少
后来对uri的?后边的参数所有丢弃,像这样的问题只能减小而后作聚合使用,原有数据作搜索使用,可是因为数据太大,作复杂计算仍是会超时30s,这种状况只能是下降team的cardinality,或者加分片、加机器或者拆索引了,因此对kibana的超时时间作了一点调整,还有一些周边小的修改以下
1)kibana默认的30s超时改为2min,kibana.yml中修改elasticsearch.requestTimeout: 120000
2)kibana默认地图使用高德,kibana.yml中新增tilemap.url: 'http://webrd02.is.autonavi.com/appmaptile?lang=zh_cn&size=1&scale=1&style=7&x={x}&y={y}&z={z}'
3)结合cerebro插件,高效管理索引
到这里基本的一期项目算是结束,虽然部分查询并无迅速返回,可是基本数据均可以展现,后续会关注几个点继续深刻优化和调整,有结果在与你们分享
首先在这里仍是先要感谢rockybean和KennyW的大力支持。
对本身的总结就是,对ES经验太少,踩了不少没必要要的坑,另外就是没有好好统一阅读下官网文档,这样极其影响效率与进度,好多时候也会一筹莫展,不知从何下手。
第一次写技术帖,并且时间稍紧,有哪些地方写的很差或敏感烦请指出,我会及时更正。但愿这篇文章,能给予像我这样的小白用户少踩一点坑,多一点爱。文章有点长,你们以为做者总结的还能够,能够关注一下做者的公众号《Java技术zhai》,公众号聊的不只仅是Java技术知识,还有面试等干货,后期还有大量架构干货。你们一块儿关注吧!关注技术zhai,你会了解的更多..............