logstash-input-jdbc同步mysql数据到elasticsearch

前言:

有项目之前使用coreseek来作索引,随着时间推移,技术上面也要更新换代,该项目后端从php5升级到php7,
而这时候问题来了,coreseek的做者已经再也不更新,官网也关闭了,所以寻求其余的索引软件替代,而elasticsearch恰好能够很好的知足业务php

1、安装

1.首先须要安装java,elasticsearch,logstash,能够参考我另一篇文章:https://segmentfault.com/a/11...java

2.而后logstash-input-jdbc安装node

cd /opt/logstash
.bin/plugin install logstash-input-jdbc

2、配置

安装仍是比较容易的,主要是配置这里有一些坑,这里须要为logstash准备配置文件jdbc.conf和jdbc.sqlmysql

jdbc.conf文件内容sql

input {
    stdin {
    }
    jdbc {
      // mysql相关jdbc配置
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/your_mysql_database"
      jdbc_user => "mysql_user"
      jdbc_password => "mysql_password"
 
      // jdbc链接mysql驱动的文件目录,可去官网下载:https://dev.mysql.com/downloads/connector/j/
      jdbc_driver_library => "/opt/logstash/conf/mysql-connector-java/mysql-connector-java-5.1.44-bin.jar"
      # the name of the driver class for mysql
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"

      // mysql文件, 也能够直接写SQL语句在此处,以下:
      // statement => "SELECT * from Table_test;"
      statement_filepath => "/opt/logstash/conf/jdbc.sql"

      // 这里相似crontab,能够定制定时操做,好比每10分钟执行一次同步(分 时 天 月 年)
      schedule => "*/10 * * * *"
      type => "jdbc"

      // 是否记录上次执行结果, 若是为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
      record_last_run => "true"

      // 是否须要记录某个column 的值,若是record_last_run为真,能够自定义咱们须要 track 的 column 名称,此时该参数就要为 true. 不然默认 track 的是 timestamp 的值.
      use_column_value => "true"

      // 若是 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 通常是mysql主键
      tracking_column => "autoid"

      last_run_metadata_path => "/opt/logstash/conf/last_id"

      // 是否清除 last_run_metadata_path 的记录,若是为真那么每次都至关于从头开始查询全部的数据库记录
      clean_run => "false"

      //是否将 字段(column) 名称转小写
      lowercase_column_names => "false"
    }
}

// 此处我不作过滤处理,若是须要,也可参考elk安装那篇
filter {}

output {
    // 输出到elasticsearch的配置
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "jdbc"

        // 将"_id"的值设为mysql的autoid字段
        document_id => "%{autoid}"
        template_overwrite => true
    }

    // 这里输出调试,正式运行时能够注释掉
    stdout {
        codec => json_lines
    }
}

下面是mysql文件jdbc.sql,注意:sql_last_value,后面会说道数据库

SELECT
*

FROM
Table_test

WHERE
autoid > :sql_last_value

3、遇到的问题

相信不少人在安装和实际使用的过程当中有这样那样的问题,这里我记录了一些本身遇到的json

1.elasticsearch数据重复以及增量同步segmentfault

在默认配置下,tracking_column这个值是@timestamp,存在elasticsearch就是_id值,是logstash存入elasticsearch的时间,这个值的主要做用相似mysql的主键,是惟一的,可是咱们的时间戳实际上是一直在变的,因此咱们每次使用select语句查询的数据都会存入elasticsearch中,致使数据重复。
解决方法:在要查询的表中,找主键或者自增值的字段,将它设置为_id的值,由于_id值是惟一的,因此,当有重复的_id的时候,数据就不会重复后端

// 是否记录上次执行结果, 若是为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
  record_last_run => "true"

  // 是否须要记录某个column 的值,若是record_last_run为真,能够自定义咱们须要 track 的 column 名称,此时该参数就要为 true. 不然默认 track 的是 timestamp 的值.
  use_column_value => "true"

  // 若是 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 通常是mysql主键
  tracking_column => "autoid"

2.数据同步频繁,影响mysql数据库性能安全

咱们写入jdbc.sql文件的mysql语句是写死的,因此每次查询的数据库有不少是已经不须要去查询的,尤为是每次select * from table;的时候,对mysql数据库形成了很是大的压力

解决:(1)根据业务需求,能够适当修改定时同步时间,我这里对实时性相对要求较高,所以设置了10分钟

// 这里相似crontab,能够定制定时操做,好比每10分钟执行一次同步(分 时 天 月 年)
  schedule => "*/10 * * * *"

(2)设置mysql查询范围,防止大量的查询拖死数据库

// 若是 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 通常是mysql主键
  tracking_column => "autoid"

  // 上次执行数据库的值,该值是上次查询时tracking_column设置的字段最大值
  last_run_metadata_path => "/opt/logstash/conf/last_id"

  // 是否清除 last_run_metadata_path 的记录,若是为真那么每次都至关于从头开始查询全部的数据库记录
  clean_run => "false"

在sql语句这里设置select * from WHERE autoid > :sql_last_value;
注意:若是你的语句比较复杂,autoid > :sql_last_value必定要写在WHERE后面,而后接AND便可

3.elasticsearch存储容量不断上升

稍微观察下就会发现,即便没有新的数据写入到elasticsearch里面,但只要logstash定时每次运行,elasticsearch容量就不断上升

clipboard.png

过一段时间看,占用空间增大,其实elasticsearch数据是同样的

clipboard.png

缘由:在elasticsearch/nodes/0/indices/jdbc/{0,1,2,3,4}/下有个translog,这个是elasticsearch的事务日志,相似mysql的binlog。elasticsearch为了数据安全,接收到数据后,先将数据写入内存和translog,而后再创建索引写入到磁盘,这样即便忽然断电,重启后,还能够经过translog恢复,不过这里因为咱们每次查询都有不少重复的数据,而这些重复的数据又没有写入到elasticsearch的索引中,因此就囤积了下来

clipboard.png

解决:查询官网说会按期refresh,会自动清理掉老的日志,所以可不作处理

4.增量同步和mysql范围查询致使mysql数据库有修改时没法同步到之前的数据

增量同步解决了,mysql每次都小范围查询,解决了数据库压力的问题,不过却致使没法同步老数据的修改问题

解决:可根据业务状态来作,若是你数据库是修改频繁类型,那只能作全量更新了,可是高频率大范围扫描数据库来作的索引还不如不作索引了(由于创建索引也是有成本的),咱们作索引主要是针对一些数据量大,不常修改,很消耗数据库性能的状况。我这里是数据修改较少,并且修改也通常是近期数据,由于同步时,我在mysql范围上面稍微调整一下

如:autoid > (:sql_last_value-100000),每次扫描上次扫描范围往以前再多10W行,这样扫描的数据量相对较小,也照顾到了可能会修改的数据类型

可是范围扫描还存在一个问题,就是过往的数据写入了elasticsearch以后,若是有修改,而又不在范围扫描之内,那么elasticsearch就不会同步到。所以,咱们还能够按期作一次全量或者更大范围的同步,只须要修改范围值便可。具体的值固然能够根据业务来定

相关文章
相关标签/搜索