Logstash的安装和Elasticsearch的整合

logstash

是常见的ELK组合的成员之一.
E:elasticsearch
L:logstash
K:kibana(比head强大在数据分析)

在这里插入图片描述
logstash是负责收集数据在给elasticsearch做索引使用,logstash可以收集 数据库、文件等多种数据源的数据。

1 上传解压(jdk运行环境)

在这里插入图片描述

2 运行测试(bin目录下)

#logstash -e ‘input{stdin{}}output{stdout{codec=>rubydebug}}’
这行命令的意思就是收集控制台打印内容,并且转化成固定格式的字符串在控制台输出
在这里插入图片描述
最终我们得到一个这样的结果,说明logstash运行成功

3 从MySql数据库抽取数据内容

连接mysql数据库。在其中创建一个product表,随便录入点数据,方便一会测试使用

4 实现数据库数据导入索引(ik分词器使用)

product表格的数据导入es集群中 index02 product

安装jdbc input插件logstash-input-jdbc的包
logstash连接数据库需要插件的支持。此处使用logstash-input-jdbc-4.2.4.tar.gz
上传插件,到logstash根目录下的plugins文件夹(如果没有手动创建)
在这里插入图片描述
上传包解压到当前目录
在这里插入图片描述

5 准备mysql连接jar包支持

linux服务器上具备这个jar包,就可以使用logstash从mysql导入数据到es;
mysql-connector-java-5.1.38-bin.jar
jar位置无要求,一会做配置文件的时候会指定jar的路径,

5.1 jdbc.conf :logstash启动时加载的文件

需要创建3个文件,来配置logstash,3个文件名可以自定义
1、jdbc.sql 查询出来需要保存到索引的数据

select * from product

2、template.json
修改"analyzer": “ik_max_word”
这里指定使用的分词器为我们安装在Elasticsearch的ik分词器,这个在我的其他文章中已经安装好了

{
    "template": "*",
    "version": 50001,
    "settings": {
        "index.refresh_interval": "5s"
    },
    "mappings": {
        "_default_": {
            "_all": {
                "enabled": true,
                "norms": false
            },
            "dynamic_templates": [
                {
                    "message_field": {
                        "path_match": "message",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "text",
                            "norms": false
                        }
                    }
                },
                {
                    "string_fields": {
                        "match": "*",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "text",
                            "norms": false,
                            "analyzer": "ik_max_word",
                            "fields": {
                                "keyword": {
                                    "type": "keyword"
                                }
                            }
                        }
                    }
                }
            ],
            "properties": {
                "@timestamp": {
                    "type": "date",
                    "include_in_all": false
                },
                "@version": {
                    "type": "keyword",
                    "include_in_all": false
                }
            }
        }
    }
}

3、jdbc.conf
导入的数据源指定,导入的属性指定(jar包,sql语句执行等) input标签
需要修改的位置有:
连接mysql地址和库,配置为刚刚创建的product表所在的库

jdbc_connection_string => "jdbc:mysql:///easydb"

登录名密码

jdbc_user => "root"
  jdbc_password => "root"

刚刚上传的mysql-connector的jar包,确定路径

jdbc_driver_library => "/home/resources/mysql-connector-java-5.1.38-bin.jar"

连接数据库执行的sql语句,可以在文件中设定搜索,更新等

statement_filepath => "/home/resources/jdbc.sql"

设定监听事件间隔schedule的值为cron表达式,可自行百度一下
type为自定义的类型,此处直接使用表名

schedule => "* * * * *"
  type => "product"

es的连接地址

hosts => "192.168.1.55:9200"

我之前创建好的索引

index => "index02"

使用搜出出来的那个字段设定docid,我们使用product_id,这个为数据库中表里面的ID字段

document_id => "%{product_id}"

刚刚创建的第2个配置文件

template => "/home/resources/template.json"
input {
    stdin {
    }
    jdbc {
      # 连接mysql地址和库
      jdbc_connection_string => "jdbc:mysql:///easydb"
      # 登录名密码
      jdbc_user => "root"
      jdbc_password => "root"
      # 上传的mysql-connector的jar包,确定路径
      jdbc_driver_library => "/home/resources/mysql-connector-java-5.1.38-bin.jar"
      # 驱动类全路径名称
      jdbc_driver_class => "com.mysql.jdbc.Driver"
        # 开启分页,和分页最大值,自定义设置
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
        #连接数据库执行的sql语句,可以在文件中设定搜索,更新等
      statement_filepath => "/home/resources/jdbc.sql"
        #设定监听事件间隔
      schedule => "* * * * *"
      type => "product"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
        #设定收集数据的输出位置
    elasticsearch {
        #es的连接地址
        hosts => "192.168.1.55:9200"
        #新建的索引
        index => "index02"
        #使用搜出出来的那个字段设定docid,我们使用product_id
        document_id => "%{product_id}"
		template_overwrite => true
        template => "/home/resources/template.json"
    }
    stdout {
        codec => json_lines
    }
}

创建好三个文件后。启动logstash

6 启动logstash 指定配置文件jdbc.conf

#logstash -f jdbc.conf
我的jdbc.conf 在logstash的bin目录下,如果在其他目录此处需要带上路径。例如/home/jdbc.conf
在这里插入图片描述
已经启动成功。稍等一会。会根据cron表达式进行执行。我配置的为 schedule => “* * * * *”
所以稍等一会就会打印执行日志

这个工具是可以给es实时更新数据的,。我们可以在数据库测试添加和修改数据测试