让更多的人了解 阿里开源的MongoShake能够很好知足mongodb到kafka高性能高可用实时同步需求(项目地址:https://github.com/alibaba/MongoShake
,下载地址:https://github.com/alibaba/MongoShake/releases
)。至此博客就结束了,你能够愉快地啃这个项目了。仍是一块儿来看一下官方的描述:linux
MongoShake is a universal data replication platform based on MongoDB's oplog. Redundant replication and active-active replication are two most important functions. 基于mongodb oplog的集群复制工具,能够知足迁移和同步的需求,进一步实现灾备和多活功能。git
哈哈,有兴趣听我啰嗦的能够往下。最近,有个实时增量采集mongodb数据(数据量在天天10亿条左右
)的需求,须要先调研一下解决方案。我分别百度、google了mongodb kafka sync 同步 采集 实时
等 关键词,写这篇博客的时候排在最前面的当属kafka-connect
(官方有实现https://github.com/mongodb/mongo-kafka
,其实也有非官方的实现)那一套方案,我对kafka-connect相对熟悉一点(不熟悉的话估计编译部署都要花好一段时间),没测以前就感受可能不知足个人采集性能需求,测下来果真也是不知足需求。后来,也看到了https://github.com/rwynn/route81
,编译部署也较为麻烦,一样不知足采集性能需求。我搜索东西的时候通常状况下不会往下翻太多,没找到所需的,大多会尝试换关键词(包括中英文)搜搜,此次可能也提醒我下次要多往下找找,说不定有些好东西未必排在最前面几个
。github
以后在github上搜in:readme mongodb kafka sync
,让我眼前一亮。mongodb
点进去快速读了一下readme,正是我想要的(后面本身实际测下来确实高性能、高可用,知足个人需求),官方也提供了MongoShake的性能测试报告。json
这篇博客不讲(也很大多是笔者技术太渣,没法参透领会(●´ω`●)
)MongoShake的架构、原理、实现,如何高性能的,如何高可用的等等。就一个目的,但愿其余朋友在搜索实时同步mongodb数据
时候,MongoShake
的解决方案能够排在最前面(实力所归,谁用谁知道,独乐乐不如众乐乐,故做此博客
),避免走弯路、绕路。api
v2.2.1以前的MongoShake版本处理数据的流程:架构
MongoDB(数据源端,待同步的数据)
-->
MongoShake(对应的是collector.linux
进程,做用是采集)
-->
Kafka(raw格式,未解析的带有header+body的数据)
-->
receiver(对应的是receiver.linux
进程,做用是解析,这样下游组件就能拿到好比解析好的一条一条的json格式的数据)
-->
下游组件(拿到mongodb中的数据用于本身的业务处理)工具
v2.2.1以前MongoShake的版本解析入kafka,须要分别启collector.linux和receiver.linux进程,并且receiver.linux须要本身根据你的业务逻辑填充完整,而后编译出来,默认只是把解析出来的数据打个log而已
性能
src/mongoshake/receiver/replayer.go
中的代码如图:测试
详情见:https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-connect-to-different-tunnel-except-direct
v2.2.1版本MongoShake的collector.conf
有一个配置项tunnel.message
# the message format in the tunnel, used when tunnel is kafka. # "raw": batched raw data format which has good performance but encoded so that users # should parse it by receiver. # "json": single oplog format by json. # "bson": single oplog format by bson. # 通道数据的类型,只用于kafka和file通道类型。 # raw是默认的类型,其采用聚合的模式进行写入和 # 读取,可是因为携带了一些控制信息,因此须要专门用receiver进行解析。 # json以json的格式写入kafka,便于用户直接读取。 # bson以bson二进制的格式写入kafka。 tunnel.message = json
raw
格式,那么数据处理流程和上面以前的一致(MongoDB->MongoShake->Kafka->receiver->下游组件)json
、bson
,处理流程为MongoDB->MongoShake->Kafka->下游组件v2.2.1版本设置为json处理的优势就是把之前须要由receiver对接的格式,改成直接对接,从而少了一个receiver,也不须要用户额外开发,下降开源用户的使用成本。
简单总结一下就是:
raw格式可以最大程度的提升性能,可是须要用户有额外部署receiver的成本。json和bson格式可以下降用户部署成本,直接对接kafka便可消费,相对于raw来讲,带来的性能损耗对于大部分用户是可以接受的。
我用的是v2.2.1版本,高可用部署很是简单。collector.conf
开启master的选举便可:
# high availability option. # enable master election if set true. only one mongoshake can become master # and do sync, the others will wait and at most one of them become master once # previous master die. The master information stores in the `mongoshake` db in the source # database by default. # 若是开启主备mongoshake拉取同一个源端,此参数须要开启。 master_quorum = true # checkpoint存储的地址,database表示存储到MongoDB中,api表示提供http的接口写入checkpoint。 context.storage = database
同时我checkpoint的存储地址默认用的是database,会默认存储在mongoshake
这个db中。咱们能够查询到checkpoint记录的一些信息。
rs0:PRIMARY> use mongoshake switched to db mongoshake rs0:PRIMARY> show collections; ckpt_default ckpt_default_oplog election rs0:PRIMARY> db.election.find() { "_id" : ObjectId("5204af979955496907000001"), "pid" : 6545, "host" : "192.168.31.175", "heartbeat" : NumberLong(1582045562) }
我在192.168.31.174,192.168.31.175,192.168.31.176上总共启了3个MongoShake实例,能够看到如今工做的是192.168.31.175机器上进程。自测过程,高速往mongodb写入数据,手动kill掉192.168.31.175上的collector进程,等192.168.31.174成为master以后,我又手动kill掉它,最终只保留192.168.31.176上的进程工做,最后统计数据发现,有重采数据现象,猜想有实例还没来得及checkpoint就被kill掉了。