MongoDB-Elasticsearch 实时数据导入

时间  2017-09-18

 

5 ways to synchronize data from MongoDb to ElasticSearchcss

https://www.linkedin.com/pulse/5-way-sync-data-from-mongodb-es-kai-haopython

 

Elastic search(ES) is a pop-star for recording and analyzing data, and Mongodb is a famous NoSQL database for storing and querying data. 
With our web infrastructure improving, how can we export data from mongodb to ES for searching or analyzing purposes? 
There are 5 possible solutions recommended for your choice.

1.synchronized by web server

We can use Mongoosastic module for storing-in-both-sides purpose when we use Nodejs as a web server container. When one document needs to be stored, 
Mongoosastic can commit the changes to both mongo and ES. As the chart below: Here is the reference link: Mongoosastic. The advantage is that data can be stored in both mongo and ES simultaneously, and the downside is that overhead may be caused in CUD operation efficiency.
And inconsistent data might be generated when one type of the db store failed. And the server framework is not flexible enough for db migrating.

2.Manually loading data from Mongo to ES

Transporter tool is a good choice to synchronize data once you want to export mongo data to another ES server. Transporter also can export data from or to other type of data store. Reference link is: Transporter.

It's important to know that the transporter synchronizing only once. When the job is done, the transporter comes to its end. 3. Plugin for ES There is a plugin for ES named "elasticsearch-river-mongodb", and was widely used in ES 1.x, but now river mechanism for ES 2.x is deprecated. Reference link is elasticsearch-river-mongodb. 4. JDBC input plugin for logstash We can take advantage of buffering , inputting, outputting and filtering abilities from logstash by adding a mongo input and ES output plugin to get this job done. JDBC input plugin is one of the choices, but it needs JDBC driver support. As I know there is no well-supported-free JDBC driver for mongo. Some trial versions can be found in Unity or Simba. Reference link is : JDBC Plugin for Logstash 5. Mongo-ES connector mongo-connector is a real-time sync service as a package of python. It creates a pipeline from a mongodb cluster to one or more target systems. 
It needs mongo to run in replica-set mode, sync data in mongo to the target then tails the mongo oplog. 
It needs a package named "elastic2_doc_manager" to write data to ES. Process chart below:

Reference link is : github or python.

To recapitulate it,  it is a must to remember: mongo replica set, an opened port and IP for ES, using elastic2_doc_manager if you use ES 2.x. At present, I am not yet ready with any official support in Beats. it will be in the future. So that's all, 5 ways to mongo-ES-sync.

===============================================》》》》》》 

 

 

MongoDB-Elasticsearch 实时数据导入git

https://zhuanlan.zhihu.com/p/26906652github

 

搜索功能是App必不可少的一部分,咱们使用目前比较流行的Elasticsearch进行全文检索。咱们的数据主要存储在MongoDB中,如何将这些数据导入到Elasticsearch中,并能一直保持同步呢?作法大体分为两种:web

  1. 在应用层操做,在读写MongoDB的同时读写Elasticsearch,好比mongoosastic,须要修改已有的业务代码。
  2. 与业务无关,经过读取MongoDB的replica oplog,将MongoDB产生的操做在Elasticsearch上replay,来实现单向同步

 

为了减小老代码修改为本,咱们选择了第二种方案,使用mongo-connector来进行数据同步。然而用着用着咱们发现mongo-connector有一些问题:sql

  1. 有些数据须要关联查询,可是mongo-connector并不支持parent-child模型(其实有一个fork是支持的,但已经落后主分支一个版本,而且合进主分支的但愿渺茫)。
  2. mongo-connector支持断点续传,可是恢复速度很是缓慢。
  3. mongo-connector能够设置每次处理的文档数量,但坑爹的地方在于,到不了设置的数字,它始终不会写入。好比,MongoDB一个表只有100个文档,可是设置了batch的size为1000,因而那100个文档这辈子也同步不到Elasticsearch中了。
  4. mongo-connector不会限速,直接把Elasticsearch写炸了,但它不会管,接着写,并且中间丢掉的数据就算后面有oplog里面有update操做,也没办法恢复,会报出404错误。
  5. 在MongoDB里面存了一张meta表,在Elasticsearch里面也存了一个meta索引,里面存了大量的timestamp,直接使Elasticsearch文档总数翻倍。
(以上mongo-connector的缺点,若有诽谤,或许是咱们不会用,恳请斧正。)

因而咱们开始寻找更好用的工具,却发现没有好用的工具:mongodb

  1. Elasticsearch Rivers,曾经的官方同步工具。但该项目早已废弃。
  2. Transporter,IBM旗下的Compose公司出品的同步工具。也不支持parent-child relationship,而且项目进度缓慢。
  3. elasticsearch-hadoop,先导到hadoop,再导到Elasticsearch。高射炮打蚊子,绕一大圈,不经济。

没办法,只好本身用TypeScript写一个,取名为mongo-es。shell

现已开源至  github ,并发布到了  npm ,欢迎你们多多试用,多挑(ti)毛(xu)病(qiu)。 

mongo-es导入数据分为两个阶段:数据库

  1. Scan:扫描整个MongoDB的collection,每条文档都插入到Elasticsearch对应的index里面。使用Bulk API,进行批量写入。在扫描开始前记录当前的时间点,供第二阶段使用。
  2. Tail:从刚才记录的时间点,或一个指定的时间点开始,将MongoDB的oplog在Elasticsearch上进行replay。使用RxJS的bufferWithTimeOrCount函数,既能批量写入,又能保证同步延迟不会很长(通常是一秒左右)。

mongo-es比mongo-connector进步的地方有:npm

  1. 支持parent-child relationship,能够处理须要join的数据。
  2. 能够逆序Scan,先导入最新的数据,这对于出错后重建索引快速恢复很是有用。
  3. 无需在两边存储多余元数据,只记录oplog的timestamp。只要程序挂的时间不太长,oplog里面还有这个timestamp,就能恢复。
  4. 遇到缺失文档自动恢复。当由于不可控因素(如网络缘由),致使某个本应已经同步了的文档在Elasticsearch中不存在。这时若是oplog里面遇到一个对该文档的update操做,mongo-connector没法处理,打印出404错误。遇到这种状况时,mongo-es会回到MongoDB中,读取到这个文档,进行更新。
  5. 有限速功能,可以限制每秒钟读取的文档数量,避免把Elasticsearch压垮。

固然了,mongo-connector是一个更加通用的程序,能够把文档导到更多的地方。mongo-es只是把MongoDB的数据导入到Elasticsearch中,这样比较未免有些不公平,但就在MongoDB到Elasticsearch这个使用场景下,仍是好用很多的。

\开发过程当中踩过的坑:

  1. Scan阶段使用stream,方便控制读取速度。Tail阶段使用cursor,配合noCursorTimeout参数,避免长时间没有oplog时的超时错误。Tail阶段若是用stream,即便是设置了noCursorTimeout,超时了也会报错。
  2. 对于操做是update的oplog,oplog里面有多是一个完整的文档,这时候直接就能够写入。也有多是$set或$unset操做,这时候要去Elasticsearch里面取到旧的,完整的文档,在内存里执行update后再写入回去。最好不要直接读MongoDB,以减小MongoDB负担。
  3. 在内存中执行update时,也要检查变化的字段是否属于咱们须要的字段。若是变化的都不是须要的字段,能够忽略此次update操做,若是变化的字段不在咱们须要的范围内,则应排除,以减小写入次数。
  4. 有_parent的文档是不能直接用_id访问到的,由于它的routing是_parent,必须指定_parent的值才行。对于操做是update的oplog,咱们只能拿到_id,拿不到_parent对应的字段,因此这时要用es.search代替es.get,访问每一个分片,才能拿到文档。
  5. Timestamp在js代码里表示时low在前,high在后。在mongo shell里面是反过来的。
  6. Bulk API传入的body长度不能为0,遇到0的状况要跳过,不然会报错。

 

=============================================>>>>>>>>>> 

=====================================》》》》》》》》》》》

 

mongo-connector实现MongoDB与elasticsearch实时同步深刻详解

http://blog.csdn.net/laoyang360/article/details/51842822 

 

 

引言:

 

验证代表:mongo-connector工具支持MongoDB与ES之间的实时增insert、删delete、改update操做。 
对于历史数据,mongo-connector工具不能同步到ES中,根因是自己工具不支持(初步界定),仍是没有这种场景,待查(进一步研究后再更新)。

1. mongo-connector 地址:

https://github.com/mongodb-labs/mongo-connector

二、 mongo-connector 工具简介

mongo-connector工具建立一个从MongoDB簇到一个或多个目标系统的管道,目标系统包括:Solr,Elasticsearch,或MongoDB簇。 
该工具在MongoDB与目标系统间同步数据,并跟踪MongoDB的oplog,保持操做与MongoDB的实时同步。 
该工具已经在python2.6,2.7,3.3+下进行验证。 
mongo-connector工具是基于python开发的实时同步服务工具。它要求mongo运行在replica-set模式,且须要 elastic2_doc_manager将数据写入ES。 
这里写图片描述

三、 elastic2-doc-manager 工具简介

这是Elastic2.x版本的文档管理器。对应Elastic1.x版本须要使用 elastic-doc-manager。

四、ES与MongoDB同步步骤:

(1)安装 mongo-connector。

pip install mongo-connector

坑:用上面命令在Company内网可能会出现以下错误信息,

 

Retrying (Retry(total=0, connect=None, read=None, redirect=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.requests.packages.urllib3.connection.HTTPConnection object at 0x03742DF0>, 'Connection to mirrors.aliyun.com timed out. (connect timeout=15)')': /pypi/simple/mongo-connector/ Could not find a version that satisfies the requirement mongo-connector[elastic5] (from versions: ) No matching distribution found for mongo-connector[elastic5]
经调查后,须要配置pip的代理和镜像(若是install网速特别慢的话)

 

注:pip为安装python后能够用到的命令 <

xxx.xxx.x.xx为内网代理ip

 

 

pip install --proxy http://xxx.xxx.x.xx:8000 --index http://mirrors.aliyun.com/pypi/simple/ mongo-connector[elastic5] --trusted-host mirrors.aliyun.com

参考文档

mongo-connector 2.5.1:https://pypi.python.org/pypi/mongo-connector/

<<

mongodb-labs/mongo-connector / Usage with Elasticsearch:

https://github.com/mongodb-labs/mongo-connector/wiki/Usage-with-Elasticsearch#installation

mongodb-labs/elastic2-doc-manager:

https://github.com/mongodb-labs/elastic2-doc-manager

>>

python pip设置代理:http://blog.csdn.net/dangerousroy/article/details/52924116

Python pip 国内镜像大全及使用办法:http://blog.csdn.net/testcs_dn/article/details/54374849

 

(2)安装 elastic2-doc-manager。

pip install elastic2-doc-manager

注意: 
若是不安装(2)直接进入(3)、(4)则会报错:

[root@5b9dbaaa148a bin]# mongo-connector -m 10.8.5.99:27017 -t 10.8.5.101:9200 -d elastic2_doc_manager Logging to mongo-connector.log. Exception in thread Thread-1Traceback (most recent call last)File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner self.run()

(3)mongo端启动

MongoDB 必须开启复制集,若是已经开启请忽略这一步:

如需开启复制集设置,参考以下步骤

Windows搭建MongoDB分片以及复制集:

http://blog.csdn.net/liangxw1/article/details/78031293

 

  • 【验证】初始化副本集的配置
  • 28
  • 29
rs0:PRIMARY> rs.status() { "set" : "rs0", "date" : ISODate("2016-07-05T08:50:55.272Z"), "myState" : 1, "term" : NumberLong(1), "heartbeatIntervalMillis" : NumberLong(2000), "members" : [ { "_id" : 0, "name" : "b48eafd69929:27017", "health" : 1, "state" : 1, "stateStr" : "PRIMARY", "uptime" : 115, "optime" : { "ts" : Timestamp(1467708606, 1), "t" : NumberLong(1) }, "optimeDate" : ISODate("2016-07-05T08:50:06Z"), "infoMessage" : "could not find member to sync from", "electionTime" : Timestamp(1467708605, 2), "electionDate" : ISODate("2016-07-05T08:50:05Z"), "configVersion" : 1, "self" : true } ], "ok" : 1 }

(4)ES端同步操做

  • 1
  • 2
[root@5b9dbaaa148a bin]# mongo-connector -m 10.8.5.99:27017 -t 10.8.5.101:9200 -d elastic2_doc_manager Logging to mongo-connector.log.

参数含义: 
-m: mongodb的地址与端口,端口默认为27017。 
-t:ES的地址与端口,端口默认为9200。 
-d:doc manager的名称,2.x版本为: elastic2-doc-manager。

五、ES与MongoDB Insert插入操做的同步验证

(1)Mongo端插入数据操做:

#Mongo建立数据库(对应ES的Index) rs0:PRIMARY> use zhang_index switched to db zhang_index #Mongo中插入数据(其中col_02对应ES中的Type) rs0:PRIMARY> db.col_02.insert({name:"laoluo", birth:"1964-03-21", sex:"man", company:"chuizi"}); WriteResult({ "nInserted" : 1 }) rs0:PRIMARY> db.col_02.insert({name:"renzhengfei", birth:"1954-03-21", sex:"man", company:"huawei"});

(2)Es端检索验证

  •  
[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty { "took" 4, "timed_out" false, "_shards" : { "total" 8, "successful" 8, "failed" 0 }, "hits" : { "total" 2, "max_score" 1.0, "hits" : [ { "_index" "zhang_index", "_type" "col_02", "_id" "577b7d8ceb8e3dc2d1db12a9", "_score" 1.0, "_source" : { "company" "huawei", "name" "renzhengfei", "birth" "1954-03-21", "sex" "man" } }, { "_index" "zhang_index", "_type" "col_02", "_id" "577b7d4aeb8e3dc2d1db12a7", "_score" 1.0, "_source" : { "company" "chuizi", "name" "laoluo", "birth" "1964-03-21", "sex" "man" } } ] } }

六、 ES与MongoDB Update更新操做的同步验证

(1)MongoDB的更新update操做

rs0:PRIMARY> db.col_02.update({'name':'laoluo'}, {$set:{'name':'luoyonghao'}}) WriteResult({ "nMatched" 1, "nUpserted" 0, "nModified" 1 }) rs0:PRIMARY> rs0:PRIMARY> db.col_02.find().pretty() { "_id" ObjectId("577b7d4aeb8e3dc2d1db12a7"), "name" "luoyonghao", "birth" "1964-03-21", "sex" "man", "company" "chuizi" } { "_id" ObjectId("577b7d8ceb8e3dc2d1db12a9"), "name" "renzhengfei", "birth" "1954-03-21", "sex" "man", "company" "huawei" }

(2)Es端检索更新后结果

  •  
[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty { "took" 1, "timed_out" false, "_shards" : { "total" 8, "successful" 8, "failed" 0 }, "hits" : { "total" 2, "max_score" 1.0, "hits" : [ { "_index" "zhang_index", "_type" "col_02", "_id" "577b7d8ceb8e3dc2d1db12a9", "_score" 1.0, "_source" : { "company" "huawei", "name" "renzhengfei", "birth" "1954-03-21", "sex" "man" } }, { "_index" "zhang_index", "_type" "col_02", "_id" "577b7d4aeb8e3dc2d1db12a7", "_score" 1.0, "_source" : { "company" "chuizi", "name" "luoyonghao", "birth" "1964-03-21", "sex" "man" } } ] } }

七、 ES与MongoDB delete删除操做的同步验证

(1) MongoDB的删除delete操做

rs0:PRIMARY> db.col_02.remove({'name':'renzhengfei'}) WriteResult({ "nRemoved" 1 }) rs0:PRIMARY> db.col_02.find() { "_id" ObjectId("577b7d4aeb8e3dc2d1db12a7"), "name" "luoyonghao", "birth" "1964-03-21", "sex" "man", "company" "chuizi" } rs0:PRIMARY> db.col_02.find().pretty() { "_id" ObjectId("577b7d4aeb8e3dc2d1db12a7"), "name" "luoyonghao", "birth" "1964-03-21", "sex" "man", "company" "chuizi" }

(2)ES端检索删除后结果

结果代表,MongoDB删除的内容,ES端已经同步删除。

  •  
[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty { "took" 2, "timed_out" false, "_shards" : { "total" 8, "successful" 8, "failed" 0 }, "hits" : { "total" 1, "max_score" 1.0, "hits" : [ { "_index" "zhang_index", "_type" "col_02", "_id" "577b7d4aeb8e3dc2d1db12a7", "_score" 1.0, "_source" : { "company" "chuizi", "name" "luoyonghao", "birth" "1964-03-21", "sex" "man" } } ] } }

这里写图片描述

参见详细介绍:

https://docs.mongodb.com/manual/tutorial/deploy-replica-set/

Mongo与ES同步的5种方式:

https://www.linkedin.com/pulse/5-way-sync-data-from-mongodb-es-kai-hao

常见Bug:

How to setup a MongoDB replica set for the connector? 
https://docs.mongodb.com/manual/tutorial/deploy-replica-set/

 

使用Mongo Connector和Elasticsearch实现模糊匹配 http://www.csdn.net/article/2014-09-02/2821485-how-to-perform-fuzzy-matching-with-mongo-connector? 

相关文章
相关标签/搜索