ELK Tips 主要介绍一些 ELK 使用过程当中的小技巧,内容主要来源为 Elastic 中文社区。java
pipeline.workers
:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,能够考虑增长该参数的大小;pipeline.batch.size
:设置单个工做线程在执行过滤器和输出以前收集的最大事件数,较大的批量大小一般更高效,但会增长内存开销。输出插件会将每一个批处理做为一个输出单元。;例如,ES 输出会为收到的每一个批次发出批量请求;调整 pipeline.batch.size
可调整发送到 ES 的批量请求(Bulk)的大小;pipeline.batch.delay
:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工做线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来讲,当 pipeline.batch.size
不知足时,会等待 pipeline.batch.delay
设置的时间,超时后便开始执行 filter 和 output 操做。当 terms 的个数较少的时候,TermsQuery 等效为 ConstantScoreQuery 内部包含多个 TermQuery:node
Query q1 = new TermInSetQuery(new Term("field", "foo"), new Term("field", "bar"));
// 等效为下面的语句
BooleanQuery bq = new BooleanQuery();
bq.add(new TermQuery(new Term("field", "foo")), Occur.SHOULD);
bq.add(new TermQuery(new Term("field", "bar")), Occur.SHOULD);
Query q2 = new ConstantScoreQuery(bq);
复制代码
当 terms 较多的时候,它将使用匹配的文档组合成一个位集,并在该位集上进行评分;此时查询效率比普通的 Bool 合并要更加高效。nginx
当 terms 的个数较多时,TermsQuery 比多个 TermQuery 组合的查询效率更高。数据库
upstream /data/ {
server 192.168.187.xxx:9200;
keepalive 300 ;
}
server {
listen 80;
server_name testelk.xx.com;
keepalive_timeout 120s 120s;
location /data {
proxy_pass http://data/;
proxy_http_version 1.1;
proxy_set_header Connection "Keep-Alive";
proxy_set_header Proxy-Connection "Keep-Alive";
proxy_set_header X-Real-IP $remote_addr;
proxy_pass_header remote_user
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header Host $http_host;
proxy_set_header X-Nginx-Proxy true;
}
}
复制代码
ES 的 reindex 在索引有实时的 update/delete 的状况下,即便借助 alias,也没有办法实现真正的 zero down time。编程
增长新文档比较好办,经过 alias 切换写入到新索引,同时 reindex 作旧->新索引的数据传输便可;可是 update/delete 操做针对的文档若是还未从旧索引传输过来,直接对新索引操做会致使两个索引数据不一致。后端
我可以想到的(一个未经实际验证)的方案,前提是数据库里的文档有一个相似 last_update_time
字段记录文档最后更新的时间,用做写入 ES 文档的版本号,而后数据写入新索引的时候,url 里带上下面这样的参数:version_type=external_gt&version=xxxxxx
。bash
其中 version_type=external_gt
表示写入文档的版本号大于已有的文档版本号,或者文档不存在,写入才会成功,不然会抛版本冲突的异常。另外 delete 操做都要转换成 index 操做,index 的内容能够是一个空文档。app
这样实时数据写入新索引和 reindex 能够同时进行,实时写入的数据应该具备更高的版本,老是可以成功,reindex 若是遇到版本冲突,说明该文档被实时部分更新过了,已通过时,能够直接放弃跳过。curl
该方案的缺陷:elasticsearch
重建索引步骤以下:
old_index
(业务上的别名仍是挂在老索引上)进行重索引操做(version_type=external
);curl -X POST 'http://<hostname>:9200/_reindex'
{
"conflicts": "proceed",
"source": {
"index": "old_index",
"size": 1000
},
"dest": {
"index": "new_index",
"version_type": "external"
}
}
复制代码
old_index
产生的热数据,再捞一次到 new_index
中(conflicts=proceed&version_type=external
);curl -X POST /_reindex
{
"conflicts": "proceed",
"source": {
"index": "old_index"
"query": {
"constant_score" : {
"filter" : {
"range" : {
"data_update_time" : {
"gte" : <reindex开始时刻前的毫秒时间戳>
}
}
}
}
}
},
"dest": {
"index": "new_index",
"version_type": "external"
}
}
复制代码
这种方式取决于重索引期间产生的数据量大小(会影响步骤4的用时),不过咱们能够视具体业务状况灵活操做。好比说数据量比较大重索引咱们用了10个小时(这10个小时内新产生了200多万的数据),在切别名前,咱们能够按步骤(4)的调用方式,把近10个小时的数据再捞一遍到新索引中,如此迭代个几回,直到别名切完后,咱们能保证最后一次的步骤(4)能够在较短期内完成。
http.port: 9200
http.bind_host: 127.0.0.1
transport.tcp.port: 9300
transport.bind_host: 127.0.0.1
复制代码
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(typeName);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.from(0);
sourceBuilder.size(10);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
//q为Lucene检索表达式, 直接输入关键词匹配_all或者*字段, 字段匹配user:kimchy,
//多字段匹配user:kimchy AND message:Elasticsearch
QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(q);
sourceBuilder.query(queryStringQueryBuilder);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
SearchHits searchHits = searchResponse.getHits();
复制代码
ES 文档默认不容许文档字段超过 1000,超过 1000 会报以下错误:
failed to put mappings on indices [[[nfvoemspm/srjL3cMMRUqa7DgOrYqX-A]]], type [log]
java.lang.IllegalArgumentException: Limit of total fields [1000] in index [xxx] has been exceeded
复制代码
能够经过修改索引配置来修改字段个数限制,不过仍是推荐从业务上进行优化:
修改settings
{
"index.mapping.total_fields.limit": 2000
}
复制代码
## wrapper 案例
GET /_search
{
"query" : {
"wrapper": {
"query" : "eyJ0ZXJtIiA6IHsgInVzZXIiIDogIktpbWNoeSIgfX0="
}
}
}
## RestClient
QueryBuilders.wrapperQuery("{\"term\": {\"field\":\"value\"}}")
复制代码
重启机器以后,pagecache 都没有了,全部数据都要从新从磁盘加载。
PUT _cluster/settings
{
"persistent": {
"logger.cluster.service": "DEBUG"
}
}
复制代码
PUT _template/global-slowlog_template
{
"order": -1,
"version": 0,
"template": "*",
"settings": {
"index.indexing.slowlog.threshold.index.debug" : "10ms",
"index.indexing.slowlog.threshold.index.info" : "50ms",
"index.indexing.slowlog.threshold.index.warn" : "100ms",
"index.search.slowlog.threshold.fetch.debug" : "100ms",
"index.search.slowlog.threshold.fetch.info" : "200ms",
"index.search.slowlog.threshold.fetch.warn" : "500ms",
"index.search.slowlog.threshold.query.debug" : "100ms",
"index.search.slowlog.threshold.query.info" : "200ms",
"index.search.slowlog.threshold.query.warn" : "1s"
}
}
复制代码
transport.tcp.port
这个参数不写,默认为9300-9399
,开放那么多 端口有用么?
PUT _all/_settings
{
"settings": {
"index.unassigned.node_left.delayed_timeout": "5m"
}
}
复制代码
能够用 TSVB,支持标注。
Kibana TSVB 注解的使用:elasticsearch.cn/article/701
请参考文章:如何快速把 Kibana Discover 页的 Document Table 导出成 CSV
Any Code,Code Any!
扫码关注『AnyCode』,编程路上,一块儿前行。