MongoDB oplog (相似于 MySQL binlog) 记录数据库的全部修改操做,除了用于主备同步;oplog 还能玩出不少花样,好比git
总的来讲,MongoDB 能够经过 oplog 来跟生态对接,来实现数据的同步、迁移、恢复等能力。而在构建这些能力的时候,有一个通用的需求,就是工具或者应用须要有不断拉取 oplog 的能力;这个过程一般是github
那么问题来了,因为 MongoDB oplog 自己没有索引的,每次定位 oplog 的起点都须要进行全表扫描么?mongodb
{ "ts" : Timestamp(1563950955, 2), "t" : NumberLong(1), "h" : NumberLong("-5936505825938726695"), "v" : 2, "op" : "i", "ns" : "test.coll", "ui" : UUID("020b51b7-15c2-4525-9c35-cd50f4db100d"), "wall" : ISODate("2019-07-24T06:49:15.903Z"), "o" : { "_id" : ObjectId("5d37ff6b204906ac17e28740"), "x" : 0 } } { "ts" : Timestamp(1563950955, 3), "t" : NumberLong(1), "h" : NumberLong("-1206874032147642463"), "v" : 2, "op" : "i", "ns" : "test.coll", "ui" : UUID("020b51b7-15c2-4525-9c35-cd50f4db100d"), "wall" : ISODate("2019-07-24T06:49:15.903Z"), "o" : { "_id" : ObjectId("5d37ff6b204906ac17e28741"), "x" : 1 } } { "ts" : Timestamp(1563950955, 4), "t" : NumberLong(1), "h" : NumberLong("1059466947856398068"), "v" : 2, "op" : "i", "ns" : "test.coll", "ui" : UUID("020b51b7-15c2-4525-9c35-cd50f4db100d"), "wall" : ISODate("2019-07-24T06:49:15.913Z"), "o" : { "_id" : ObjectId("5d37ff6b204906ac17e28742"), "x" : 2 } }
上面是 MongoDB oplog 的示例,oplog MongoDB 也是一个集合,但与普通集合不同数据库
咱们在拉取 oplog 时,第一次从头开始拉取,而后每次拉取使用完,会记录最后一条 oplog 的ts字段;若是应用发生重启,这时须要根据上次拉取的 ts 字段,先找到拉取的起点,而后继续遍历。app
注:如下实现针对 WiredTiger 存储引擎,须要 MongoDB 3.0+ 版本才能支持工具
若是 MongoDB 底层使用的是 WiredTiger 存储引擎,在存储 oplog 时,实际上作过优化。MongoDB 会将 ts 字段做为 key,oplog 的内容做为 value,将key-value 存储到 WiredTiger 引擎里,WiredTiger 默认配置使用 btree 存储,因此 oplog 的数据在 WT 里实际上也是按 ts 字段顺序存储的,既然是顺序存储,那就有二分查找优化的空间。优化
MongoDB find 命令提供了一个选项,专门用于优化 oplog 定位。ui
大体意思是,若是你find的集合是oplog,查找条件是针对 ts 字段的 gte
、gt
、eq
,那么 MongoDB 字段会进行优化,经过二分查找快速定位到起点; 备节点同步拉取oplog时,实际上就带了这个选项,这样备节点每次重启,都能根据上次同步的位点,快速找到同步起点,而后持续保持同步。阿里云
因为咨询问题的同窗对内部实现感兴趣,这里简单的把重点列出来,要深入理解,仍是得深刻撸细节。url
// src/monogo/db/query/get_executor.cpp StatusWith<unique_ptr<PlanExecutor>> getExecutorFind(OperationContext* txn, Collection* collection, const NamespaceString& nss, unique_ptr<CanonicalQuery> canonicalQuery, PlanExecutor::YieldPolicy yieldPolicy) { // 构建 find 执行计划时,若是发现有 oplogReplay 选项,则走优化路径 if (NULL != collection && canonicalQuery->getQueryRequest().isOplogReplay()) { return getOplogStartHack(txn, collection, std::move(canonicalQuery)); } ... return getExecutor( txn, collection, std::move(canonicalQuery), PlanExecutor::YIELD_AUTO, options); }
StatusWith<unique_ptr<PlanExecutor>> getOplogStartHack(OperationContext* txn, Collection* collection, unique_ptr<CanonicalQuery> cq) { // See if the RecordStore supports the oplogStartHack // 若是底层引擎支持(WT支持,mmapv1不支持),根据查询的ts,找到 startLoc const BSONElement tsElem = extractOplogTsOptime(tsExpr); if (tsElem.type() == bsonTimestamp) { StatusWith<RecordId> goal = oploghack::keyForOptime(tsElem.timestamp()); if (goal.isOK()) { // 最终调用 src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp::oplogStartHack startLoc = collection->getRecordStore()->oplogStartHack(txn, goal.getValue()); } } // Build our collection scan... // 构建全表扫描参数时,带上 startLoc,真正执行是会快速定位到这个点 CollectionScanParams params; params.collection = collection; params.start = *startLoc; params.direction = CollectionScanParams::FORWARD; params.tailable = cq->getQueryRequest().isTailable(); }
本文做者:张友东
本文为云栖社区原创内容,未经容许不得转载。