在咱们使用mysql和elasticsearch结合使用的时候,可能会有一些同步的需求,想要数据库和elasticsearch同步的方式其实有不少。java
可使用canal,它主要是监听mysql的binlog 日志,能够监听数据的一些变化,若是数据发生了变化咱们须要作什么逻辑,这些都是能够人为实现的,它是将本身模拟成一个slave节点,当master节点的数据发生变化是,它可以看到数据的变化。可是缺点也很明显,因为是java实现的,因此比较重,还须要使用zookeeper等集群管理工具来管理canal节点,因此本文暂时不介绍这种方式。mysql
本文主要介绍使用Logstash JDBC的方式来实现同步,这个方式同步比较简单。固然它有一些缺点,就是有点耗内存(内存大就当我没说😂)。git
GET /myapp/_search
{
"_source": "id"
}
复制代码
响应结果github
{
"took": 5,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 1,
"hits": [
{
"_index": "myapp",
"_type": "doc",
"_id": "2",
"_score": 1,
"_source": {
"id": 2
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "1",
"_score": 1,
"_source": {
"id": 1
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "3",
"_score": 1,
"_source": {
"id": 3
}
}
]
}
}
复制代码
mysql> select * from user;
+------+----------+-------------+------------+-------+------------+---------+
| id | name | phone | password | title | content | article |
+------+----------+-------------+------------+-------+------------+---------+
| 1 | zhnagsan | 181222 | 123123 | ??? | ?????? | ???IE |
| 2 | lishi | 181222113 | 232123123 | 23??? | 234?????? | 4???IE |
| 3 | wangwu | 18111214204 | 1337547531 | ????? | lc content | Java |
+------+----------+-------------+------------+-------+------------+---------+
3 rows in set (0.00 sec)
mysql>
复制代码
如今咱们执行一个sql向里面添加一条数据sql
mysql> insert into user (id, name, phone, password, title, content, article) values (4, "lc", "123456789", "123456789", "测试", "测试内容", "Java")
Query OK, 1 row affected (0.00 sec)
mysql>
复制代码
GET /myapp/_search
{
"_source": "id"
}
复制代码
响应结果数据库
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 4,
"max_score": 1,
"hits": [
{
"_index": "myapp",
"_type": "doc",
"_id": "2",
"_score": 1,
"_source": {
"id": 2
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "4",
"_score": 1,
"_source": {
"id": 4
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "1",
"_score": 1,
"_source": {
"id": 1
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "3",
"_score": 1,
"_source": {
"id": 3
}
}
]
}
}
复制代码
Virtual machine:VMware 11.0.2 Operator System:CentOS release 6.9 (Final) ElasticSearch:6.4.0 Kibana版本:6.4.0 LogStash版本:6.6.1 JDK版本:1.8.0_181 MySQL版本: 5.1.73(这个版本是用yum直接安装的,其实这个教程和mysql版本没有多大关系,由于到时候是使用jdbc的驱动包来链接数据库的) logstash jdbc驱动包版本 5.1.46vim
本文暂时只提供下载地址(暂时偷个大懒 😄,安装顺序是按照连接排列顺序),logstash会给予详细使用说明,它是不须要安装的是须要解压就好了。bash
Virtual machine:VMware 11.0.2服务器
Operator System:CentOS release 6.9 (Final)微信
若是环境都安装好了能够看下面的了,没安装好也能够看😁
Logstash是一个开源数据收集引擎,具备实时管道功能。Logstash能够动态地未来自不一样数据源的数据统一块儿来,并将数据标准化到你所选择的目的地。
Logstash是一个开源的服务器端数据处理管道,能够同时从多个数据源获取数据,并对其进行转换,而后将其发送到你最喜欢的“存储”。(固然,咱们最喜欢的是Elasticsearch)
数据每每以各类各样的形式,或分散或集中地存在于不少系统中。Logstash 支持各类输入选择 ,能够在同一时间从众多经常使用来源捕捉事件。可以以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各类 AWS 服务采集数据。
数据从源传输到存储库的过程当中,Logstash 过滤器可以解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。 Logstash 可以动态地转换和解析数据,不受格式或复杂度的影响:
尽管 Elasticsearch 是咱们的首选输出方向,可以为咱们的搜索和分析带来无限可能,但它并不是惟一选择。Logstash 提供众多输出选择,您能够将数据发送到您要指定的地方,而且可以灵活地解锁众多下游用例。
接下来,从命令行输入以下命令
bin/logstash -e 'input { stdin {} } output { stdout {} }'
选项 -e 的意思是容许你从命令行指定配置
复制代码
当启动完成时,会等待你的输入,你能够输入hello world
试试,它会给你一下信息的回馈。
首先,将咱们刚才给予的下载连接里面的jdbc驱动包放到logstash目录里面来
解压这个文件
[root@localhost logstash-6.6.1]# unzip mysql-connector-java-5.1.46.zip
复制代码
input {
jdbc {
# jdbc驱动包位置
jdbc_driver_library => "/mnt/logstash-6.6.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
# 要使用的驱动包类,有过java开发经验的应该很熟悉这个了,不一样的数据库调用的类不同。
jdbc_driver_class => "com.mysql.jdbc.Driver"
# myqsl数据库的链接信息
jdbc_connection_string => "jdbc:mysql://0.0.0.0:3306/myapp"
# mysql用户
jdbc_user => "root"
# mysql密码
jdbc_password => "root"
# 定时任务, 多久执行一次查询, 默认一分钟,若是想要没有延迟,可使用 schedule => "* * * * * *"
schedule => "* * * * *"
# 你要执行的语句
statement => "select * from user"
}
}
output {
# 将数据输出到ElasticSearch中
elasticsearch {
# es ip加端口
hosts => ["0.0.0.0:9200"]
# es文档索引
index => "myusreinfo"
# es文档数据的id,%{id}表明的是用数据库里面记录的id做为文档的id
document_id => "%{id}"
}
}
复制代码
上面咱们已经生成了这个mysqlsyn.conf这个文件,接下来咱们就使用logstash来进行数据同步吧,同步以前先看下咱们的数据库的user表的数据。
查看mysql数据, 可以看到咱们仍是只有刚开始的4条数据
检查ElasticSearch是否有myusreinfo
这个索引,能够从图中看到咱们只有myapp
这个索引。
带上配置文件启动logstash
[root@localhost logstash-6.6.1]# ./bin/logstash -f config/mysqlsyn.conf
-f 指定配置文件启动
复制代码
启动成功,而且已经在同步数据了,这个同步默认是每分钟执行一次,能够看到5分钟执行了5次
上面已经启动了同步,如今咱们去看看ElasticSearch里面的是否有数据,从图中能够看到myusrinfo已经同步到es里面了,而且能够看到docs.count
的数量就是咱们刚才数据库里面数据的数量。
咱们像数据库里面增长一条数据,而后看下ElasticSearch的数据是否会改变
查看ElasticSearch里面是否有刚才添加的数据, 从图中能够看到已经有5条数据了
先看ElasticSearch里面id为5的数据,能够看到name为yinya
数据库修改一条id为5的数据,看看ElasticSearch的数据变化
查看ElasticSearch里面数据是否已经更改,能够看到数据已经更改了
删除两条数据看看ElasticSearch数据的变化, 删除了id为1和2的两条数据
查看ElasticSearch里面数据是否已经更改, 然而数据并无变
到目前为止,全部google,stackoverflow,elastic.co,github上面搜索的插件和实时同步的信息,告诉咱们:目前同步delete尚未好的解决方案。 折中的解决方案以下: 方案探讨:discuss.elastic.co/t/delete-el…
stackoverflow.com/questions/3…
方案一 在原有的mysql数据库表中,新增一个字段status, 默认值为ok,若是要删除数据,实则用update操做,status改成deleted, 这样,就能同步到es中。es中以status状态值区分该行数据是否存在。deleted表明已删除,ok表明正常。
方案二 使用go elasticsearch 插件实现同步。
欢迎你们加我微信coding4life,或扫描二维码,一块儿讨论技术~