使用Logstash将MySql数据迁移到Elasticsearch中

在许多状况下,咱们但愿使用不是由Elasticsearch自己支持的不一样数据库的输入。在本文中,咱们将展现如何经过Logstash将数据从MySql数据库迁移到Elasticsearch。mysql

JBDC插件

Logstash可用的JDBC插件确保来自任何具备JDBC接口的数据库的数据均可以做为输入存入Logstash。这个插件还支持须要调度运行logstash。它还经过使用查询使选择性数据做为输入。在这些类型的数据库中,咱们有行和列的概念。每一个行都被视为单个事件,每一个行(事件)中的列都被视为事件中的字段。git

如下框图说明了JDBC链接器插件在从JDBC支持的数据库迁移数据时的做用: 
mysql-logstash1.png#asset:1073github

在图中,咱们有logstash运行配置文件,触发咱们设置的预约义查询,以便将咱们感兴趣的数据收集到顺序数据库。一旦查询被触发到JDBC插件,它将它传递到数据库并收集数据,它将移交给Logstash。 sql

根据咱们的要求,咱们能够处理数据并以所需的形式进行处理,在处理后将处理的数据索引到Elasticsearch。咱们将在后面的章节中的示例中显示详细的应用程序。数据库

将MySql数据插入Elasticsearch

让咱们继续在Logstash的帮助下将数据从顺序数据库(如MySql)迁移到Elasticsearch。咱们须要相应的MySql的JDBC驱动程序。您能够 在这里下载 如今让咱们"testdb"使用如下命令建立一个在MySql 中命名的数据库:curl

建立 testdb

数据库如今已建立,咱们只是确保咱们使用相同的数据库用于咱们的目的:elasticsearch

显示 数据库 ;
使用 testdb;

使用如下模式建立"testtable"在数据库下命名的表"testdb"测试

建立 表 testtable(PersonID int,LastName varchar(255),FirstName varchar(255),City varchar(255),Date datetime(6));

如今在上表中插入一些测试数据: url

INSERT  INTO testtable(PersonID,LastName,FirstName,City,Date)
 VALUES('4005','Kallis','Jaques','Cape Town','2016-05-23 16:12:03.568810');
INSERT  INTO testtable(PersonID,LastName,FirstName,City,Date)
 VALUES('4004','Baron','Richard','Cape Town','2016-05-23 16:12:04.370460');
INSERT  INTO testtable(PersonID,LastName,FirstName,City,

咱们建立了一个包含3名员工详细信息的表。您能够经过传递查询来显示表的详细信息以显示其全部内容:spa

select * from testtable

结果表将以下所示: 

mysqllogstash2.png#asset:1074

Logstash配置

如今咱们已经建立了一个内容如上所示的MySql表,看看如何配置Logstash。在logstash文件夹中,咱们有一个logstash.conf文件,它是要配置和运行以获取必要的结果的文件。初始配置如如下屏幕截图所示: 

mysqllogstash3.png#asset:1075

在上面的配置文件中,咱们提到了许多参数,例如:JDBC链接器检查数据的数据库,JDBC插件的位置,MySql访问的用户名和密码,以及查询语句。将上述设置应用于"logstash.conf"文件后,经过键入如下命令运行Logstash:

bin / logstash -f logstash .conf

如JDBC部分中的框图中所述,logstash配置文件将查询传递给JDBC驱动程序以及用户凭据。它还获取数据并将数据提供给Logstash。Logstash将使其JSON格式化并索引到Elasticsearch数据库。查询索引"test-migrate"以下:

curl -XPOST'http :// localhost:9200 / test-migrate / _search?pretty = true'  -d  '{}'

上述查询将每行做为单独的文档列出,列为字段。一个例子: 

{
   “_index” : “测试迁移”,
   “_type” : “数据”,
   “_id” :“4004” ,
   “_score” :1,
   “_source”:{
     “PERSONID” :4004,
     “姓氏”:“男爵“,
     ”名字“:”理查“,
     ”城市“:”开普敦“,
     ”日期“:”2016-05-23T10:42:04.370Z“ ,
    “@version”:“1”,
     “@timestamp”:“2016-07-10T10:36:43.685Z”
  }}
}}

更多配置

在本节中,咱们将展现各类用例场景。向上面的MySql中添加另外一行数据,以下所示:

INSERT  INTO testtable(PersonID,LastName,FirstName,City,Date)
 VALUES('4002','Cheers','Joan','Cape Town','2016-05-23 16:12:07.163681');

另外,更新同一表格中一行的现有值,以下所示:

UPDATE测试表
- > SET FirstName = 'James' 
 - > WHERE PersonID = 4005 ;

1.重复问题

完成上述步骤后,再次运行logstash配置文件。咱们指望总共4个文档包括新的行和更新的行。可是,当再次检查索引时,不是这样的。相反,咱们共有7个文件。这是由于初始文档在elasticsearch数据库中保持不变,这是因为没有从表自己提供特定的id。当咱们运行logstash配置时,整个内容"testtable"都会被索引一次。

咱们如何解决这种重复?咱们必须为每一个文档提供一个惟一的ID。对于每次运行,每一个文档的ID应该相同,以防止重复问题。这能够经过编辑conf文件的输出部分来实现,以下所示:

mysqllogstash4.png#asset:1076

2.变换操做

咱们遇到的其余重要需求之一将是更改字段名称和值,当咱们索引到elasticsearch。咱们在咱们当前的例子中添加一个要求来演示这个。对于全部与“开普敦”匹配的文档,该字段"City"应替换为值"South Africa",字段值应替换为"Country"

为了实现此要求,请使用该"filter"属性在elasticsearch中操做已提取的数据。使用"mutate"里面的属性"filter"执行所需的更改。使用上面的"input""output"部分的设置,咱们须要"filter"logstash.conf文件中添加如下部分:

filter {
   if [city] == “开普敦” {
    mutate {
       rename => { “city” => “country” }
        replace => [ “country”,“South Africa” ]
      }}
   }}
}}

检查所获取的数据"Cape Town"的每一个事件的"City"列的值。若是找到匹配项,"City"则重命名该字段,"country"并将每一个匹配的值替换为"South Africa"。  

3.计划和增量更新

若是数据在MySql数据库中不断更新,咱们须要递增和按期对其进行索引,该怎么办?要按期获取数据,请"scheduler"在输入部分中添加属性。在"scheduler,"给定的值时,让每隔一段时间运行conf文件。它是高度可定制的,并使用Rufus调度程序语法

对于增量更新,请修改查询以"sql_last_value"针对字段使用。这里咱们给这个字段  "Date"。咱们还设置"use_column_value"为true,并将对应的列连接到"Date"使用"tracking_column"

用于状况1,2和3的完整配置文件以下:

mysqllogstash5.png#asset:1077

为了看到上面的配置工做,添加几个字段到现有的MySql表,其"Date"值比以前存在的值更新。如今运行logstash,您能够看到只有新数据已在Elasticsearch索引中创建索引。

结论

在本文中,咱们讨论了用于使用logstash将数据从连续数据库迁移到Elasticsearch的JDBC插件。咱们还熟悉如何处理常见问题,如重复,字段和值的突变,以及调度和增量更新。问题/意见?给咱们一行下面。

相关文章
相关标签/搜索