进行文章的第二次修改,包括了以前的简单方案的升级过程。php
由于业务的不断更新升级,为了保证线上业务也能正常使用elk服务,而且使得elk的服务和线业务流解耦(即避免直接写入es的方式可能会带来的耗时影响)因此咱们采用了下面最新的方案,也是常规方案html
业务层 >> kafka队列 >> logstash 消费 >> elasticsearch
业务层将日志写入到kafka队列,同事logstash能够开启多个线程,启用同一个group_id来对kafka进行消费,将读取到的日志进行解析后,写入到elasticsearch中,而且按照索引模板进行解析。java
业务层能够直接写入到kafka队列中,不用担忧elasticsearch的写入效率问题。git
比起以前的简单版本,须要保证kafka队列、logstash的高可用(虽然logstash挂掉后,能够重启后从新读取队列日志)es6
整个搭建过程,写入kafka是很是简单的,这里遇到的问题是logstash和elasticsearch索引模板带来的困扰。json
如何肯定使用谁的模板呢?
网上找到的资料,建议采用将模板配置在elasticsearch侧,这样就不用每一个logstash进行一个模板配置文件的维护。bootstrap
官网的文档kafka的input插件
https://www.elastic.co/guide/...安全
input { kafka { // 须要读取的kafka队列集群配置 bootstrap_servers => "xxx.xxx.xxx.xxx:9092" // 配置的消费者的group名称,由于同一个组内的消费消息不会重复 group_id => "logstash-group" // 主题配置 topics => "kibana_log" // 从未消费过的偏移量开始 auto_offset_reset =>"earliest" } } // 重要,下面单独讲 filter { json { source => "message" } } output { // stdout能够省略,这个是为了命令行模式下方便调试 stdout{ codec => rubydebug } elasticsearch { // es 集群 hosts=>"xxx.xxx.xxx.xxx:9200" // 重要:取消logstash自定义模板功能,进而强制使用es的内置模板 manage_template=>false // 须要匹配的模板名称 index=>"logstash-dev-%{+YYYY.MM.dd}" } }
先解释下filter配置,当咱们从kafka读取消息的时候,消息体是经过message字段来进行传递的,因此message是一个字符串,可是咱们的es索引模板可能会很是复杂,因此咱们须要对其进行json解析后,再交给es。不然es收到的以后一个message字段。ruby
filter { json { source => "message" } }
再说下模板配置,首先经过kibana的devtool向es中写入了一个模板,我区分了两套环境dev、prod。服务器
这里字段都进行了strval转义,为何呢?这和下面要讲的动态模板
有关联的。往下看
$position = YnUtil::getPosition(); $urlData = parse_url(\Wii::app()->request->url); $path = $urlData['path'] ?? ''; $params = [ 'category' => strval($category), 'appType' => strval(YnUtil::getAppType()), 'appVersion' => strval(YnUtil::getAppVersion()), 'host' => strval(\Wii::app()->request->hostInfo), 'uri' => strval(\Wii::app()->request->url), 'uid' => strval(\Wii::app()->user->getUid()), 'path' => strval($path), 'server' => strval(gethostname()), 'geoip' => [ 'ip' => strval(\Yii::$app->request->userIP), 'location' => [ 'lat' => floatval($position['latitude']), 'lon' => floatval($position['longitude']), ], ], 'userAgent' => strval(\Wii::app()->request->userAgent), 'message' => is_array($message) ? Json::encode($message) : strval($message), // '@timestamp' => intval(microtime(true) * 1000), ];
下面的模板是写入到es里面的自定义模板,为了防止索引规则名称冲突,这里将order置为1。
咱们先来看下第一个模板(这个是不推荐的,由于很繁琐,可是类型很强制有效)
PUT _template/logstash-dev { "index_patterns": "logstash-dev*", "aliases": {}, "order":1, "mappings": { // 这里使用logs是由于logstash默认的type类型 "logs": { // 动态模板 "dynamic_templates": [ { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "fields": { "keyword": { "ignore_above": 256, "type": "keyword" } }, "norms": false, "type": "text" } } } ], // 这里对属性进行了类型设置 "properties": { "@timestamp": { "type": "date" }, "@version": { "type": "keyword" }, "appType": { "type": "text", "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "appVersion": { "type": "text", "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "category": { "type": "text", "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "geoip": { "dynamic": "true", "properties": { "ip": { "type": "ip" }, "latitude": { "type": "half_float" }, "location": { "type": "geo_point" }, "longitude": { "type": "half_float" } } }, "host": { "type": "text", "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "message": { "type": "text", "norms": false }, "server": { "type": "text", "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "uid": { "type": "text", "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "uri": { "type": "text", "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "path": { "type": "text", "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "userAgent": { "type": "text", "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } }, "settings": { "index": { "number_of_shards": "1" } } }
上面的模板不是最优的,可是倒是一种尝试,模板里面针对每一个属性作了设置,这样客户端只须要写入对应的属性就行了。可是如何动态配置呢?若是想加入一个字段,难道还要修改模板么? 这里引入了动态模板的概念
Only the following datatypes can be automatically detected: boolean, date, double, long, object, string. It also accepts * to match all datatypes.
动态映射
https://www.elastic.co/guide/...
动态模板(注意看match和match_mapping_type)
https://www.elastic.co/guide/...
映射属性
https://www.elastic.co/guide/...
而后咱们给出了一个全新的模板
这个模板里面只针对特殊的属性进行了设置,其余的都是经过动态模板扩展的,下面看下效果。
PUT _template/logstash-dev { "index_patterns": "logstash-dev*", "aliases": {}, "order":1, "mappings": { "logs": { "dynamic_templates": [ { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "fields": { "keyword": { "ignore_above": 256, "type": "keyword" } }, "norms": false, "type": "text" } } } ], "properties": { "@timestamp": { "type": "date" }, "@version": { "type": "keyword" }, "geoip": { "dynamic": "true", "properties": { "ip": { "type": "ip" }, "latitude": { "type": "half_float" }, "location": { "type": "geo_point" }, "longitude": { "type": "half_float" } } } } } }, "settings": { "index": { "number_of_shards": "1" } } }
咱们上面说了,全都进行了strval
转义,为何呢?由于动态模板里面,匹配的是string类型,若是咱们写入的是一个int类型,那么就不会进行自动扩展了。试验后代表,会生成一个int类型的message字段,这样是不合理的。最终生成的效果是以下图的。
分割线 =============================
新项目短期来实现日志采集。
一台8G 4核 500G硬盘 服务器
项目部署在4台服务器,每台服务器经过phpsdk直接写入一台es服务器中。
(在本次部署中,没有使用logstash的功能)
没有使用异步队列,致使直接写入es可能会影响业务逻辑,可是目前只会在开发和测试环境使用。
主要利用elasticsearch 和 kibana
要使用x-pack作安全校验,包括给kibana加入登陆受权功能
Elasticsearch
https://www.elastic.co/cn/pro...
Elasticsearch-clients
这里包含的多种语言的sdk包
https://www.elastic.co/guide/...
Kibana
https://www.elastic.co/cn/pro...
Logstash
https://www.elastic.co/cn/pro...
X-pack
安装流程说明很详细,秘钥生成后记得保存下,而且加入x-pack后,kibana和elasticsearch的通信,须要修改配置文件。另外phpsdk也须要加入秘钥,后面说明。
https://www.elastic.co/cn/pro...
es的模板超级复杂的,因此咱们要利用标准的现有的模板,须要从logstash中提取一个。
解压下载的logstash-6.1.2.tar
,执行搜索命令
$ find ./ -name 'elasticsearch-template*' ./vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.0.2-java/lib/logstash/outputs/elasticsearch/elasticsearch-template-es2x.json ./vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.0.2-java/lib/logstash/outputs/elasticsearch/elasticsearch-template-es5x.json ./vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.0.2-java/lib/logstash/outputs/elasticsearch/elasticsearch-template-es6x.json
会发现有2x 5x 6x
三个模板,这里咱们选择6x
,要根据你的es版原本选择。
而后建立索引,索引要在数据写入以前建立,由于要给每一个字段设置类型。
https://www.elastic.co/guide/...
按照文档的方式,将获取到的6x
经过curl的方式写入到es
由于默认的es配置是开启了localhost:9200
端口,用于执行RESTFUL,可是本次咱们采用php-sdk的方式,直接写入es,就要求每台业务服务器,都能访问到es。
# ---------------------------------- Network ----------------------------------- # # Set the bind address to a specific IP (IPv4 or IPv6): # 修改此处的host配置为0.0.0.0,这样全部的请求均可以接入进来 network.host: 0.0.0.0 # # Set a custom port for HTTP: # #http.port: 9200
下载地址
https://www.elastic.co/guide/...
try { $hosts = [ // 这个地方要填写x-pack分配的密码和用户名 'http://{用户名}:{密码}@192.168.1.11:9200', // HTTP Basic Authentication // 'http://user2:pass2@other-host.com:9200' // Different credentials on different host ]; $client = ClientBuilder::create()->setHosts($hosts)->build(); $position = YnUtil::getPosition(); $params = [ 'index' => 'logstash-yn-' . date('Ymd'), // elastic6版本有个bug,每一个索引只能有一个type类型 'type' => 'xxxx', 'id' => md5(rand(0, 999999999)) . (microtime(true) * 1000), 'body' => [ // 索引建立好后,写入数据必定要注意类型,否则会报错,我这里都会进行格式化一遍 // 类别做为一个主要字段用于区分日志 'category' => strval($category), 'appType' => strval(YnUtil::getAppType()), 'appVersion' => strval(YnUtil::getAppVersion()), 'host' => strval($hostInfo), 'uri' => strval($url), 'uid' => strval($user->getUid()), 'server' => strval(gethostname()), 'geoip' => [ 'ip' => strval($ip), // 这个很重要,能够实现geo可视化图形 'location' => [ 'lat' => floatval($position['latitude']), 'lon' => floatval($position['longitude']), ], ], 'userAgent' => strval($userAgent), 'message' => Json::encode($message), // 这里必定要写入毫秒时间戳 '@timestamp' => intval(microtime(true) * 1000), ], ]; $client->index($params); } catch (\Exception $e) { }
官方说明
https://www.elastic.co/guide/...
咱们这里以6x做为模板
{ // 将这个模板应用于全部以 logstash- 为起始的索引。 "template": "logstash-*", "version": 60001, "settings": { "index.refresh_interval": "5s" }, "mappings": { "_default_": { // 动态模板说明,很重要,配置了动态模板后,咱们能够添加任意字段 // https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html "dynamic_templates": [{ // 信息字段 官方说这里能够自定义 The template name can be any string value. "message_field": { "path_match": "message", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false } } }, { // 字符串字段说明 "string_fields": { // 匹配全部 "match": "*", // 而且字段类型是string的 "match_mapping_type": "string", // "mapping": { "type": "text", // 这里应该是和受欢迎程度评分相关 "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } }], // 定义好的属性说明 "properties": { // 时间字段,这个很重要,否则kibana不会出现时间相关的查询控件 "@timestamp": { "type": "date" }, "@version": { "type": "keyword" }, // 这个能够只写入properies里面的任意一个字段 "geoip": { "dynamic": true, "properties": { "ip": { "type": "ip" }, // 我只是用了这个location "location": { "type": "geo_point" }, "latitude": { "type": "half_float" }, "longitude": { "type": "half_float" } } } } } } }
使用logstash内置的模板
检查是不是毫秒时间戳
须要在连接中添加用户名和密码
写入类型,必定要和索引模板中定义的一致,否则确定报错!
elasticsearch6的 bug,官方承诺在7进行修复
安装x-pack吧
使用supervisor
个人方案是:定时脚原本清理7天以前的索引DELETE logstash-xxxx
在kibana中有一个Dev Tools
能够执行curl,而且看到结果
在kibana菜单的Management->Index Patterns中能够管理
不详细的地方,能够留言