Among the new features introduced in MongoDB 3.6, change streams are undoubtedly one of the most attractive.
Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog.
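Change streams let applications receive changes to MongoDB data in real time. This is a long-requested feature that can be used for ETL, cross-platform data synchronization, notification services, and more. Before change streams existed, changes could be tracked by tailing the oplog, but that is a complex and risky workaround.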
Compared with tailing the oplog yourself, change streams offer the following advantages:
Targeted changes
Changes can be filtered to provide relevant and targeted changes to listening applications.
Resumability
Resumability was top of mind when building change streams to ensure that applications can see every change in a collection. (Resuming is based on the resume token.)
Total ordering
MongoDB 3.6 has a global logical clock that enables the server to order all changes across a sharded cluster.
Durability
Change streams only include majority-committed changes.
Security
Change streams are secure – users are only able to create change streams on collections to which they have been granted read access.
Ease of use
Change streams are familiar – the API syntax takes advantage of the established MongoDB drivers and query language, and are independent of the underlying oplog format.
Idempotence
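The "Targeted changes" point means a stream can be narrowed to the events an application cares about. On the server, this is done by passing an aggregation pipeline to watch(), for example db.users.watch([{ $match: { operationType: { $in: ["insert", "update"] } } }]). As a rough, server-free illustration of what such a $match stage does, the hypothetical helper keepOperationTypes below applies the same filter to an array of simplified change events in plain JavaScript.

```javascript
// Hypothetical helper: a client-side stand-in for a $match stage on operationType.
// A real application would put the $match into the pipeline passed to watch(),
// so that filtered-out events are never sent to the client at all.
function keepOperationTypes(events, types) {
  const wanted = new Set(types);
  return events.filter((event) => wanted.has(event.operationType));
}

// Simplified events containing only the fields this helper reads.
const sampleEvents = [
  { operationType: "insert", documentKey: { _id: 1 } },
  { operationType: "update", documentKey: { _id: 1 } },
  { operationType: "delete", documentKey: { _id: 1 } },
];

console.log(keepOperationTypes(sampleEvents, ["insert", "update"]).map((e) => e.operationType));
// prints [ 'insert', 'update' ]
```

Filtering on the server is preferable because it also saves network traffic. The client-side version only shows the effect.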
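Change streams place some requirements on the MongoDB deployment (MongoDB 3.6):
- a replica set or a sharded cluster (a standalone mongod is not supported);
- the WiredTiger storage engine;
- replica set protocol version 1 (pv1);
- read concern "majority" enabled.
Change events include:
- insert
- update
- replace
- delete
- invalidate
Interestingly, compared with plain CRUD there is an extra replace event. The difference between update and replace is as follows. An update event reports only the fields that changed (in updateDescription). A replace event means the whole document, apart from _id, was replaced, and it carries the new document in fullDocument.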
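To make the event types concrete, here is a minimal sketch of a dispatcher that turns one change event into a one-line summary. The function name describeChange is hypothetical. The fields it reads (operationType, fullDocument, documentKey, updateDescription) are the ones visible in the shell output of the tests. ObjectIds are simplified to plain strings.

```javascript
// Hypothetical dispatcher: summarize a change event according to its operationType.
function describeChange(event) {
  switch (event.operationType) {
    case "insert":
      return `insert ${JSON.stringify(event.fullDocument)}`;
    case "replace":
      // The whole document (except _id) was replaced; fullDocument is the new version.
      return `replace ${event.documentKey._id} -> ${JSON.stringify(event.fullDocument)}`;
    case "update": {
      // Only the changed and removed fields are reported.
      const { updatedFields, removedFields } = event.updateDescription;
      return `update ${event.documentKey._id} set ${JSON.stringify(updatedFields)} unset ${JSON.stringify(removedFields)}`;
    }
    case "delete":
      return `delete ${event.documentKey._id}`;
    case "invalidate":
      // Invalidate events carry no documentKey; the stream is closed afterwards.
      return "invalidate";
    default:
      return `unhandled ${event.operationType}`;
  }
}

console.log(describeChange({
  operationType: "update",
  documentKey: { _id: "5b6d4a5cf8e670716bf152b2" },
  updateDescription: { updatedFields: { age: 19 }, removedFields: [] },
}));
// prints: update 5b6d4a5cf8e670716bf152b2 set {"age":19} unset []
```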
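Test setup: start two mongo shells, one that operates on the database and one that watches it. Output from the first shell is labeled Operate and output from the second is labeled Watch.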
Operate
MongoDB Enterprise free-shard-0:PRIMARY> use engineering
switched to db engineering
Watch
MongoDB Enterprise free-shard-0:PRIMARY> use engineering
switched to db engineering
MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch()
assert: command failed: { "operationTime" : Timestamp(1533888296, 2), "ok" : 0, "errmsg" : "cannot open $changeStream for non-existent database: engineering", "code" : 26, "codeName" : "NamespaceNotFound", "$clusterTime" : { "clusterTime" : Timestamp(1533888296, 2), "signature" : { "hash" : BinData(0,"fWTN4Kuv7cq9xCcC0vCF4AkTxuU="), "keyId" : NumberLong("6563302068054917121") } } } : aggregate failed
The error shows that you can only watch a database that already exists, so first insert a document to create the corresponding database and collection.
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test1', age: 18, 'email':'test1@gmail.con'})
WriteResult({ "nInserted" : 1 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch()
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
2018-08-10T16:08:49.200+0800 E QUERY [thread1] Error: error hasNext: false :
DBCommandCursor.prototype.next@src/mongo/shell/query.js:853:1
@(shell):1:1
The cursor used for listening is now open, but there are no change events yet, so cursor.next() fails with hasNext: false.
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test2', age: 19, 'email':'test2@gmail.con'})
WriteResult({ "nInserted" : 1 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"glttSC0AAAADRmRfaWQAZFttSCb45nBxa/FSsABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "insert", "fullDocument" : { "_id" : ObjectId("5b6d4826f8e670716bf152b0"), "username" : "test2", "age" : 19, "email" : "test2@gmail.con" }, "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6d4826f8e670716bf152b0") } }
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {age: 19})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"glttSSMAAAACRmRfaWQAZFttR+r45nBxa/FSrwBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "replace", "fullDocument" : { "_id" : ObjectId("5b6d47eaf8e670716bf152af"), "age" : 19 }, "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6d47eaf8e670716bf152af") } }
Note that the operation used db.collection.update, yet the change event is a replace. The reason is explained in the "Replace a Document Entirely" section of the update() documentation:
If the <update> document contains only field:value expressions, then:
- The update() method replaces the matching document with the <update> document. The update() method does not replace the _id value. For an example, see Replace All Fields.
- update() cannot update multiple documents.
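The rule quoted above is the whole reason for the replace event. A minimal sketch makes it concrete. It is a simplified model (not MongoDB's implementation) of what update() does to one matched document. The function name applyUpdate is hypothetical, and only the $set operator is modeled. Feeding it the test1 document from the test reproduces the fullDocument of the replace event.

```javascript
// Simplified model of update() on one matched document (only $set is modeled).
function applyUpdate(doc, update) {
  const keys = Object.keys(update);
  const operatorKeys = keys.filter((k) => k.startsWith("$"));

  if (operatorKeys.length === 0) {
    // Only field: value pairs: the document is replaced and _id is kept.
    // A change stream reports this as a "replace" event.
    return { _id: doc._id, ...update };
  }
  if (operatorKeys.length !== keys.length) {
    throw new Error("cannot mix update operators and plain fields");
  }
  const result = { ...doc };
  for (const op of operatorKeys) {
    if (op !== "$set") throw new Error(`${op} is not modeled in this sketch`);
    // $set changes only the listed top-level fields: an "update" event.
    Object.assign(result, update.$set);
  }
  return result;
}

// The test1 document from the test, with the ObjectId written as a string.
const stored = { _id: "5b6d47eaf8e670716bf152af", username: "test1", age: 18, email: "test1@gmail.con" };
console.log(applyUpdate(stored, { age: 19 }));
// prints { _id: '5b6d47eaf8e670716bf152af', age: 19 }
console.log(applyUpdate(stored, { $set: { age: 19 } }));
```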
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.remove({ "_id" : ObjectId("5b6d47eaf8e670716bf152af")})
WriteResult({ "nRemoved" : 1 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"glttSfAAAAAFRmRfaWQAZFttR+r45nBxa/FSrwBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6d47eaf8e670716bf152af") } }
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test1', age: 18, 'email':'test1@gmail.con'})
WriteResult({ "nInserted" : 1 })
MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {$set: {age: 19}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"glttSmQAAAAERmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "insert", "fullDocument" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2"), "username" : "test1", "age" : 18, "email" : "test1@gmail.con" }, "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2") } }
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"glttSn0AAAABRmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "update", "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2") }, "updateDescription" : { "updatedFields" : { "age" : 19 }, "removedFields" : [ ] } }
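By default, the update event above carries only updateDescription, not the whole document. A consumer that keeps its own copy of each document therefore has to merge the change itself. The following is a minimal sketch with a hypothetical helper, applyUpdateDescription. It assumes top-level field names only: updatedFields can also contain dotted paths such as "address.city", which this sketch does not expand.

```javascript
// Hypothetical helper: apply an "update" event's updateDescription to a cached copy.
// Assumption: top-level field names only; dotted paths are not expanded.
function applyUpdateDescription(cachedDoc, updateDescription) {
  const next = { ...cachedDoc, ...updateDescription.updatedFields };
  for (const field of updateDescription.removedFields) {
    delete next[field];
  }
  return next;
}

// The insert and update events from the test above, with the ObjectId as a string.
const inserted = { _id: "5b6d4a5cf8e670716bf152b2", username: "test1", age: 18, email: "test1@gmail.con" };
const updated = applyUpdateDescription(inserted, { updatedFields: { age: 19 }, removedFields: [] });
console.log(updated.age); // prints 19
```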
db.collection.watch accepts a fullDocument option. Setting it to 'updateLookup' makes update change events also include the complete document. In the Watch shell:
MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch([], {fullDocument:'updateLookup'} )
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {$set: {age: 29}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"glttS88AAAAERmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "update", "fullDocument" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2"), "username" : "test1", "age" : 29, "email" : "test1@gmail.con" }, "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2") }, "updateDescription" : { "updatedFields" : { "age" : 29 }, "removedFields" : [ ] } }
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({"username": "test3", "age": 14})
WriteResult({ "nInserted" : 1 })
MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({"username": "test3", "age": 14})
WriteResult({ "nInserted" : 1 })
MongoDB Enterprise free-shard-0:PRIMARY> db.users.remove({"username": "test3"})
WriteResult({ "nRemoved" : 2 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> ret = cursor.next()
{ "_id" : { "_data" : BinData(0,"gltusJ4AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "insert", "fullDocument" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c"), "username" : "test3", "age" : 14 }, "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c") } }
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"gltusKAAAAABRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "insert", "fullDocument" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d"), "username" : "test3", "age" : 14 }, "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") } }
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"gltusK8AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c") } }
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"gltusK8AAAACRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") } }
Resume Watch
MongoDB Enterprise free-shard-0:PRIMARY> resume_cursor = db.users.watch([], {"resumeAfter": ret['_id']})
{ "_id" : { "_data" : BinData(0,"gltusKAAAAABRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "insert", "fullDocument" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d"), "username" : "test3", "age" : 14 }, "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") } }
{ "_id" : { "_data" : BinData(0,"gltusK8AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c") } }
{ "_id" : { "_data" : BinData(0,"gltusK8AAAACRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") } }
MongoDB Enterprise free-shard-0:PRIMARY> resume_cursor.next()
2018-08-11T17:49:13.127+0800 E QUERY [thread1] Error: error hasNext: false :
DBCommandCursor.prototype.next@src/mongo/shell/query.js:853:1
@(shell):1:1
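For resume_cursor, resumeAfter is set to ret['_id'], the resume token of the first event read earlier. As soon as the new stream is opened, it returns every change event after that token, including the ones the first cursor had already consumed.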
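Resuming is what lets a consumer survive restarts. It records the _id (resume token) of the last event it has fully processed. After a restart, it opens watch([], { resumeAfter: token }). The sketch below is a simplified model of that behavior, not the server's implementation: an array stands in for the oplog, tokens are compared by their _data value, and the tokens "t1" to "t4" are placeholders. The functions eventsAfter and consume are hypothetical names. Resuming after the first insert yields the same three events the resumed cursor returned above.

```javascript
// Simplified model of resumeAfter: return every event that comes *after* the token.
function eventsAfter(eventLog, resumeToken) {
  const index = eventLog.findIndex((event) => event._id._data === resumeToken._data);
  if (index === -1) {
    // On a real server this happens, for example, when that entry is no longer in the oplog.
    throw new Error("resume token not found");
  }
  return eventLog.slice(index + 1);
}

// Consumer side: record the token after each event is handled.
function consume(events, handle, checkpoint) {
  for (const event of events) {
    handle(event);
    checkpoint.lastToken = event._id; // a real consumer would persist this durably
  }
}

// The four events from the test above, with placeholder tokens.
const eventLog = [
  { _id: { _data: "t1" }, operationType: "insert" },
  { _id: { _data: "t2" }, operationType: "insert" },
  { _id: { _data: "t3" }, operationType: "delete" },
  { _id: { _data: "t4" }, operationType: "delete" },
];

const checkpoint = {};
consume(eventLog.slice(0, 1), () => {}, checkpoint); // like ret = cursor.next()
console.log(eventsAfter(eventLog, checkpoint.lastToken).map((e) => e.operationType));
// prints [ 'insert', 'delete', 'delete' ]
```

Saving the token only after an event has been handled means a crash can replay the last event but never skip one. Handlers should therefore tolerate seeing an event twice.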
Designing Data-Intensive Applications has a section on Change Data Capture (CDC). It discusses using the replication log of a replicated database (such as a replica set); for MongoDB, the replication log is the oplog. The book notes:
The problem with most databases’ replication logs is that they have long been considered to be an internal implementation detail of the database, not a public API.
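In other words, an application (client) can only use a database through the interfaces the database provides, not by reading and parsing its replication log directly. Yet the replication log could be used directly to build a search index, a cache, or a data warehouse, as shown in the figure below: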
change data capture (CDC), which is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems.
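CDC turns search indexes and data warehouses into derived data systems, which can also be seen as views of the data in the database. Interestingly, the system in the figure above, made up of the database, the replication log, and the derived data systems, looks a lot like single-leader replication. The database is the leader (Primary), and the derived data systems (cache, data warehouse) are the followers (Secondaries).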
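The "follower" analogy can be sketched directly. A derived data system replays change events to keep its own copy in sync with the collection. Below is a minimal sketch of such a consumer. It keeps a Map keyed by _id, and applyToView is a hypothetical name. It assumes the stream was opened with { fullDocument: 'updateLookup' } as in the test above, so that update events carry the whole document and no merging is needed.

```javascript
// Hypothetical CDC consumer: keep a derived copy (a Map keyed by _id) of one collection.
// Assumption: the stream was opened with { fullDocument: "updateLookup" }.
function applyToView(view, event) {
  const id = event.documentKey ? event.documentKey._id : undefined;
  switch (event.operationType) {
    case "insert":
    case "replace":
    case "update":
      if (event.fullDocument === undefined) {
        throw new Error(`${event.operationType} event without fullDocument: open the stream with updateLookup`);
      }
      // With updateLookup, fullDocument is null if the document was deleted before
      // the lookup; the delete event that follows removes it from the view.
      if (event.fullDocument !== null) view.set(id, event.fullDocument);
      break;
    case "delete":
      view.delete(id);
      break;
    case "invalidate":
      // The collection was dropped or renamed; the stream closes and the view must be rebuilt.
      throw new Error("change stream invalidated");
    default:
      break; // other event types are ignored in this sketch
  }
  return view;
}

const view = new Map();
[
  { operationType: "insert", documentKey: { _id: "c" }, fullDocument: { _id: "c", username: "test3", age: 14 } },
  { operationType: "update", documentKey: { _id: "c" }, fullDocument: { _id: "c", username: "test3", age: 15 } },
].forEach((event) => applyToView(view, event));
console.log(view.get("c").age); // prints 15
```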
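Change streams have a wide range of applications. The article 完美数据迁移-MongoDB Stream的应用 ("Seamless data migration: applying MongoDB change streams") describes using change streams to migrate data during a move to services, and gives a complete example. The article USING MONGODB AS A REALTIME DATABASE WITH CHANGE STREAMS also gives a simple usage example with Node.js.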
The official documentation has some notes on using change streams with a sharded cluster (see the documentation). The following points are worth noting:
(1)
To guarantee total ordering of changes, for each change notification the mongos checks with each shard to see if the shard has seen more recent changes.
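Whether or not any data has changed, mongos has to check with every shard, which adds to the change stream's response time. The effect is more pronounced when network latency is high, for example with geographically distributed shards. If data changes very frequently, the change stream may not be able to keep up.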
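Why the slowest shard sets the pace can be seen in a simplified model (not mongos's actual implementation) of merging per-shard streams in cluster-time order. Each shard delivers its events sorted by clusterTime. The hypothetical field reportedUpTo is the cluster time up to which a shard has reported all of its changes. Nothing newer than the smallest reportedUpTo can be emitted, because the lagging shard might still produce an earlier event. The function mergeShardStreams is also hypothetical, and it is stateless for simplicity.

```javascript
// Simplified model: return the events that are safe to emit in total order.
function mergeShardStreams(shards) {
  if (shards.length === 0) return [];
  // The watermark is bounded by the shard that has reported the least progress.
  const watermark = Math.min(...shards.map((shard) => shard.reportedUpTo));
  const ready = shards
    .flatMap((shard) => shard.events)
    .filter((event) => event.clusterTime <= watermark);
  return ready.sort((a, b) => a.clusterTime - b.clusterTime);
}

// Shard B has only reported up to time 3, so shard A's event at time 4 must wait.
const shards = [
  { reportedUpTo: 5, events: [{ clusterTime: 1, op: "insert" }, { clusterTime: 4, op: "update" }] },
  { reportedUpTo: 3, events: [{ clusterTime: 2, op: "delete" }] },
];
console.log(mergeShardStreams(shards).map((e) => e.clusterTime)); // prints [ 1, 2 ]
```

In this model, a shard that is idle or far away holds back events from every other shard until it reports progress. This matches the latency concern described above.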
(2)
For sharded collections, update operations with multi : true may cause any change streams opened against that collection to send notifications for orphaned documents.
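For update operations with multi: true, the update may also be applied to orphaned documents, which produces extra change events; applications may need to handle this case. Incidentally, orphaned documents are a notoriously troublesome problem.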
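In addition, MongoDB 3.6 can only watch a single collection. To follow write events on several collections or databases, you must open a separate change stream for each one. The article MongoDB 3.6 Change Streams: A Nest Temperature and Fan Control Use Case points out that this can cause performance problems: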
It’s estimated that after 1000 streams you will start to see very measurable performance drops
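This article introduced MongoDB change streams and some issues to watch for when using them in practice, and tried them out with a simple test on MongoDB Atlas. Change streams are clearly a very promising feature that solves many problems that are currently awkward to implement. Before using them in production, however, they need extensive testing, especially of fault tolerance and performance.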
References:
- an-introduction-to-change-streams
- 免费试用MongoDB云数据库 (MongoDB Atlas)教程 (tutorial: trying the MongoDB Atlas cloud database for free)