mysql准实时同步数据到Elasticsearch

4. 安装JDK八、MySQL5.6驱动以及Logstash -6.0.0

ECS中分别安装JDK八、MySQL5.6驱动以及Logstash -6.0.0。以下图:html

_

安装Logstash input、output插件,此案例数据输入是MySQL,输出是ES,so相应的插件应该是logstash-input-jdbc和logstash-output-elasticsearch。java

安装插件的命令分别是(在Logstash主目录下运行):
./bin/logstash-plugin install logstash-input-jdbc
./bin/logstash-plugin install logstash-output-elasticsearch
_mysql

5. MySQL中建立数据库、测试的数据表

以下图所示sql

_
建表语句(其中updatetime用于记录数据更新时间戳):数据库

create table jm_es_employee (         
id varchar(10),     
first_name varchar(20),     
last_name varchar(20),     
age int(10),     
about varchar(100),     
interests varchar(100),     
updatetime timestamp null default current_timestamp on update current_timestamp );
6. 配置Logstash做业文件

ECS中建立Logstash做业配置文件,文件名为logstash-mysql-es.conf。json

配置文件内容:curl

input{
     jdbc {
         jdbc_driver_library => "mysql-connector-java-5.1.44-bin.jar"
         jdbc_driver_class => "com.mysql.jdbc.Driver"
         jdbc_connection_string => "jdbc:mysql://rm-***.mysql.rds.aliyuncs.com:3306/db_name"
         jdbc_user => "db_user"
         jdbc_password => "db_password"
         jdbc_paging_enabled => "true"
         jdbc_page_size => "1000"
         jdbc_default_timezone =>"Asia/Shanghai"
         schedule => "* * * * *"
         statement => "select * from jm_es_employee where updatetime > :sql_last_value"
         use_column_value => true
         tracking_column => "updatetime"
         last_run_metadata_path => "./logstash_jdbc_last_run"
       } 
} 
output{
      elasticsearch {
         hosts => "es-cn-***.elasticsearch.aliyuncs.com:9200"
         user => "elastic"
         password => "es_password"
         index => "employee"
         document_id => "%{id}"
      }
      stdout {
         codec => json_lines
     }
 }

其中红色字体部分要作相应的替换,input中的 schedule参数用于配置数据刷新频率,schedule => " *"表示每分钟刷新一次,这也是MySQL数据同步的最小频率。Logstash支持丰富的参数配置,详情请参考Elasitc官网文档elasticsearch

7. 同步数据

ECS中指定参数启动Logstash服务,执行命令:ide

logstash -f logstash-mysql-es.conf

_

以后每分钟会去MySQL中刷新数据测试

_

RDS中写入几条测试数据,脚本以下:

INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('001','John','Smith', 25, 'I love to go rock climbing','[ "sports", "music" ]'); 
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('002','Jane','Smith', 32, 'I like to collect rock albums','[ "music" ]'); 
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('003','Douglas','Fir', 35, 'I like to build cabinets','[ "forestry" ]');

因为以前在Logstash配置文件中,output部分既配置了输出到ES,同时也输出到控制台。因此当检测到MySQL中有更新时,数据会输出到控制台中,以下图:

_

此时说明MySQL中的数据更新已经被Logstash推送到ES服务。经过在ECS执行命令检查ES服务中的索引是否被建立。执行命令:

curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/_cat/indices?v'

_

红框内的employee即咱们在配置文件中指定的索引名,说明ES中的索引已经被成功建立。

8. 结果验证

经过关键字检索ES服务,验证写入Mysql的数据是否被成功索引到ES并被检索到,执行命令经过关键字“Smith “来检索数据:

curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/employee/_search?q=last_name:Smith&pretty'

_

至此,MySQL中的数据已经被成功索引到Elasticsearch,并也能够被准实时的检索到。

相关文章
相关标签/搜索