在许多状况下,咱们但愿使用不是由Elasticsearch自己支持的不一样数据库的输入。在本文中,咱们将展现如何经过Logstash将数据从MySql数据库迁移到Elasticsearch。mysql
Logstash可用的JDBC插件确保来自任何具备JDBC接口的数据库的数据均可以做为输入存入Logstash。这个插件还支持须要调度运行logstash。它还经过使用查询使选择性数据做为输入。在这些类型的数据库中,咱们有行和列的概念。每一个行都被视为单个事件,每一个行(事件)中的列都被视为事件中的字段。git
如下框图说明了JDBC链接器插件在从JDBC支持的数据库迁移数据时的做用:
github
在图中,咱们有logstash运行配置文件,触发咱们设置的预约义查询,以便将咱们感兴趣的数据收集到顺序数据库。一旦查询被触发到JDBC插件,它将它传递到数据库并收集数据,它将移交给Logstash。 sql
根据咱们的要求,咱们能够处理数据并以所需的形式进行处理,在处理后将处理的数据索引到Elasticsearch。咱们将在后面的章节中的示例中显示详细的应用程序。数据库
让咱们继续在Logstash的帮助下将数据从顺序数据库(如MySql)迁移到Elasticsearch。咱们须要相应的MySql的JDBC驱动程序。您能够 在这里下载。 如今让咱们"testdb"
使用如下命令建立一个在MySql 中命名的数据库:curl
建立 testdb
数据库如今已建立,咱们只是确保咱们使用相同的数据库用于咱们的目的:elasticsearch
显示 数据库 ; 使用 testdb;
使用如下模式建立"testtable"
在数据库下命名的表"testdb"
:测试
建立 表 testtable(PersonID int,LastName varchar(255),FirstName varchar(255),City varchar(255),Date datetime(6));
如今在上表中插入一些测试数据: url
INSERT INTO testtable(PersonID,LastName,FirstName,City,Date) VALUES('4005','Kallis','Jaques','Cape Town','2016-05-23 16:12:03.568810'); INSERT INTO testtable(PersonID,LastName,FirstName,City,Date) VALUES('4004','Baron','Richard','Cape Town','2016-05-23 16:12:04.370460'); INSERT INTO testtable(PersonID,LastName,FirstName,City,
咱们建立了一个包含3名员工详细信息的表。您能够经过传递查询来显示表的详细信息以显示其全部内容:spa
select * from testtable
结果表将以下所示:
如今咱们已经建立了一个内容如上所示的MySql表,看看如何配置Logstash。在logstash文件夹中,咱们有一个logstash.conf
文件,它是要配置和运行以获取必要的结果的文件。初始配置如如下屏幕截图所示:
在上面的配置文件中,咱们提到了许多参数,例如:JDBC链接器检查数据的数据库,JDBC插件的位置,MySql访问的用户名和密码,以及查询语句。将上述设置应用于"logstash.conf"
文件后,经过键入如下命令运行Logstash:
bin / logstash -f logstash .conf
如JDBC部分中的框图中所述,logstash配置文件将查询传递给JDBC驱动程序以及用户凭据。它还获取数据并将数据提供给Logstash。Logstash将使其JSON格式化并索引到Elasticsearch数据库。查询索引"test-migrate"
以下:
curl -XPOST'http :// localhost:9200 / test-migrate / _search?pretty = true' -d '{}'
上述查询将每行做为单独的文档列出,列为字段。一个例子:
{ “_index” : “测试迁移”, “_type” : “数据”, “_id” :“4004” , “_score” :1, “_source”:{ “PERSONID” :4004, “姓氏”:“男爵“, ”名字“:”理查“, ”城市“:”开普敦“, ”日期“:”2016-05-23T10:42:04.370Z“ , “@version”:“1”, “@timestamp”:“2016-07-10T10:36:43.685Z” }} }}
在本节中,咱们将展现各类用例场景。向上面的MySql中添加另外一行数据,以下所示:
INSERT INTO testtable(PersonID,LastName,FirstName,City,Date) VALUES('4002','Cheers','Joan','Cape Town','2016-05-23 16:12:07.163681');
另外,更新同一表格中一行的现有值,以下所示:
UPDATE测试表 - > SET FirstName = 'James' - > WHERE PersonID = 4005 ;
完成上述步骤后,再次运行logstash配置文件。咱们指望总共4个文档包括新的行和更新的行。可是,当再次检查索引时,不是这样的。相反,咱们共有7个文件。这是由于初始文档在elasticsearch数据库中保持不变,这是因为没有从表自己提供特定的id。当咱们运行logstash配置时,整个内容"testtable"
都会被索引一次。
咱们如何解决这种重复?咱们必须为每一个文档提供一个惟一的ID。对于每次运行,每一个文档的ID应该相同,以防止重复问题。这能够经过编辑conf文件的输出部分来实现,以下所示:
咱们遇到的其余重要需求之一将是更改字段名称和值,当咱们索引到elasticsearch。咱们在咱们当前的例子中添加一个要求来演示这个。对于全部与“开普敦”匹配的文档,该字段"City"
应替换为值"South Africa"
,字段值应替换为"Country"
。
为了实现此要求,请使用该"filter"
属性在elasticsearch中操做已提取的数据。使用"mutate"
里面的属性"filter"
执行所需的更改。使用上面的"input"
和"output"
部分的设置,咱们须要"filter"
在logstash.conf
文件中添加如下部分:
filter { if [city] == “开普敦” { mutate { rename => { “city” => “country” } replace => [ “country”,“South Africa” ] }} }} }}
检查所获取的数据"Cape Town"
的每一个事件的"City"
列的值。若是找到匹配项,"City"
则重命名该字段,"country"
并将每一个匹配的值替换为"South Africa"
。
若是数据在MySql数据库中不断更新,咱们须要递增和按期对其进行索引,该怎么办?要按期获取数据,请"scheduler"
在输入部分中添加属性。在"scheduler,"
给定的值时,让每隔一段时间运行conf文件。它是高度可定制的,并使用Rufus调度程序语法。
对于增量更新,请修改查询以"sql_last_value"
针对字段使用。这里咱们给这个字段 "Date"
。咱们还设置"use_column_value"
为true,并将对应的列连接到"Date"
使用"tracking_column"
。
用于状况1,2和3的完整配置文件以下:
为了看到上面的配置工做,添加几个字段到现有的MySql表,其"Date"
值比以前存在的值更新。如今运行logstash,您能够看到只有新数据已在Elasticsearch索引中创建索引。
在本文中,咱们讨论了用于使用logstash将数据从连续数据库迁移到Elasticsearch的JDBC插件。咱们还熟悉如何处理常见问题,如重复,字段和值的突变,以及调度和增量更新。问题/意见?给咱们一行下面。