随着业务复杂度的提高以及微服务的兴起,传统单一项目会被按照业务规则进行垂直拆分,另外为了防止单点故障咱们也会将重要的服务模块进行集群部署,经过负载均衡进行服务的调用。那么随着节点的增多,各个服务的日志也会散落在各个服务器上。这对于咱们进行日志分析带来了巨大的挑战,总不能一台一台的登陆去下载日志吧。那么咱们须要一种收集日志的工具将散落在各个服务器节点上的日志收集起来,进行统一的查询及管理统计。那么ELK就能够作到这一点。html
ELK是ElasticSearch+Logstash+Kibana的简称,在这里我分别对如上几个组件作个简单的介绍:java
Elasticsearch
是一个高度可扩展的开源全文搜索和分析引擎。它容许您快速、实时地存储、搜索和分析大量数据。它一般用做底层引擎/技术,为具备复杂搜索特性和需求的应用程序提供动力。咱们能够借助如ElasticSearch
完成诸如搜索,日志收集,反向搜索,智能分析等功能。ES设计的目标:git
Elasticsearch
是一个实时搜索平台。这意味着,从索引文档到可搜索文档,存在轻微的延迟(一般为一秒)。github
集群是一个或多个节点(服务器)的集合,这些节点(服务器)一块儿保存整个数据,并提供跨全部节点的联合索引和搜索功能。集群由一个唯一的名称来标识,默认状况下该名称为“elasticsearch”。这个名称很重要,由于节点只能是集群的一部分,若是节点被设置为经过其名称加入集群的话。确保不要在不一样的环境中重用相同的集群名称,不然可能会致使节点加入错误的集群。例如,您可使用logging-dev、logging-test和logging-prod开发、测试和生产集群。spring
节点是单个服务器,它是集群的一部分,它用来存储数据,并参与集群的索引和搜索功能。与集群同样,节点的名称默认为在启动时分配给节点的随机唯一标识符(UUID)。若是不须要默认值,能够定义任何节点名称。这个名称对于管理很是重要,由于您想要肯定网络中的哪些服务器对应于Elasticsearch
集群中的哪些节点。apache
索引是具备相似特征的文档的集合。例如,您能够有一个客户数据索引、另外一个产品目录索引和另外一个订单数据索引。索引由一个名称标识(必须是小写的),该名称用于在对其中的文档执行索引、搜索、更新和删除操做时引用索引。在单个集群中,能够定义任意数量的索引。json
文档是能够创建索引的基本信息单元。例如,能够为单个客户提供一个文档,为单个产品提供一个文档,为单个订单提供另外一个文档。这个文档用JSON (JavaScript对象符号)表示。在索引中,能够存储任意数量的文档。请注意,尽管文档在物理上驻留在索引中,但实际上文档必须被索引/分配到索引中的类型中。bootstrap
Logstash
是一个开源数据收集引擎,具备实时流水线功能。Logstash
能够动态地未来自不一样数据源的数据统一块儿来,并将数据规范化后(经过Filter过滤)传输到您选择的目标。
浏览器
在这里inputs表明数据的输入通道,你们能够简单理解为来源。常见的能够从kafka,FileBeat, DB等获取日志数据,这些数据通过fliter过滤后(好比说:日志过滤,json格式解析等)经过outputs传输到指定的位置进行存储(Elasticsearch,Mogodb,Redis等)springboot
简单的实例:
cd logstash-6.4.1 bin/logstash -e 'input { stdin { } } output { stdout {} }'
kibana是用于Elasticsearch检索数据的开源分析和可视化平台。咱们可使用Kibana搜索、查看或者与存储在Elasticsearch索引中的数据交互。同时也能够轻松地执行高级数据分析并在各类图表、表和映射中可视化数据。基于浏览器的Kibana界面使您可以快速建立和共享动态仪表板,实时显示对Elasticsearch查询的更改。
用户经过java应用程序的Slf4j写入日志,SpringBoot默认使用的是logback。咱们经过实现自定义的Appender将日志写入kafka,同时logstash经过input插件操做kafka订阅其对应的主题。当有日志输出后被kafka的客户端logstash所收集,通过相关过滤操做后将日志写入Elasticsearch,此时用户能够经过kibana获取elasticsearch中的日志信息
在SpringBoot当中,咱们能够经过logback-srping.xml来扩展logback的配置。不过咱们在此以前应当先添加logback对kafka的依赖,代码以下:
compile group: 'com.github.danielwegener', name: 'logback-kafka-appender', version: '0.2.0-RC1'
添加好依赖以后咱们须要在类路径下建立logback-spring.xml
的配置文件并作以下配置(添加kafka的Appender):
<configuration> <!-- springProfile用于指定当前激活的环境,若是spring.profile.active的值是哪一个,就会激活对应节点下的配置 --> <springProfile name="default"> <!-- configuration to be enabled when the "staging" profile is active --> <springProperty scope="context" name="module" source="spring.application.name" defaultValue="undefinded"/> <!-- 该节点会读取Environment中配置的值,在这里咱们读取application.yml中的值 --> <springProperty scope="context" name="bootstrapServers" source="spring.kafka.bootstrap-servers" defaultValue="localhost:9092"/> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> <encoder> <pattern>%boldYellow(${module}) | %d | %highlight(%-5level)| %cyan(%logger{15}) - %msg %n</pattern> </encoder> </appender> <!-- kafka的appender配置 --> <appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender"> <encoder> <pattern>${module} | %d | %-5level| %logger{15} - %msg</pattern> </encoder> <topic>logger-channel</topic> <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/> <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/> <!-- Optional parameter to use a fixed partition --> <!-- <partition>0</partition> --> <!-- Optional parameter to include log timestamps into the kafka message --> <!-- <appendTimestamp>true</appendTimestamp> --> <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) --> <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs --> <!-- bootstrap.servers is the only mandatory producerConfig --> <producerConfig>bootstrap.servers=${bootstrapServers}</producerConfig> <!-- 若是kafka不可用则输出到控制台 --> <appender-ref ref="STDOUT"/> </appender> <!-- 指定项目中的logger --> <logger name="org.springframework.test" level="INFO" > <appender-ref ref="kafka" /> </logger> <root level="info"> <appender-ref ref="STDOUT" /> </root> </springProfile> </configuration>
在这里面咱们主要注意如下几点:
ElasticSearch
须要jdk8,官方建议咱们使用JDK的版本为1.8.0_131,原文以下:
Elasticsearch requires at least Java 8. Specifically as of this writing, it is recommended that you use the Oracle JDK version 1.8.0_131
检查完毕后,咱们能够分别在官网下载对应的组件
首先进入启动zookeeper的根目录下,将conf目录下的zoo_sample.cfg文件拷贝一份从新命名为zoo.cfg
mv zoo_sample.cfg zoo.cfg
配置文件以下:
# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=../zookeeper-data # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1
紧接着咱们进入bin目录启动zookeeper:
./zkServer.sh start
在kafka根目录下运行以下命令启动kafka:
./bin/kafka-server-start.sh config/server.properties
启动完毕后咱们须要建立一个logger-channel主题:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logger-channel
进入logstash跟目录下的config目录,咱们将logstash-sample.conf
的配置文件拷贝到根目录下从新命名为core.conf
,而后咱们打开配置文件进行编辑:
```ruby
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input { kafka { id => "my_plugin_id" bootstrap_servers => "localhost:9092" topics => ["logger-channel"] auto_offset_reset => "latest" } } filter { grok { patterns_dir => ["./patterns"] match => { "message" => "%{WORD:module} \| %{LOGBACKTIME:timestamp} \| %{LOGLEVEL:level} \| %{JAVACLASS:class} - %{JAVALOGMESSAGE:logmessage}" } } } output { stdout { codec => rubydebug } elasticsearch { hosts =>["localhost:9200"] } } ```
咱们分别配置logstash的input,filter和output(懂ruby的童鞋们确定对语法结构不陌生吧):
grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } }
解析事后会变成:
client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043
这些属性都会在elasticsearch
中存为对应的属性字段。更详细的介绍请参考官网:grok ,固然该插件已经帮咱们定义好了好多种核心规则,咱们能够在这里查看全部的规则。
另外咱们在patterns文件夹中建立好自定义的规则文件logback,内容以下:
# yyyy-MM-dd HH:mm:ss,SSS ZZZ eg: 2014-01-09 17:32:25,527 LOGBACKTIME 20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND})
编辑好配置后咱们运行以下命令启动logstash:
bin/logstash -f first-pipeline.conf --config.reload.automatic
该命令会实时更新配置文件而不需启动
启动ElasticSearch很简单,咱们能够运行以下命令:
./bin/elasticsearch
咱们能够发送get请求来判断启动成功:
GET http://localhost:9200
咱们能够获得相似于以下的结果:
{ "name" : "Cp8oag6", "cluster_name" : "elasticsearch", "cluster_uuid" : "AT69_T_DTp-1qgIJlatQqA", "version" : { "number" : "6.4.0", "build_flavor" : "default", "build_type" : "zip", "build_hash" : "f27399d", "build_date" : "2016-03-30T09:51:41.449Z", "build_snapshot" : false, "lucene_version" : "7.4.0", "minimum_wire_compatibility_version" : "1.2.3", "minimum_index_compatibility_version" : "1.2.3" }, "tagline" : "You Know, for Search" }
咱们能够在github上下载elasticsearch的IK分词器,地址以下:ik分词器,而后把它解压至your-es-root/plugins/ik的目录下,咱们能够在{conf}/analysis-ik/config/IKAnalyzer.cfg.xml
or {plugins}/elasticsearch-analysis-ik-*/config/IKAnalyzer.cfg.xml
里配置自定义分词器:
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 扩展配置</comment> <!--用户能够在这里配置本身的扩展字典 --> <entry key="ext_dict">custom/mydict.dic;custom/single_word_low_freq.dic</entry> <!--用户能够在这里配置本身的扩展中止词字典--> <entry key="ext_stopwords">custom/ext_stopword.dic</entry> <!--用户能够在这里配置远程扩展字典 --> <entry key="remote_ext_dict">location</entry> <!--用户能够在这里配置远程扩展中止词字典--> <entry key="remote_ext_stopwords">http://xxx.com/xxx.dic</entry> </properties>
首先咱们添加索引:
curl -XPUT http://localhost:9200/my_index
咱们能够把经过put请求来添加索引映射:
PUT my_index { "mappings": { "doc": { "properties": { "title": { "type": "text" }, "name": { "type": "text" }, "age": { "type": "integer" }, "created": { "type": "date", "format": "strict_date_optional_time||epoch_millis" } "content": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" } } } } }
其中doc是映射名 my_index是索引名称
logstash
默认状况下会在ES中创建logstash-*
的索引,*表明了yyyy-MM-dd
的时间格式,根据上述logstash
配置filter的示例,其会在ES中创建module ,logmessage,class,level等索引。(具体咱们能够根据grok插件进行配置)
在kibana的bin目录下运行./kibana便可启动。启动以后咱们能够经过浏览器访问http://localhost:5601 来访问kibanaUI。咱们能够看到以下界面: