是常见的ELK组合的成员之一.
E:elasticsearch
L:logstash
K:kibana(比head强大在数据分析)
logstash是负责收集数据在给elasticsearch做索引使用,logstash可以收集 数据库、文件等多种数据源的数据。
#logstash -e ‘input{stdin{}}output{stdout{codec=>rubydebug}}’
这行命令的意思就是收集控制台打印内容,并且转化成固定格式的字符串在控制台输出
最终我们得到一个这样的结果,说明logstash运行成功
连接mysql数据库。在其中创建一个product表,随便录入点数据,方便一会测试使用
product表格的数据导入es集群中 index02 product
安装jdbc input插件logstash-input-jdbc的包
logstash连接数据库需要插件的支持。此处使用logstash-input-jdbc-4.2.4.tar.gz
上传插件,到logstash根目录下的plugins文件夹(如果没有手动创建)
上传包解压到当前目录
linux服务器上具备这个jar包,就可以使用logstash从mysql导入数据到es;
mysql-connector-java-5.1.38-bin.jar
jar位置无要求,一会做配置文件的时候会指定jar的路径,
需要创建3个文件,来配置logstash,3个文件名可以自定义
1、jdbc.sql 查询出来需要保存到索引的数据
select * from product
2、template.json
修改"analyzer": “ik_max_word”
这里指定使用的分词器为我们安装在Elasticsearch的ik分词器,这个在我的其他文章中已经安装好了
{ "template": "*", "version": 50001, "settings": { "index.refresh_interval": "5s" }, "mappings": { "_default_": { "_all": { "enabled": true, "norms": false }, "dynamic_templates": [ { "message_field": { "path_match": "message", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false } } }, { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false, "analyzer": "ik_max_word", "fields": { "keyword": { "type": "keyword" } } } } } ], "properties": { "@timestamp": { "type": "date", "include_in_all": false }, "@version": { "type": "keyword", "include_in_all": false } } } } }
3、jdbc.conf
导入的数据源指定,导入的属性指定(jar包,sql语句执行等) input标签
需要修改的位置有:
连接mysql地址和库,配置为刚刚创建的product表所在的库
jdbc_connection_string => "jdbc:mysql:///easydb"
登录名密码
jdbc_user => "root" jdbc_password => "root"
刚刚上传的mysql-connector的jar包,确定路径
jdbc_driver_library => "/home/resources/mysql-connector-java-5.1.38-bin.jar"
连接数据库执行的sql语句,可以在文件中设定搜索,更新等
statement_filepath => "/home/resources/jdbc.sql"
设定监听事件间隔schedule的值为cron表达式,可自行百度一下
type为自定义的类型,此处直接使用表名
schedule => "* * * * *" type => "product"
es的连接地址
hosts => "192.168.1.55:9200"
我之前创建好的索引
index => "index02"
使用搜出出来的那个字段设定docid,我们使用product_id,这个为数据库中表里面的ID字段
document_id => "%{product_id}"
刚刚创建的第2个配置文件
template => "/home/resources/template.json"
input { stdin { } jdbc { # 连接mysql地址和库 jdbc_connection_string => "jdbc:mysql:///easydb" # 登录名密码 jdbc_user => "root" jdbc_password => "root" # 上传的mysql-connector的jar包,确定路径 jdbc_driver_library => "/home/resources/mysql-connector-java-5.1.38-bin.jar" # 驱动类全路径名称 jdbc_driver_class => "com.mysql.jdbc.Driver" # 开启分页,和分页最大值,自定义设置 jdbc_paging_enabled => "true" jdbc_page_size => "50000" #连接数据库执行的sql语句,可以在文件中设定搜索,更新等 statement_filepath => "/home/resources/jdbc.sql" #设定监听事件间隔 schedule => "* * * * *" type => "product" } } filter { json { source => "message" remove_field => ["message"] } } output { #设定收集数据的输出位置 elasticsearch { #es的连接地址 hosts => "192.168.1.55:9200" #新建的索引 index => "index02" #使用搜出出来的那个字段设定docid,我们使用product_id document_id => "%{product_id}" template_overwrite => true template => "/home/resources/template.json" } stdout { codec => json_lines } }
创建好三个文件后。启动logstash
#logstash -f jdbc.conf
我的jdbc.conf 在logstash的bin目录下,如果在其他目录此处需要带上路径。例如/home/jdbc.conf
已经启动成功。稍等一会。会根据cron表达式进行执行。我配置的为 schedule => “* * * * *”
所以稍等一会就会打印执行日志
这个工具是可以给es实时更新数据的,。我们可以在数据库测试添加和修改数据测试