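Recently I needed to do some simple MongoDB operations from Python without pulling in any heavyweight framework. For reusability, I wanted to put a thin wrapper around PyMongo, the official MongoDB Python driver. As usual, Baidu did not turn up a satisfying answer, hence this post.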
【Main Text】
PyMongo, the official MongoDB Python driver
- docs: https://api.mongodb.com/python/current/index.html
- github: https://github.com/mongodb/mongo-python-driver
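PyMongo supports nearly every MongoDB feature and can connect to a standalone MongoDB server, a replica set, or a sharded cluster. In terms of API, the pymongo package is the core and contains all the database operations. This post introduces a simple wrapper class, DBManager. Its main features: operations on databases and collections first make sure they exist; PyMongo's native operations remain available, including basic CRUD, bulk operations, MapReduce, multithreading and multiprocessing; and pipelines of operations can be run inside causally consistent sessions and transactions, with simple examples given.
mongo_client provides the MongoClient class for connecting to MongoDB: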
class pymongo.mongo_client.MongoClient(host='localhost', port=27017, document_class=dict, tz_aware=False, connect=True, **kwargs)
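Each MongoClient instance (client for short below) maintains a built-in connection pool whose size is capped by maxPoolSize, 100 by default. With multiple threads, the pool hands each thread a socket connection until the maximum is reached; further threads block until a connection is released. The client also keeps one extra connection to each server in the MongoDB topology to monitor that server's state.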
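To make the blocking behavior concrete, here is a toy model of a bounded pool built on threading.BoundedSemaphore. It is an illustration under assumptions, not PyMongo's actual pool implementation: ToyPool, checkout and the pool size of 2 are hypothetical names and values chosen for the demo.

```python
import threading
import time
from contextlib import contextmanager


class ToyPool:
    """Toy bounded pool: at most max_size connections are checked out at once."""

    def __init__(self, max_size):
        self._slots = threading.BoundedSemaphore(max_size)
        self._lock = threading.Lock()
        self.in_use = 0
        self.peak = 0  # highest number of simultaneous checkouts observed

    @contextmanager
    def checkout(self):
        self._slots.acquire()  # blocks while every slot is taken
        with self._lock:
            self.in_use += 1
            self.peak = max(self.peak, self.in_use)
        try:
            yield object()  # stands in for a socket connection
        finally:
            with self._lock:
                self.in_use -= 1
            self._slots.release()  # lets one waiting thread proceed


pool = ToyPool(max_size=2)


def worker():
    with pool.checkout():
        time.sleep(0.05)  # simulate a database round trip


threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(pool.peak)  # never exceeds max_size
```

Six threads compete for two slots, so at most two "connections" are ever in use and the rest wait, which is the behavior described above for maxPoolSize.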
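The new_mongo_client function below returns a client connected to the database. Inside it, client.admin.command('ismaster') checks whether the server is available; the command is cheap and needs no authentication.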
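```python
import logging

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure


def new_mongo_client(uri, **kwargs):
    """Create new pymongo.mongo_client.MongoClient instance. DO NOT USE IT DIRECTLY."""
    kwargs.setdefault('maxPoolSize', 1024)  # the author's pool size; callers may override it
    try:
        client = MongoClient(uri, **kwargs)
        # The ismaster command is cheap and does not require auth (newer servers also accept 'hello').
        # If no server is reachable it raises ServerSelectionTimeoutError, a subclass of ConnectionFailure.
        client.admin.command('ismaster')
    except ConnectionFailure:
        logging.error("new_mongo_client(): Server not available, please check your uri: {}".format(uri))
        return None
    else:
        return client
```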
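PyMongo is not fork-safe, but within a single process it is thread-safe. The common pattern is therefore: for a given MongoDB deployment, create one client per process and use that single instance for all subsequent database operations, including those from multiple threads. Never create a new MongoClient for every operation and call MongoClient.close() when done; that is unnecessary and very wasteful.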
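One way to honor "one client per process" without passing a flag around is to key the cache on os.getpid(), so a forked child never reuses its parent's client. This is a sketch under assumptions: make_client is a hypothetical factory standing in for MongoClient(uri), and this cache is not the author's URI_CLIENT_DICT.

```python
import os

_clients = {}  # (pid, uri) -> client; hypothetical cache for illustration


def make_client(uri):
    """Hypothetical factory; real code would return pymongo.MongoClient(uri) here."""
    return {"uri": uri, "pid": os.getpid()}


def get_client_for_this_process(uri):
    key = (os.getpid(), uri)
    client = _clients.get(key)
    if client is None:
        # First call in this process, or first call after a fork: the pid changed,
        # so the parent's client is never handed to the child.
        client = make_client(uri)
        _clients[key] = client
    return client


a = get_client_for_this_process("mongodb://localhost:27017")
b = get_client_for_this_process("mongodb://localhost:27017")
print(a is b)  # True: same process and same URI share one client
```

Compared with an explicit fork flag, this decides automatically; the child still inherits the parent's cache entry, but never looks it up.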
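Because of this one-client-per-process rule, new_mongo_client should generally not be called directly to get a client; instead it is wrapped further in the get_mongo_client function. The global URI_CLIENT_DICT is a dictionary mapping MongoDB URI strings to their clients, one client per URI. The code is as follows: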
```python
MONGO_URI_DEFAULT = 'mongodb://localhost:27017/admin'
URI_CLIENT_DICT = {}  # a dictionary holding all clients, keyed by uri


def get_mongo_client(uri=MONGO_URI_DEFAULT, fork=False, **kwargs):
    """Get pymongo.mongo_client.MongoClient instance. One mongodb uri, one client.

    @:param uri: mongodb uri
    @:param fork: for fork-safety in the multiprocess case; if fork=True, return a new
        MongoClient instance (not cached). Default False.
    @:param kwargs: refer to pymongo.mongo_client.MongoClient kwargs
    """
    if fork:
        return new_mongo_client(uri, **kwargs)
    matched_client = URI_CLIENT_DICT.get(uri)
    if matched_client is None:  # no matched client yet
        new_client = new_mongo_client(uri, **kwargs)
        if new_client is not None:
            URI_CLIENT_DICT[uri] = new_client
        return new_client
    return matched_client
```
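get_mongo_client reads and writes URI_CLIENT_DICT without a lock, so two threads asking for the same new URI at the same moment could each create a client, with one silently overwriting the other. If that matters, a lock around the check-and-insert closes the gap. A sketch: ClientRegistry and the factory argument are hypothetical names; in practice the factory would be new_mongo_client.

```python
import threading


class ClientRegistry:
    """One client per URI, safe to call from multiple threads."""

    def __init__(self, factory):
        self._factory = factory  # e.g. new_mongo_client in the code above
        self._clients = {}
        self._lock = threading.Lock()

    def get(self, uri):
        client = self._clients.get(uri)  # fast path: no lock needed for a hit
        if client is not None:
            return client
        with self._lock:
            client = self._clients.get(uri)  # re-check: another thread may have won
            if client is None:
                client = self._factory(uri)
                if client is not None:  # like get_mongo_client, don't cache failures
                    self._clients[uri] = client
            return client


created = []


def counting_factory(uri):
    created.append(uri)
    return object()


registry = ClientRegistry(counting_factory)
results = []
threads = [threading.Thread(target=lambda: results.append(registry.get("mongodb://localhost:27017")))
           for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(created))  # 1: the factory ran only once
```

The factory runs while the lock is held, so other first-time callers wait for the connection check; for a handful of URIs that is usually acceptable.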
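PyMongo has a quirk: querying a database or collection that does not exist does not raise an error. The IPython session below demonstrates operations on the nonexistent database xxDB and collection XXCollection: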
```
In [1]: from pymongo import MongoClient

In [2]: client = MongoClient()  # defaults to localhost:27017

In [3]: db = client.get_database('xxDB')  # Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), u'xxDB')

In [4]: coll = db.get_collection('XXCollection')  # Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), u'xxDB'), u'XXCollection')

In [5]: coll.find_one()  # note: no warning, no error, no exception; returns None

In [6]: coll.insert_one({'hello' : 'what a fucking feature'})
Out[6]: <pymongo.results.InsertOneResult at 0x524ccc8>

In [7]: coll.find_one()
Out[7]: {u'_id': ObjectId('5c31c807bb048515b814d719'), u'hello': u'what a fucking feature'}
```
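If a database or collection name is mistyped, this is a disaster for everything that follows: reads quietly return nothing and writes quietly create the misspelled database or collection. It is therefore worth adding a guard that confirms existence when getting a database or collection.
To get a database, the code below calls MongoClient.list_database_names() to fetch all database names and returns None if the requested name is not among them. Likewise, for collections it uses Database.list_collection_names(). Note: if listing databases or collections fails, for example because the user lacks the privilege, the guard is skipped by default and the database or collection is returned unchecked.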
```python
import logging

from pymongo.errors import PyMongoError


def get_existing_db(client, db_name):
    """Get existing pymongo.database.Database instance.

    @:param client: pymongo.mongo_client.MongoClient instance
    @:param db_name: database name wanted
    """
    if client is None:
        logging.error('client {} is None'.format(client))
        return None
    try:
        db_available_list = client.list_database_names()
    except PyMongoError as e:
        # e.g. insufficient privileges: skip the existence check
        logging.error('client: {}, db_name: {}, client.list_database_names() error: {}'.format(
            client, db_name, repr(e)))
    else:
        if db_name not in db_available_list:
            logging.error('client {} has no db named {}'.format(client, db_name))
            return None
    db = client.get_database(db_name)
    return db


def get_existing_coll(db, coll_name):
    """Get existing pymongo.collection.Collection instance.

    @:param db: pymongo.database.Database instance
    @:param coll_name: collection name wanted
    """
    if db is None:
        logging.error('db {} is None'.format(db))
        return None
    try:
        coll_available_list = db.list_collection_names()
    except PyMongoError as e:
        # e.g. insufficient privileges: skip the existence check
        logging.error('db: {}, coll_name: {}, db.list_collection_names() error: {}'.format(
            db, coll_name, repr(e)))
    else:
        if coll_name not in coll_available_list:
            logging.error('db {} has no collection named {}'.format(db, coll_name))
            return None
    coll = db.get_collection(coll_name)
    return coll
```
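Since the guard exists mainly to catch typos, its error message could also suggest the closest existing names. A small sketch using the standard library's difflib.get_close_matches; suggest_name is a hypothetical helper, and in real use the name list would come from list_database_names() or list_collection_names().

```python
import difflib


def suggest_name(wanted, available, n=3, cutoff=0.6):
    """Return up to n existing names that look like `wanted` (likely typos)."""
    return difflib.get_close_matches(wanted, available, n=n, cutoff=cutoff)


available = ["testDB", "admin", "local", "config"]
print(suggest_name("tsetDB", available))
```

In get_existing_db, for example, the "has no db named" branch could append suggest_name(db_name, db_available_list) to its log message.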
The lengthy preamble above mainly serves to introduce the PyMongo wrapper class itself: DBManager.
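A DBManager instance holds three pieces of state: the MongoClient instance self.client, the database self.db, and the collection self.coll, all exposed through properties. PyMongo's native methods work on this client, db and coll as usual. The client is obtained in the constructor by calling get_mongo_client from above; db and coll can be set through the constructor or switched later through the self.db_name and self.coll_name setters.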
DBManager instances provide the methods create_coll(self, db_name, coll_name), session_pipeline(self, pipeline) and transaction_pipeline(self, pipeline). The latter two are explained in the next section.
```python
import logging

from pymongo.errors import CollectionInvalid


class DBManager:
    """A safe and simple pymongo packaging class ensuring existing database and collection.

    Operations:
        MongoClient level operations: https://api.mongodb.com/python/current/api/pymongo/mongo_client.html
        Database level operations: https://api.mongodb.com/python/current/api/pymongo/database.html
        Collection level operations: https://api.mongodb.com/python/current/api/pymongo/collection.html
    """

    __default_uri = 'mongodb://localhost:27017/admin'
    __default_db_name = 'test'
    __default_coll_name = 'test'

    def __init__(self, uri=__default_uri, db_name=__default_db_name, coll_name=__default_coll_name, **kwargs):
        self.__uri = uri
        self.__db_name = db_name
        self.__coll_name = coll_name
        self.__client = get_mongo_client(uri, **kwargs)
        self.__db = get_existing_db(self.__client, db_name)
        self.__coll = get_existing_coll(self.__db, coll_name)

    def __str__(self):
        return u'uri: {}, db_name: {}, coll_name: {}, id_client: {}, client: {}, db: {}, coll: {}'.format(
            self.uri, self.db_name, self.coll_name, id(self.client), self.client, self.db, self.coll)

    @property
    def uri(self):
        return self.__uri

    @property
    def db_name(self):
        return self.__db_name

    @db_name.setter
    def db_name(self, db_name):
        self.__db_name = db_name
        self.__db = get_existing_db(self.__client, db_name)

    @property
    def coll_name(self):
        return self.__coll_name

    @coll_name.setter
    def coll_name(self, coll_name):
        self.__coll_name = coll_name
        self.__coll = get_existing_coll(self.__db, coll_name)

    @property
    def client(self):
        return self.__client

    @property
    def db(self):
        return self.__db

    @property
    def coll(self):
        # always re-resolve against the current self.__db, so switching db_name also switches coll
        self.__coll = get_existing_coll(self.__db, self.__coll_name)
        return self.__coll

    def create_coll(self, db_name, coll_name):
        """Create new collection with new or existing database"""
        if self.__client is None:
            return None
        try:
            return self.__client.get_database(db_name).create_collection(coll_name)
        except CollectionInvalid:
            logging.error('collection {} already exists in database {}'.format(coll_name, db_name))
            return None

    def _run_operation(self, operation, session):
        """Run one Operation inside `session`; store the (optionally post-processed) result in operation.out."""
        targets = {'client': self.__client, 'db': self.__db, 'coll': self.__coll}
        if operation.level not in targets:
            raise ValueError('unknown operation level: {!r}'.format(operation.level))
        operator = getattr(targets[operation.level], operation.operation_name)
        # a tuple means several positional args; anything else is the single first arg
        args = operation.args if isinstance(operation.args, tuple) else (operation.args,)
        kwargs = operation.kwargs or {}
        ops_rst = operator(*args, session=session, **kwargs)
        operation.out = operation.callback(ops_rst) if operation.callback is not None else ops_rst

    def session_pipeline(self, pipeline):
        if self.__client is None:
            logging.error('client is None in session_pipeline: {}'.format(self.__client))
            return None
        with self.__client.start_session(causal_consistency=True) as session:
            result = []
            for operation in pipeline:
                try:
                    self._run_operation(operation, session)
                except Exception as e:
                    # a failed step is logged and the remaining steps still run
                    logging.error('session_pipeline Exception in {} {}, args: {}, kwargs: {}: {}'.format(
                        operation.level, operation.operation_name, operation.args, operation.kwargs, repr(e)))
                result.append(operation)
            return result

    # https://api.mongodb.com/python/current/api/pymongo/client_session.html#transactions
    def transaction_pipeline(self, pipeline):
        if self.__client is None:
            logging.error('client is None in transaction_pipeline: {}'.format(self.__client))
            return None
        with self.__client.start_session(causal_consistency=True) as session:
            with session.start_transaction():
                result = []
                for operation in pipeline:
                    try:
                        self._run_operation(operation, session)
                    except Exception as e:
                        logging.error('transaction_pipeline Exception in {} {}, args: {}, kwargs: {}: {}'.format(
                            operation.level, operation.operation_name, operation.args, operation.kwargs, repr(e)))
                        raise  # leaving start_transaction() with an exception aborts the transaction
                    result.append(operation)
                return result
```
Some examples of how to use DBManager:
```python
# get a DBManager instance
dbm = DBManager('mongodb://localhost:27017/admin')  # db_name and coll_name default to 'test'
dbm.create_coll('testDB', 'testCollection')

# change db or coll
dbm.db_name = 'testDB'  # dbm.db switches from test to testDB; dbm.coll is now looked up in testDB
dbm.coll_name = 'testCollection'  # dbm.coll switches to testDB.testCollection
```
```python
# simple manipulation operations
dbm.coll.insert_one({'hello': 'world'})
print(dbm.coll.find_one())  # {'_id': ObjectId('...'), 'hello': 'world'}
dbm.coll.update_one({'hello': 'world'}, {'$set': {'hello': 'hell'}})  # updates need operators such as $set

# bulk operations
from pymongo import InsertOne, DeleteOne, ReplaceOne
dbm.coll.bulk_write([InsertOne({'y': 1}), DeleteOne({'x': 1}), ReplaceOne({'w': 1}, {'z': 1}, upsert=True)])

# simple management operations
import pymongo
dbm.coll.create_index([('hello', pymongo.DESCENDING)], background=True)
dbm.client.list_database_names()
dbm.db.list_collection_names()
```
```python
import multiprocessing
import threading

uri, db_name, coll_name = 'mongodb://localhost:27017/admin', 'testDB', 'testCollection'


# thread concurrency
def thread_func(uri, db_name, coll_name):
    # a new DBManager per thread avoids races on its mutable state;
    # the underlying client is still shared through URI_CLIENT_DICT
    dbm = DBManager(uri, db_name, coll_name)
    # Do something with db.


# multiprocess parallelism
def process_func(uri, db_name, coll_name):
    # new process: fork=True gives a new client, plus a new DBManager instance
    dbm = DBManager(uri, db_name, coll_name, fork=True)
    # Do something with db.


if __name__ == '__main__':
    t = threading.Thread(target=thread_func, args=(uri, db_name, coll_name))
    t.start()
    proc = multiprocessing.Process(target=process_func, args=(uri, db_name, coll_name))
    proc.start()
```
```python
# MapReduce (Collection.inline_map_reduce exists in PyMongo 3.x; it was removed in PyMongo 4.0)
from bson.code import Code

mapper = Code('''
function () {...}
''')
reducer = Code('''
function (key, values) {...}
''')
rst = dbm.coll.inline_map_reduce(mapper, reducer)
```
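The mapper and reducer bodies above are elided. Purely as an illustration, assume a job that counts documents per value of the hello field. Because inline_map_reduce is gone in PyMongo 4.0 and map-reduce is deprecated as of MongoDB 5.0, such a count would typically be written as an aggregation $group stage instead. The sketch shows that pipeline as plain data next to a pure-Python model of what the map/reduce pair computes; COUNT_BY_HELLO and map_reduce_in_memory are hypothetical names.

```python
from collections import defaultdict

# Aggregation equivalent of a "count per hello value" map-reduce job;
# with a live server: list(dbm.coll.aggregate(COUNT_BY_HELLO))
COUNT_BY_HELLO = [
    {"$group": {"_id": "$hello", "value": {"$sum": 1}}},
    {"$sort": {"_id": 1}},
]


def map_reduce_in_memory(docs, mapper, reducer):
    """Pure-Python model: map emits (key, value) pairs, reduce folds the values per key."""
    emitted = defaultdict(list)
    for doc in docs:
        for key, value in mapper(doc):
            emitted[key].append(value)
    return {key: reducer(key, values) for key, values in emitted.items()}


docs = [{"hello": "world"}, {"hello": "hell"}, {"hello": "world"}]
counts = map_reduce_in_memory(
    docs,
    mapper=lambda doc: [(doc["hello"], 1)],   # like emit(this.hello, 1)
    reducer=lambda key, values: sum(values),  # like Array.sum(values)
)
print(counts)
```

The $group output keeps the same {_id, value} shape that inline map-reduce results have. One caveat of the model: MongoDB may call reduce several times per key (re-reduce), so a real reducer must be safe to apply to its own output; a sum is.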
MongoDB Reference
- docs: https://docs.mongodb.com/manual/
- causal-consistency session: https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency
- transactions: https://docs.mongodb.com/manual/core/transactions/#transactions
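A session is a logical representation of a connection to the database. MongoDB 3.6 introduced client sessions and, within them, support for causal consistency of operations. So, strictly speaking, what DBManager wraps here is a causally consistent session, i.e. client.start_session(causal_consistency=True). Consistency is only guaranteed, however, if the application makes sure that only one thread at a time runs operations in a given session. Within a client session, the results of a sequence of reads and writes are then causally consistent with the order in which they were executed. Note that these guarantees hold for reads with read concern "majority" and writes with write concern "majority"; starting a causally consistent session does not switch these on by itself, so they have to be configured on the client, database or collection. The four guarantees are: read your writes, monotonic reads, monotonic writes, and writes follow reads. Client sessions work together with server sessions: starting with 3.6, MongoDB drivers associate every operation with a server session, and server sessions are the underlying mechanism that supports causally consistent sequences of operations in client sessions as well as retryable writes.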
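Roughly, the mechanism behind these guarantees is: each operation in a causally consistent session returns an operationTime, the driver remembers it, and the next read carries it as readConcern afterClusterTime, so whichever node serves the read must first catch up to that point. The toy model below is a single-threaded simulation of "read your writes" against a lagging secondary. Node, Session and their methods are hypothetical, and unlike a real server, which waits for replication, the toy raises an error.

```python
class Node:
    """A replica-set member that has applied writes up to `applied_time`."""

    def __init__(self):
        self.applied_time = 0
        self.data = {}

    def apply(self, time, key, value):
        self.data[key] = value
        self.applied_time = time


class Session:
    """Tracks the latest operationTime seen, like a causally consistent session."""

    def __init__(self):
        self.operation_time = 0

    def write(self, primary, clock, key, value):
        primary.apply(clock, key, value)
        self.operation_time = max(self.operation_time, clock)  # remember operationTime

    def read(self, node, key):
        # Stand-in for readConcern {afterClusterTime: operation_time}:
        # a node that is behind must not answer yet.
        if node.applied_time < self.operation_time:
            raise RuntimeError("node is behind afterClusterTime; a real server would wait")
        return node.data.get(key)


primary, secondary = Node(), Node()
s = Session()
s.write(primary, clock=1, key="hello", value="world")

print(s.read(primary, "hello"))  # world
try:
    s.read(secondary, "hello")  # the secondary has not replicated the write yet
except RuntimeError as exc:
    print("refused:", exc)

secondary.apply(1, "hello", "world")  # replication catches up
print(s.read(secondary, "hello"))  # world
```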
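In MongoDB, an operation on a single document is atomic. Atomicity means an operation either takes effect completely or not at all; it cannot be split, or in short, "all or nothing". Starting with MongoDB 4.0, replica sets support atomicity across multiple documents, i.e. multi-document transactions. Within one transaction, operations may touch multiple documents across different databases and collections: if all of them succeed, the transaction is committed; if any of them fails, the whole transaction is aborted and its changes to the database are discarded. Results become visible outside the transaction only after it commits; while it is in progress, or if it fails, none of its operations are visible from outside. Standalone mongod servers do not support transactions. At the time of writing (MongoDB 4.0), sharded clusters did not either; MongoDB expected to add this around version 4.2, and MongoDB 4.2 did introduce distributed transactions on sharded clusters. Also note that multi-document transactions carry a higher performance cost; where the use case allows, prefer single-document operations on embedded documents or arrays.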
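The all-or-nothing rule can be modeled in a few lines: stage every write on a private copy and publish it only if every step succeeds. This is a toy illustration of the semantics only; MongoDB's real implementation is far more involved, and run_transaction, insert and fail are hypothetical helpers.

```python
import copy


def run_transaction(store, steps):
    """Apply every step to a private copy; publish it only if all steps succeed."""
    staged = copy.deepcopy(store)  # writes stay invisible to others until commit
    for step in steps:
        step(staged)               # any exception aborts: `store` is untouched
    store.clear()
    store.update(staged)           # "commit": all changes become visible at once


def insert(coll, doc):
    def step(db):
        db.setdefault(coll, []).append(doc)
    return step


def fail(db):
    raise ValueError("simulated failure")


state = {"orders": []}
run_transaction(state, [insert("orders", {"id": 1}), insert("stock", {"sku": "a"})])
print(state)
try:
    run_transaction(state, [insert("orders", {"id": 2}), fail])
except ValueError:
    pass
print(state)  # unchanged by the failed transaction
```

The first call spans two "collections" and commits both inserts; the second fails after its first insert, so neither of its changes is visible afterwards.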
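The main use case for both sessions and transactions is a sequence of ordered operations, i.e. a pipeline. DBManager therefore provides the methods session_pipeline(self, pipeline) and transaction_pipeline(self, pipeline). First, we introduce the Operation class, which describes one operation: the level it acts on (client, db or coll), the method to call, its arguments, and a callback applied to the result; the names are self-explanatory. A list of Operation instances forms a pipeline, which is the input to session_pipeline(self, pipeline) and transaction_pipeline(self, pipeline). The output of each step is written to the out attribute of the corresponding Operation instance.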
```python
class Operation:
    """Operation for constructing sequential pipeline. Only used in DBManager.session_pipeline() or transaction_pipeline().

    Constructor parameters:
        level: <'client' | 'db' | 'coll'> indicating the operation level: MongoClient, Database or Collection
        operation_name: literally, the name of the operation (method) on that level
        args: positional arguments the operation needs: either the first parameter alone,
              or a tuple of parameters
        kwargs: keyword arguments the operation needs
        callback: callback function applied to the operation result

    Examples:
        # pymongo.collection.Collection.find(filter=None, projection=None, skip=0, limit=0, ...)
        Operation('coll', 'find', {'x': 5})  # only the filter parameter, equivalent to:
        Operation('coll', 'find', args={'x': 5})  # or
        Operation('coll', 'find', kwargs={'filter': {'x': 5}})

        Operation('coll', 'find', ({'x': 5}, {'_id': 0}), {'limit': 100})  # equivalent to:
        Operation('coll', 'find', args=({'x': 5}, {'_id': 0}, 0, 100))  # or
        Operation('coll', 'find', kwargs={'filter': {'x': 5}, 'projection': {'_id': 0}, 'limit': 100})

        def cursor_callback(cursor):
            return cursor.distinct('hello')
        Operation('coll', 'find', kwargs={'limit': 2}, callback=cursor_callback)
    """

    def __init__(self, level, operation_name, args=(), kwargs=None, callback=None):
        self.level = level
        self.operation_name = operation_name
        self.args = args
        self.kwargs = kwargs if kwargs is not None else {}  # avoid a shared mutable default
        self.callback = callback
        self.out = None
```
A simple example of a causally consistent session and a transaction built on DBManager and Operation:
```python
# causal-consistency session or transaction pipeline operation
def cursor_callback(cursor):
    return cursor.distinct('hello')


op_1 = Operation('coll', 'insert_one', {'hello': 'heaven'})
op_2 = Operation('coll', 'insert_one', {'hello': 'hell'})
op_3 = Operation('coll', 'insert_one', {'hello': 'god'})
op_4 = Operation('coll', 'find', kwargs={'limit': 2}, callback=cursor_callback)
op_5 = Operation('coll', 'find_one', {'hello': 'god'})
pipeline = [op_1, op_2, op_3, op_4, op_5]

ops = dbm.transaction_pipeline(pipeline)  # only on a replica set deployment
# ops = dbm.session_pipeline(pipeline)  # can be standalone, replica set or sharded cluster
for op in ops:
    print(op.out)
```
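Stripped of MongoDB, the dispatch that session_pipeline and transaction_pipeline perform is: pick a target by level, look up the method by name with getattr, call it with the normalized arguments plus the shared session, then optionally run the callback and store the result in out. The sketch below exercises that dispatch against stand-in objects so it runs without a server; FakeColl, the simplified Op class and run_pipeline are hypothetical, not the article's classes.

```python
class FakeColl:
    """Stand-in for a Collection: stores docs and echoes the session it received."""

    def __init__(self):
        self.docs = []

    def insert_one(self, doc, session=None):
        self.docs.append(doc)
        return {"inserted": doc, "session": session}

    def find(self, filter=None, limit=0, session=None):
        hits = [d for d in self.docs if all(d.get(k) == v for k, v in (filter or {}).items())]
        return hits[:limit] if limit else hits


class Op:
    def __init__(self, level, name, args=(), kwargs=None, callback=None):
        self.level, self.name, self.args = level, name, args
        self.kwargs = kwargs or {}
        self.callback, self.out = callback, None


def run_pipeline(targets, pipeline, session):
    for op in pipeline:
        method = getattr(targets[op.level], op.name)
        # a tuple means several positional args; anything else is the single first arg
        args = op.args if isinstance(op.args, tuple) else (op.args,)
        result = method(*args, session=session, **op.kwargs)
        op.out = op.callback(result) if op.callback else result
    return pipeline


coll = FakeColl()
ops = run_pipeline(
    {"coll": coll},
    [Op("coll", "insert_one", {"hello": "heaven"}),
     Op("coll", "insert_one", {"hello": "hell"}),
     Op("coll", "find", kwargs={"limit": 1}, callback=len)],
    session="fake-session",
)
print([op.out for op in ops])
```

Treating only tuples as "several arguments" is what lets a single list, such as the documents for insert_many, pass through as one positional argument.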
【End of Main Text】
Note: this content is mirrored from my CSDN blog post of the same name: https://blog.csdn.net/fzlulee/article/details/85944967