MongoDB 4.2 内核解析 – Change Stream

MongoDB 从3.6版本开始支持了 Change Stream 能力(4.0、4.2 版本在能力上作了不少加强),用于订阅 MongoDB 内部的修改操做,change stream 可用于 MongoDB 之间的增量数据迁移、同步,也能够将 MongoDB 的增量订阅应用到其余的关联系统;好比电商场景里,MongoDB 里存储新的订单信息,业务须要根据新增的订单信息去通知库存管理系统发货。mongodb

Change Stream 与 Tailing Oplog 对比

在 change stream 功能以前,若是要获取 MongoDB 增量的修改,能够经过不断 tailing oplog 的方式来 拉取增量的 oplog ,而后针对拉取到的 oplog 集合,来过滤知足条件的 oplog。这种方式也能知足绝大部分场景的需求,但存在以下的不足。shell

  • 使用门槛较高,用户须要针对 oplog 集合,打开特殊选项的的 tailable cursor (“tailable”: true, “awaitData” : true)。
  • 用户须要本身管理增量续传,当拉取应用 crash 时,用户须要记录上一条拉取oplog的 ts、h 等字段,在下一次先定位到指定 oplog 再继续拉取。
  • 结果过滤必须在拉取侧完成,但只须要订阅部分 oplog 时,好比针对某个 DB、某个 Collection、或某种类型的操做,必需要把左右的 oplog 拉取到再进行过滤。
  • 对于 update 操做,oplog 只包含操做的部份内容,好比 {$set: {x: 1}} ,而应用常常须要获取到完整的文档内容。
  • 不支持 Sharded Cluster 的订阅,用户必须针对每一个 shard 进行 tailing oplog,而且这个过程当中不能有 moveChunk 操做,不然结果可能乱序。

MongoDB Change Stream 解决了 Tailing oplog 存在的不足微信

  • 简单易用,提供统一的 Change Stream API,一次 API 调用,便可从 MongoDB Server 侧获取增量修改。
  • 统一的进度管理,经过 resume token 来标识拉取位置,只需在 API 调用时,带上上次结果的 resume token,便可从上次的位置接着订阅。
  • 支持对结果在 Server 端进行 pipeline 过滤,减小网络传输,支持针对 DB、Collection、OperationType 等维度进行结果过滤。
  • 支持 fullDocument: “updateLookup” 选项,对于 update,返回当时对应文档的完整内容。
  • 支持 Sharded Cluster 的修改订阅,相同的 API 请求发到 mongos ,便可获取集群维度全局有序的修改。

Change Stream 实战

以 Mongo shell 为例,使用 Change Stream 很是简单,mongo shell 封装了针对整个实例、DB、Collection 级别的订阅操做。网络

`db.getMongo().watch() 订阅整个实例的修改
db.watch() 订阅指定DB的修改
db.collection.watch() 订阅指定Collection的修改`app

新建链接1发起订阅操做oop

mytest:PRIMARY>db.coll.watch([], {maxAwaitTimeMS: 60000}) 最多阻塞等待 1分钟性能

新建链接2写入新数据ui

`mytest:PRIMARY> db.coll.insert({x: 100})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 101})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 102})
WriteResult({ "nInserted" : 1 })`编码

链接1上收到 Change Stream 更新code

`mytest:PRIMARY> db.watch([], {maxAwaitTimeMS: 60000})
{ "_id" : { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934389, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9"), "x" : 100 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9") } }
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }`

上述 ChangeStream 结果里,_id 字段的内容即为 resume token,标识着 oplog 的某个位置,若是想从某个位置继续订阅,在 watch 时,经过 resumeAfter 指定便可。好比每一个应用订阅了上述3条修改,但只有第一条已经成功消费了,下次订阅时指定第一条的 resume token 便可再次订阅到接下来的2条。

`mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }})
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }`

Change Stream 内部实现

watch() wrapper

db.watch() 其实是一个 API wrapper,实际上 Change Stream 在 MongoDB 内部其实是一个 aggregation 命令,只是加了一个特殊的 $changestream 阶段,在发起 change stream 订阅操做后,可经过 db.currentOp() 看到对应的 aggregation/getMore 操做的详细参数。

`{

"op" : "getmore",
  "ns" : "test.coll",
  "command" : {
    "getMore" : NumberLong("233479991942333714"),
    "collection" : "coll",
    "maxTimeMS" : 50000,
    "lsid" : {
      "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
    },
  },
  "planSummary" : "COLLSCAN",
  "cursor" : {
    "cursorId" : NumberLong("233479991942333714"),
    "createdDate" : ISODate("2019-12-31T06:35:52.479Z"),
    "lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"),
    "nDocsReturned" : NumberLong(1),
    "nBatchesReturned" : NumberLong(1),
    "noCursorTimeout" : false,
    "tailable" : true,
    "awaitData" : true,
    "originatingCommand" : {
      "aggregate" : "coll",
      "pipeline" : [
        {
          "$changeStream" : {
            "fullDocument" : "default"
          }
        }
      ],
      "cursor" : {

      },
      "lsid" : {
        "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
      },
      "$clusterTime" : {
        "clusterTime" : Timestamp(1577774144, 1),
        "signature" : {
          "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
          "keyId" : NumberLong(0)
        }
      },
      "$db" : "test"
    },
    "operationUsingCursorId" : NumberLong(7019500)
  },
  "numYields" : 2,
  "locks" : {

  }
}

`

resume token

resume token 用来描述一个订阅点,本质上是 oplog 信息的一个封装,包含 clusterTime、uuid、documentKey等信息,当订阅 API 带上 resume token 时,MongoDB Server 会将 token 转换为对应的信息,并定位到 oplog 起点继续订阅操做。

`struct ResumeTokenData {

Timestamp clusterTime;
int version = 0;
size_t applyOpsIndex = 0;
Value documentKey;
boost::optional uuid;

};`

ResumeTokenData 结构里包含 version 信息,在 4.0.7 之前的版本,version 均为0; 4.0.7 引入了一种新的 resume token 格式,version 为 1; 另外在 3.6 版本里,Resume Token 的编码与 4.0 也有所不一样;因此在版本升级后,有可能出现不一样版本 token 没法识别的问题,因此尽可能要让 MongoDB Server 全部组件(Replica Set 各个成员,ConfigServer、Mongos)都保持相同的内核版本。

更详细的信息,参考 https://docs.mongodb.com/manu...

updateLookup

Change Stream 支持针对 update 操做,获取当前的文档完整内容,而不是仅更新操做自己,好比

`mytest:PRIMARY> db.coll.find({_id: 101})
{ "_id" : 101, "name" : "jack", "age" : 18 }
mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })`

上面的 update 操做,默认状况下,change stream 会收到 {_id: 101}, {$set: {age: 20} 的内容,而并不会包含这个文档其余未更新字段的信息;而加上 fullDocument: “updateLookup” 选项后,Change Stream 会根据文档 _id 去查找文档当前的内容并返回。

须要注意的是,updateLookup 选项只能保证最终一致性,好比针对上述文档,若是连续更新100次,update 的 change stream 并不会按顺序收到中间每一次的更新,由于每次都是去查找文档当前的内容,而当前的内容可能已经被后续的修改覆盖。

Sharded cluster

Change Stream 支持针对 sharded cluster 进行订阅,会保证全局有序的返回结果;为了达到全局有序这个目标,mongos 须要从每一个 shard 都返回订阅结果按时间戳进行排序合并返回。

在极端状况下,若是某些 shard 写入量不多或者没有写入,change stream 的返回延时会受到影响,由于须要等到全部 shard 都返回订阅结果;默认状况下,mongod server 每10s会产生一条 Noop 的特殊oplog,这个机制会间接驱动 sharded cluster 在写入量不高的状况下也能持续运转下去。

因为须要全局排序,在 sharded cluster 写入量很高时,Change Stream 的性能极可能跟不上;若是对性能要求很是高,能够考虑关闭 Balancer,在每一个 shard 上各自创建 Change Stream。

参考资料
Change Stream Manual
Change Streams Production Recommendations
Tailable Cursors/

本文由MongoDB中文社区(mongoing.com)小英 (微信ID:mongoingcom)经过博客一文多发平台 [OpenWrite]发布!
相关文章
相关标签/搜索