做者: Alex Marquardt | 地址 how-to-keep-elastichsearch-synchronzied-with-a-relational-database-using-logstashjava
近期的主要工做是在为公司的 APP 增长搜索功能。由于也遇到了须要把关系型数据库中的数据同步 ElasticSearch 中的问题,故抽了点时间翻译了这篇官方的博文。最近,在数据同步方面也有些思考。mysql
本篇文章的重点不在 Logstash 的 JDBC 插件的使用方法,而是数据同步会遇到的一些细节问题如何处理。我以为,这些设计思想是通用的,不管你使用的何种方式进行数据同步。sql
翻译正文数据库
为了利用 ElasticSearch 强大的搜索能力,大部分的业务都会在关系型数据库的基础上部署 ElasticSearch。这类场景下,保持 ElasticSearch 和关系型数据库之间的数据同步是很是必要的。json
本篇博文将会介绍如何经过 Logstash 实如今 MySQL 和 ElasticSearch 之间数据的高效复制与同步。ruby
注:文中演示的代码和方法都通过在 MySQL 中的测试,理论上适应于全部的关系型数据库。bash
本文中,组件的相关信息以下:elasticsearch
本文将会经过 Logstash 的 JDBC input 插件进行 ElasticSearch 和 MySQL 之间的数据同步。从概念上讲,JDBC 插件将经过周期性的轮询以发现上次迭代后的新增和更新的数据。为了正常工做,几个条件须要知足:测试
ElasticSearch 中 _id 设置必须来自 MySQL 中 id 字段。它提供了 MySQL 和 ElasticSearch 之间文档数据的映射关系。若是一条记录在 MySQL 更新,那么,ElasticSearch 全部关联文档都应该被重写。要说明的是,重写 ElasticSearch 中的文档和更新操做的效率相同。在内部实现上,一个更新操做由删除一个旧文档和建立一个新文档两部分组成。ui
当 MySQL 中插入或更新一条记录时,必须包含一个字段用于保存字段的插入或更新时间。如此一来, Logstash 就能够实现每次请求只获取上次轮询后更新或插入的记录。Logstash 每次轮询都会保存从 MySQL 中读取到的最新的插入或更新时间,该时间大于上次轮询最新时间。
若是知足了上述条件,咱们就能够配置 Logstash 周期性的从 MySQL 中读取全部最新更新或插入的记录,而后写入到 Elasticsearch 中。
关于 Logstash 的配置代码,本文稍后会给出。
MySQL 库和表的配置以下:
CREATE DATABASE es_db
USE es_db
DROP TABLE IF EXISTS es_table
CREATE TABLE es_table (
id BIGINT(20) UNSIGNED NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY unique_id (id),
client_name VARCHAR(32) NOT NULL,
modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
复制代码
配置中有几点须要说明,以下:
前面设置完成,咱们能够经过以下命令插入记录:
INSERT INTO es_table (id, client_name) VALUES (<id>, <client name>);
复制代码
使用以下命令更新记录:
UPDATE es_table SET client_name = <new client name> WHERE id=<id>;
复制代码
使用以下命令更新插入记录:
INSERT INTO es_table (id, client_name) VALUES (<id>, <client_name when created>) ON DUPLICATE KEY UPDATE client_name=<client name when updated>
复制代码
Logstash 的 pipeline 配置代码以下,它实现了前面描述的功能,从 MySQL 到 ElasticSearch 的数据同步。
input {
jdbc {
jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
jdbc_user => "<my username>"
jdbc_password => "<my password>"
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *",
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time)) > :sql_last_value AND modification_time < NOW() ORDER BY modification_time desc"
}
}
filter {
mutate {
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "unix_ts_in_secs"]
}
}
output {
# stdout { codec => "rubydebug" }
elasticsearch {
index => "rdbms_sync_idx"
document_id => "%{[%metedata][_id]}"
}
}
复制代码
关于 Pipeline 配置的几点说明,以下:
此处配置为 "unix_ts_in_secs"。它被用于追踪最新的记录,并被保存在 .logstash_jdbc_last_run 文件中,下一次轮询将以这个边界位置为准进行记录获取。SELECT 语句中,可经过 :sql_last_value 访问该配置字段的值。
由 SELECT 语句生成,是 "modification_time" 的 UNIX TIMESTAMP。它被前面讨论的 "track_column" 引用。使用 UNIX TIMESTAMP,而非其余时间形式,能够减小复杂性,防止时区致使的时间不一致问题。
内建的配置参数,指定每次轮询的开始位置。在 input 配置中,可被 SELECT 语句引用。在每次轮询开始前,从 .logstash_jdbc_last_run 中读取,此案例中,即为 "unix_ts_in_secs" 的最近值。如此即可保证每次轮询只获取最新插入和更新的记录。
经过 cron 语法指定轮询的执行周期,例子中,"*/5 * * * * *" 表示每 5 秒轮询一次。
SELECT 语句查询条件的一部分,当前解释不清,具体状况待下面的章节再做介绍。
该配置指定将 MySQL 中的 id 复制到 metadata 字段 _id 中,用以确保 ElasticSearch 中的文档写入正确的 _id。而之因此使用 metadata,由于它是临时的,不会使文档中产生新的字段。同时,咱们也会把不但愿写入 Elasticsearch 的字段 id 和 @version 移除。
在 output 输出段的配置,咱们指定了文档应该被输出到 ElasticSearch,而且设置输出文档 _id 为 filter 段建立的 metadata 的 _id。若是须要调试,注释部分的 rubydebug 能够实现。
接下来,咱们将开始解释为何 SELECT 语句中包含 modification_time < NOW() 是很是重要的。为了解释这个问题,咱们将引入两个反例演示说明,为何下面介绍的两种最直观的方法是错误的。还有,为何 modification_time < Now() 能够克服这些问题。
当 where 子句中仅仅包含 UNIX_TIMESTAMP(modification_time) > :sql_last_value,而没有 modification < Now() 的状况下,工做是否正常。这个场景下,SELECT 语句是以下形式:
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value) ORDER BY modification_time ASC"
复制代码
粗略一看,彷佛没发现什么问题,应该能够正常工做。但其实,这里有一些边界状况,可能致使一些文档的丢失。举个例子,假设 MySQL 每秒插入两个文档,Logstash 每 5 秒执行一次。以下图所示,时间范围 T0 至 T10,数据记录 R1 至 R22。
Logstash 的第一次轮询发生在 T5 时刻,读取记录 R1 至 R11,即图中青色区域。此时,sql_last_value 即为 T5,这个时间是从 R11 中获取到的。
若是,当 Logstash 完成从 MySQL 读取数据后,一样在 T5 时刻,又有一条记录插入到 MySQL 中。 而下一次的轮询只会拉取到大于 T5 的记录,这意味着 R12 将会丢失。如图所示,青色和灰色区域分别表示当次和上次轮询获取到的记录。
注意,这类场景下的 R12 将永远不会再被写入到 ElasticSearch。
为了解决这个问题,或许有人会想,若是把 where 子句中的大于(>)改成大于等于(>=)是否可行。SELECT 语句以下:
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) >= :sql_last_value) ORDER BY modification_time ASC"
复制代码
这种方式其实也不理想。这种状况下,某些文档可能会被两次读取,重复写入到 ElasticSearch 中。虽然这不影响结果的正确性,但却作了多余的工做。以下图所示,Logstash 的首次轮询和场景一相同,青色区域表示已经读取的记录。
Logstash 的第二次轮询将会读取全部大于等于 T5 的记录。以下图所示,注意 R11,即紫色区域,将会被再次发送到 ElasticSearch 中。
这两种场景的实现效果都不理想。场景一会致使数据丢失,这是没法容忍的。场景二,存在重复读取写入的问题,虽然对数据正确性没有影响,但执行了多余的 IO。
前面的两场方案都不可行,咱们须要继续寻找其余解决方案。其实也很简单,经过指定 (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()),咱们就能够保证每条记录有且只发送一次。
以下图所示,Logstash 轮询发生在 T5 时刻。由于指定了 modification_time < NOW(),文档只会读取到 T4 时刻,而且 sql_last_value 的值也将会被设置为 T4。
开始下一次的轮询,当前时间 T10。
因为设置了 UNIX_TIMESTAMP(modification_time) > :sql_last_value,而且当前 sql_last_value 为 T4,所以,本次的轮询将从 T5 开始。而 modification_time < NOW() 决定了只有时间小于等于 T9 的记录才会被读取。最后,sql_last_value 也将被设置为 T9。
如此,MySQL 中的每一个记录就能够作到都能被精确读取了一次,如此就能够避免每次轮询可能致使的当前时间间隔内数据丢失或重复读取的问题。
简单的测试能够帮助咱们验证配置是否如咱们所愿。咱们能够写入一些数据至数据库,以下:
INSERT INTO es_table (id, client_name) VALUES (1, 'Jim Carrey');
INSERT INTO es_table (id, client_name) VALUES (2, 'Mike Myers');
INSERT INTO es_table (id, client_name) VALUES (3, 'Bryan Adams');
复制代码
一旦 JDBC 输入插件触发执行,将会从 MySQL 中读取全部记录,并写入到 ElasticSearch 中。咱们能够经过查询语句查看 ElasticSearch 中的文档。
GET rdbms_sync_idx/_search
复制代码
执行结果以下:
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "rdbms_sync_idx",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"insertion_time" : "2019-06-18T12:58:56.000Z",
"@timestamp" : "2019-06-18T13:04:27.436Z",
"modification_time" : "2019-06-18T12:58:56.000Z",
"client_name" : "Jim Carrey"
}
},
Etc …
复制代码
更新 id=1 的文档,以下:
UPDATE es_table SET client_name = 'Jimbo Kerry' WHERE id=1;
复制代码
经过 _id = 1,能够实现文档的正确更新。经过执行以下命令查看文档:
GET rdbms_sync_idx/_doc/1
复制代码
响应结果以下:
{
"_index" : "rdbms_sync_idx",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"_seq_no" : 3,
"_primary_term" : 1,
"found" : true,
"_source" : {
"insertion_time" : "2019-06-18T12:58:56.000Z",
"@timestamp" : "2019-06-18T13:09:30.300Z",
"modification_time" : "2019-06-18T13:09:28.000Z",
"client_name" : "Jimbo Kerry"
}
}
复制代码
文档 _version 被设置为 2,而且 modification_time 和 insertion_time 已经不同了,client_name 已经正确更新。而 @timestamp,不是咱们须要关注的,它是 Logstash 默认添加的。
更新添加 upsert 执行语句以下:
INSERT INTO es_table (id, client_name) VALUES (4, 'Bob is new') ON DUPLICATE KEY UPDATE client_name='Bob exists already';
复制代码
和以前同样,咱们能够经过查看 ElasticSearch 中相应文档,即可验证同步的正确性。
不知道你是否已经发现,若是一个文档从 MySQL 中删除,并不会同步到 ElasticSearch 。关于这个问题,列举一些可供咱们考虑的方案,以下:
MySQL 中的记录可经过包含 is_deleted 字段用以代表该条记录是否有效。一旦发生更新,is_deleted 也会同步更新到 ElasticSearch 中。若是经过这种方式,在执行 MySQL 或 ElasticSearch 查询时,咱们须要重写查询语句来过滤掉 is_deleted 为 true 的记录。同时,须要一些后台进程将 MySQL 和 ElasticSearch 中的这些文档删除。
另外一个可选方案,应用系统负责 MySQL 和 ElasticSearch 中数据的删除,即应用系统在删除 MySQL 中数据的同时,也要负责将 ElasticSearch 中相应的文档删除。
本文介绍了如何经过 Logstash 进行关系型数据库和 ElasticSearch 之间的数据同步。文中以 MySQL 为例,但理论上,演示的方法和代码也应该一样适应于其余的关系型数据库。