1、实验介绍
logstash-jdbc-input
是Logstash提供的官方插件之一,该插件经过JDBC接口将任何数据库中的数据导入 Logstash。关于使用logstash-jdbc-input
插件从数据库中导出数据到es上,大部分是关于mysql数据库的导入。本篇文章是关于如何使用logstash-jdbc-input
插件对mongodb的数据进行实时导入。html
2、版本说明
本实验使用的ELK版本是7.6.2。
(这里想要补充一下,关于mongodb数据库的数据导入,另一种常使用的插件是mongo-connector,但该插件仅支持到elasticsearch5.x,所以对于更高版本的elasticsearch更推荐使用本篇文章使用的方法。)
java
3、具体实现
1. 下载相关的jdbc-driver文件并解压
- 下载地址: https://dbschema.com/jdbc-drivers/MongoDbJdbcDriver.zip
- 解压安装包:
unzip MongoDbJdbcDriver.zip
(安装包里面包括三个jar
包文件:gson-2.8.6.jar
、mongo-java-driver-3.12.4.jar
、mongojdbc2.1.jar
) - 将全部文件(即三个jar包)复制到
(~/logstash-7.6.2/logstash-core/lib/jars/)
目录(即你的logstash所在的安装目录)
2. 编写配置文件内容
- 在你的logstash安装目录下新建一个
.conf
文件 - 关于
.conf
配置文件主要由input
,filter
,output
三大板块组成,咱们依次介绍如何填写各部分的内容:
2.1 input
input { jdbc { jdbc_driver_class => "com.dbschema.MongoJdbcDriver" # jar包的目录 jdbc_driver_library => "logstash/logstash-core/lib/jars/mongojdbc2.1.jar" # mongo数据库对应的uri jdbc_connection_string => jdbc_connection_string => "jdbc:mongodb://127.0.0.1:27017/dbtest" # 这个不填 jdbc_user => "" # 这个不填 jdbc_password => "" # 表示每分钟中执行一次,以实现实时同步的效果 schedule => "* * * * *" # mongodb的查询语句 statement => "db.dbtest.find({}, {_id: 0})" } }
- 在编写mongodb查询语句时咱们须要注意,因为logstash没法识别mongodb中的
ObjectId
类型,所以咱们须要抛弃该字段,所以在find
语句中咱们设置_id:0
,即表示不须要该字段。
2.2 filter
filter { # 数据预处理 }
- filter部分主要是针对mongodb中的数据进行预处理,若是不须要进行预处理,这部份内容没必要填写;关于filter实现预处理的部份内容比较繁多,以后会专门出一篇文章进行总结,这里再也不赘述。
2.3 output
output { elasticsearch { # es所在的地址 hosts => "localhost:9200" # 导入到es上对应的索引 index => "test" } stdout { codec => json_lines } }
3. 实现数据的实时同步(全量法)
- 全量法,即指每次将表的全部数据所有导入,这种方法可能会致使数据重复的问题,由于每次同步时都会将以前已经导入的数据再导入一遍,为避免数据重复的问题,咱们须要对每条数据进行标识,这样在每次同步时es中若已出现相同标识的数据则会选择覆盖,以此实现数据实时同步的效果。
- 实现数据标识效果,即在
output
部分指定document_id
便可
output { elasticsearch { # es所在的地址 hosts => "localhost:9200" # 导入到es上对应的索引 index => "test" # 指定标识每条数据的字段 document_id => "%{id}" } stdout { codec => json_lines } }
- 这里须要注意的是,咱们没法使用mongodb自动生成的id做为标识符,由于id是
ObjectId
类型,在input
阶段咱们已经把该字段删去了,所以这里应该选择表中其余能标识数据且不是ObjectId
类型的字段(string, int等皆可)
4. 实现数据的实时同步(增量法)
- 若在你的数据中除了mongodb自动生成的
id
再也不有其它具备标识性质的字段,能够考虑使用增量法实现数据的实时同步。增量法,即每次同步时是从上一次执行命令的时间开始,将插入时间在上一次命令以后的数据导入es中。增量法的优势是没必要每次将所有数据导入,而是只导入新加入到数据库的数据,能够减少每次同步时的压力。 - 使用增量法实现数据同步,须要修改
input
部分的代码
input { jdbc { jdbc_driver_class => "com.dbschema.MongoJdbcDriver" # jar包的目录 jdbc_driver_library => "logstash/logstash-core/lib/jars/mongojdbc2.1.jar" # mongo数据库对应的uri jdbc_connection_string => jdbc_connection_string => "jdbc:mongodb://127.0.0.1:27017/dbtest" # 这个不填 jdbc_user => "" # 这个不填 jdbc_password => "" # 表示每分钟中执行一次,以实现实时同步的效果 schedule => "* * * * *" # 实现增量同步的mongodb的查询语句 statement => "db.dbtest.find({ $gte: ISODate(:sql_last_value) }, {_id: 0})" # 保存上一次执行时间的文件 last_run_metadata_path => "/logstash-7.6.2/.logstash_jdbc_last_run" } }
- 实现增量同步主要是两个字段:
statement:
执行mongodb查询的字段- 关于
:sql_last_value
:logstash中提供的一个协助查询的时间参数,默认值是1970-01-01 08:00:00
,数据类型是string
,每次执行命令以后,该值会替换成执行命令时刻的时间。 - 在修改
find
语句时容易由于:sql_last_value
的类型出错:若是表中关于时间的数据类型是string
,那在find
语句中改成db.dbtest.find({ $gte: :sql_last_value}, {_id: 0})
便可;若若是表中关于时间的数据类型是date
,那在find
语句须要进行类型转换,即改成·db.dbtest.find({ $gte: ISODate(:sql_last_value)}, {_id: 0})
- 关于
last_run_metadata_path:
保存上一次执行时间的文件,能够放在任意目录下,我这里放在了/logstash-7.6.2
的目录下面
5. 运行文件
/logstash-7.6.2/bin/logstash -f /logstash-7.6.2/dbtest.conf --path.data=/logstash-7.6.2/data/dbtest
4、可能出现的报错
1. 没法识别ObjectId错误
-
报错信息:
Exception when executing JDBC query {:exception=>#<Sequel::DatabaseError: Java::OrgLogstash::MissingConverterException: Missing Converter handling for full class name=org.bson.types.ObjectId, simple name=ObjectId>}
mysql -
错误缘由:在
input
部分编写mongodb查询语句时须要注意,因为logstash没法识别mongodb中的ObjectId
类型,所以咱们须要抛弃该字段,所以在find
语句中咱们设置_id:0
,即表示不须要该字段。git
db.dbtest.find({}, {_id: 0})
【tips】经过mongodb的find
语句咱们还能够选取只导出文档中的某一字段,具体操做可参考官方文档:https://docs.mongodb.com/manual/reference/method/db.collection.find/github
2. 增量同步无效可是没有报错信息
- 这一问题的缘由在上面的增量法部分中也有提到过,多是
find
语句中:sql_last_value
或者其余字段的数据类型不正确,建议检查一下数据库中字段类型和find
语句中的查询条件是否匹配
参考文章
- https://stackoverflow.com/questions/58342818/sync-mongodb-to-elasticsearch
- https://docs.mongodb.com/manual/reference/method/db.collection.find/
- https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html