MongoDB 在单文档操做中具备原子性,在多文档操做中就再也不具备此特性,一般须要借助事务来实现 ACID 特性。html
客户端对于事务的操做,都由 MongoDB Client Driver 实现提供相应的 API 接口。MongoDB 4.0 以后才支持事务,对于客户端驱动版本也要选择相对应版本。node
本文采用 MongoDB Client Driver 3.5 版本git
Session 是 MongoDB 3.6 以后引入的概念,在之前的版本中,Mongod 进程中的每个请求会建立一个上下文(OperationContext),能够理解为一个单行事务,这个单行事务中对于数据、索引、oplog 的修改都是原子性的。github
MongoDB 3.6 以后的 Session 本质上也是一个上下文,在这个 Session 会话中多个请求共享一个上下文,为多文档事务实现提供了基础。mongodb
一个知识点:为什么 db.coll.count() 在宕机崩溃后常常就不许了?api
缘由在于 表记录数的更新独立于数据更新的事务以外,参考文章 mongoing.com/archives/54…。bash
开启一个新的事务,以后便可进行 CRUD 操做。session
提交事务保存数据,在提交以前事务中的变动的数据对外是不可见的。dom
事务回滚,例如,一部分数据更新失败,对已修改过的数据也进行回滚。async
结束本次会话。
var session = db.getMongo().startSession();
session.startTransaction({readConcern: { level: 'majority' },writeConcern: { w: 'majority' }});
var coll = session.getDatabase('test').getCollection('user');
coll.update({name: 'Jack'}, {$set: {age: 18}})
// 成功提交事务
session.commitTransaction();
// 失败事务回滚
session.abortTransaction();
复制代码
为了更好的理解 MongoDB 事务在 Node.js 中如何应用,列举一个例子进行说明。
假设咱们如今有这样一个商城商品下单场景,分为一个商品表(存储商品数据、库存信息),另外一个订单表(存储订单记录)。每次下单以前须要先校验库存是否大于 0,大于 0 的时候扣减商品库存、建立订单,不然,提示库存不足没法下单。
// goods
{
"_id": ObjectId("5e3b839ec2d95bfeecaad6b8"),
"goodId":"g1000", // 商品 Id
"name":"测试商品1", // 商品名称
"stock":2, // 商品库存
"price":100 // 商品金额
}
// db.goods.insert({ "goodId" : "g1000", "name" : "测试商品1", "stock" : 2, "price" : 100 })
复制代码
// order_goods
{
"_id":ObjectId("5e3b8401c2d95bfeecaad6b9"),
"id":"o10000", // 订单id
"goodId":"g1000", // 订单对应的商品 Id
"price":100 // 订单金额
}
// db.order_goods.insert({ id: "o10000", goodId: "g1000", price: 100 })
复制代码
注意:在一个事务操做中 readPreference 必须设置为 primary 节点,不能是 secondary 节点。
db.js
连接 MongoDB,初始化一个实例。
const MongoClient = require('mongodb').MongoClient;
const dbConnectionUrl = 'mongodb://192.168.6.131:27017,192.168.6.131:27018,192.168.6.131:27019/?replicaSet=May&readPreference=secondaryPreferred';
const client = new MongoClient(dbConnectionUrl, {
useUnifiedTopology: true,
});
let instance = null;
module.exports = {
dbInstance: async () => {
if (instance) {
return instance;
}
try {
instance = await client.connect();
} catch(err) {
console.log(`[MongoDB connection] ERROR: ${err}`);
throw err;
}
process.on('exit', () => {
instance.close();
});
return instance;
}
};
复制代码
index.js
const db = require('./db');
const testTransaction = async (goodId) => {
const client = await db.dbInstance();
const transactionOptions = {
readConcern: { level: 'majority' },
writeConcern: { w: 'majority' },
readPreference: 'primary',
};
const session = client.startSession();
console.log('事务状态:', session.transaction.state);
try {
session.startTransaction(transactionOptions);
console.log('事务状态:', session.transaction.state);
const goodsColl = await client.db('test').collection('goods');
const orderGoodsColl = await client.db('test').collection('order_goods');
const { stock, price } = await goodsColl.findOne({ goodId }, { session });
console.log('事务状态:', session.transaction.state);
if (stock <= 0) {
throw new Error('库存不足');
}
await goodsColl.updateOne({ goodId }, {
$inc: { stock: -1 } // 库存减 1
})
await orderGoodsColl.insertOne({ id: Math.floor(Math.random() * 1000), goodId, price }, { session });
await session.commitTransaction();
} catch(err) {
console.log(`[MongoDB transaction] ERROR: ${err}`);
await session.abortTransaction();
} finally {
await session.endSession();
console.log('事务状态:', session.transaction.state);
}
}
testTransaction('g1000')
复制代码
运行测试
每一次事务函数执行以后,查看当前事务状态。
node index
事务状态: NO_TRANSACTION
事务状态: STARTING_TRANSACTION
事务状态: TRANSACTION_IN_PROGRESS
事务状态: TRANSACTION_COMMITTED
复制代码