MongoDB 4.2 内核解析 - Change Stream

MongoDB 从3.6版本开始支持了 Change Stream 能力(4.0、4.2 版本在能力上作了不少加强),用于订阅 MongoDB 内部的修改操做,change stream 可用于 MongoDB 之间的增量数据迁移、同步,也能够将 MongoDB 的增量订阅应用到其余的关联系统;好比电商场景里,MongoDB 里存储新的订单信息,业务须要根据新增的订单信息去通知库存管理系统发货。mongodb

Change Stream 与 Tailing Oplog 对比

在 change stream 功能以前,若是要获取 MongoDB 增量的修改,能够经过不断 tailing oplog  的方式来 拉取增量的 oplog ,而后针对拉取到的 oplog 集合,来过滤知足条件的 oplog。这种方式也能知足绝大部分场景的需求,但存在以下的不足。shell

  1. 使用门槛较高,用户须要针对 oplog 集合,打开特殊选项的的 tailable cursor  ("tailable": true, "awaitData" : true)。
  2. 用户须要本身管理增量续传,当拉取应用 crash 时,用户须要记录上一条拉取oplog的 ts、h 等字段,在下一次先定位到指定 oplog 再继续拉取。
  3. 结果过滤必须在拉取侧完成,但只须要订阅部分 oplog 时,好比针对某个 DB、某个 Collection、或某种类型的操做,必需要把左右的 oplog 拉取到再进行过滤。
  4. 对于 update 操做,oplog 只包含操做的部份内容,好比 {$set: {x: 1}} ,而应用常常须要获取到完整的文档内容。
  5. 不支持 Sharded Cluster 的订阅,用户必须针对每一个 shard 进行 tailing oplog,而且这个过程当中不能有 moveChunk 操做,不然结果可能乱序。

MongoDB Change Stream 解决了 Tailing oplog 存在的不足网络

  1. 简单易用,提供统一的 Change Stream API,一次 API 调用,便可从 MongoDB Server 侧获取增量修改。
  2. 统一的进度管理,经过 resume token 来标识拉取位置,只需在 API 调用时,带上上次结果的 resume token,便可从上次的位置接着订阅。
  3. 支持对结果在 Server 端进行 pipeline 过滤,减小网络传输,支持针对 DB、Collection、OperationType 等维度进行结果过滤。
  4. 支持 fullDocument: "updateLookup" 选项,对于 update,返回当时对应文档的完整内容。
  5. 支持 Sharded Cluster 的修改订阅,相同的 API 请求发到 mongos ,便可获取集群维度全局有序的修改。

Change Stream 实战

以 Mongo shell 为例,使用 Change Stream 很是简单,mongo shell 封装了针对整个实例、DB、Collection 级别的订阅操做。app

db.getMongo().watch()    订阅整个实例的修改
db.watch()               订阅指定DB的修改
db.collection.watch()    订阅指定Collection的修改

一、新建链接1发起订阅操做oop

mytest:PRIMARY>db.coll.watch([], {maxAwaitTimeMS: 60000})  最多阻塞等待 1分钟

二、新建链接2写入新数据post

mytest:PRIMARY> db.coll.insert({x: 100})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 101})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 102})
WriteResult({ "nInserted" : 1 })

三、链接1上收到 Change Stream 更新性能

mytest:PRIMARY> db.watch([], {maxAwaitTimeMS: 60000})
{ "_id" : { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934389, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9"), "x" : 100 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9") } }
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }

四、上述 ChangeStream 结果里,_id 字段的内容即为 resume token,标识着 oplog 的某个位置,若是想从某个位置继续订阅,在 watch 时,经过 resumeAfter 指定便可。好比每一个应用订阅了上述3条修改,但只有第一条已经成功消费了,下次订阅时指定第一条的 resume token 便可再次订阅到接下来的2条。ui

mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }})
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }

Change Stream 内部实现

watch() wrapper

db.watch() 其实是一个 API wrapper,实际上 Change Stream 在 MongoDB 内部其实是一个 aggregation 命令,只是加了一个特殊的 $changestream  阶段,在发起 change stream 订阅操做后,可经过 db.currentOp() 看到对应的 aggregation/getMore 操做的详细参数。阿里云

{
      "op" : "getmore",
      "ns" : "test.coll",
      "command" : {
        "getMore" : NumberLong("233479991942333714"),
        "collection" : "coll",
        "maxTimeMS" : 50000,
        "lsid" : {
          "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
        },
      },
      "planSummary" : "COLLSCAN",
      "cursor" : {
        "cursorId" : NumberLong("233479991942333714"),
        "createdDate" : ISODate("2019-12-31T06:35:52.479Z"),
        "lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"),
        "nDocsReturned" : NumberLong(1),
        "nBatchesReturned" : NumberLong(1),
        "noCursorTimeout" : false,
        "tailable" : true,
        "awaitData" : true,
        "originatingCommand" : {
          "aggregate" : "coll",
          "pipeline" : [
            {
              "$changeStream" : {
                "fullDocument" : "default"
              }
            }
          ],
          "cursor" : {

          },
          "lsid" : {
            "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
          },
          "$clusterTime" : {
            "clusterTime" : Timestamp(1577774144, 1),
            "signature" : {
              "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
              "keyId" : NumberLong(0)
            }
          },
          "$db" : "test"
        },
        "operationUsingCursorId" : NumberLong(7019500)
      },
      "numYields" : 2,
      "locks" : {

      }
    }

resume token

resume token 用来描述一个订阅点,本质上是 oplog 信息的一个封装,包含 clusterTime、uuid、documentKey等信息,当订阅 API 带上 resume token 时,MongoDB Server 会将 token 转换为对应的信息,并定位到 oplog 起点继续订阅操做。编码

struct ResumeTokenData {
    Timestamp clusterTime;
    int version = 0;
    size_t applyOpsIndex = 0;
    Value documentKey;
    boost::optional<UUID> uuid;
};

ResumeTokenData 结构里包含 version 信息,在 4.0.7 之前的版本,version 均为0; 4.0.7 引入了一种新的 resume token 格式,version 为 1; 另外在 3.6 版本里,Resume Token 的编码与 4.0 也有所不一样;因此在版本升级后,有可能出现不一样版本 token 没法识别的问题,因此尽可能要让 MongoDB Server 全部组件(Replica Set 各个成员,ConfigServer、Mongos)都保持相同的内核版本。

更详细的信息,参考 https://docs.mongodb.com/manual/reference/method/Mongo.watch/#resumability

updateLookup

Change Stream 支持针对 update 操做,获取当前的文档完整内容,而不是仅更新操做自己,好比

mytest:PRIMARY> db.coll.find({_id: 101})
{ "_id" : 101, "name" : "jack", "age" : 18 }
mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

上面的 update 操做,默认状况下,change stream 会收到  {_id: 101}, {$set: {age: 20}  的内容,而并不会包含这个文档其余未更新字段的信息;而加上 fullDocument: "updateLookup" 选项后,Change Stream 会根据文档 _id 去查找文档当前的内容并返回。

须要注意的是,updateLookup 选项只能保证最终一致性,好比针对上述文档,若是连续更新100次,update 的 change stream 并不会按顺序收到中间每一次的更新,由于每次都是去查找文档当前的内容,而当前的内容可能已经被后续的修改覆盖。

Sharded cluster

Change Stream 支持针对 sharded cluster 进行订阅,会保证全局有序的返回结果;为了达到全局有序这个目标,mongos 须要从每一个 shard 都返回订阅结果按时间戳进行排序合并返回。

在极端状况下,若是某些 shard 写入量不多或者没有写入,change stream 的返回延时会受到影响,由于须要等到全部 shard 都返回订阅结果;默认状况下,mongod server 每10s会产生一条 Noop 的特殊oplog,这个机制会间接驱动 sharded cluster 在写入量不高的状况下也能持续运转下去。

因为须要全局排序,在 sharded cluster 写入量很高时,Change Stream 的性能极可能跟不上;若是对性能要求很是高,能够考虑关闭 Balancer,在每一个 shard 上各自创建 Change Stream。

参考资料


本文做者:张友东

阅读原文

本文为阿里云内容,未经容许不得转载。

相关文章
相关标签/搜索