Kombu源码分析(一)概述

Celery是Python中最流行的异步消息队列框架,支持RabbitMQ、Redis、ZoopKeeper等做为Broker,而对这些消息队列的抽象,都是经过Kombu实现的。Kombu实现了对AMQP transport和non-AMQP transports(Redis、Amazon SQS、ZoopKeeper等)的兼容。python

AMQP中的各类概念,Message、Producer、Exchange、Queue、Consumer、Connection、Channel在Kombu中都相应作了实现,另外Kombu还实现了Transport,就是存储和发送消息的实体,用来区分底层消息队列是用amqp、Redis仍是其它实现的。git

  • Message:消息,发送和消费的主体
  • Producer: 消息发送者
  • Consumer:消息接收者
  • Exchange:交换机,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列
  • Queue:消息队列,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息
  • Connection:对消息队列链接的抽象
  • Channel:与AMQP中概念相似,能够理解成共享一个Connection的多个轻量化链接
  • Transport:真实的MQ链接,区分底层消息队列的实现

对于不一样的Transport的支持:github

代码示例

先从官网示例代码开始:redis

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print body
    message.ack()

# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

# Consume from several queues on the same channel:
video_queue = Queue('video', exchange=media_exchange, key='video')
image_queue = Queue('image', exchange=media_exchange, key='image')

with connection.Consumer([video_queue, image_queue],
                         callbacks=[process_media]) as consumer:
    while True:
        connection.drain_events()

基本上,各类角色都出场了。各类角色的使用都要从创建Connection开始。json

Connection

获取链接很简单:缓存

>>> from kombu import Connection
>>> connection = Connection('amqp://guest:guest@localhost:5672//')

如今的链接其实并未真正创建,只有在须要使用的时候才真正创建链接并将链接缓存:app

@property
def connection(self):
    """The underlying connection object.
    Warning:
        This instance is transport specific, so do not
        depend on the interface of this object.
    """
    if not self._closed:
        if not self.connected:
            self.declared_entities.clear()
            self._default_channel = None
            self._connection = self._establish_connection()
            self._closed = False
        return self._connection

也能够主动链接:框架

>>> connection.connect()
def connect(self):
    """Establish connection to server immediately."""
    self._closed = False
    return self.connection

固然,链接底层是由各自使用的不一样的Transport创建的:异步

conn = self.transport.establish_connection() 

链接须要显式的关闭:socket

>>> connection.release()

因为Connection实现了上下文生成器:

def __enter__(self):
    return self

def __exit__(self, *args):
    self.release()

因此能够使用with语句,以避免忘记关闭链接:

with Connection() as connection:
    # work with connection

能够使用Connection直接创建ProcuderConsumer,其实就是调用了各自的建立类:

def Producer(self, channel=None, *args, **kwargs):
    """Create new :class:`kombu.Producer` instance."""
    from .messaging import Producer
    return Producer(channel or self, *args, **kwargs)

def Consumer(self, queues=None, channel=None, *args, **kwargs):
    """Create new :class:`kombu.Consumer` instance."""
    from .messaging import Consumer
    return Consumer(channel or self, queues, *args, **kwargs)

Producer

链接建立后,能够使用链接建立Producer

producer = conn.Producer(serializer='json')

也能够直接使用Channel建立:

with connection.channel() as channel:
    producer = Producer(channel, ...)

Producer实例初始化的时候会检查第一个channel参数:

self.revive(self.channel)
channel = self.channel = maybe_channel(channel)

这里会检查channel是否是Connection实例,是的话会将其替换为Connection实例的default_channel属性:

def maybe_channel(channel):
    """Get channel from object.
    Return the default channel if argument is a connection instance,
    otherwise just return the channel given.
    """
    if is_connection(channel):
        return channel.default_channel
    return channel

因此Producer仍是与Channel联系在一块儿的。

Producer发送消息:

producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                  exchange=media_exchange, routing_key='video',
                  declare=[video_queue])

pulish作的事情,主要是由Channel完成的:

def _publish(self, body, priority, content_type, content_encoding,
┆   ┆   ┆   ┆headers, properties, routing_key, mandatory,
┆   ┆   ┆   ┆immediate, exchange, declare):
┆   channel = self.channel
┆   message = channel.prepare_message(
┆   ┆   body, priority, content_type,
┆   ┆   content_encoding, headers, properties,
┆   )   
┆   if declare:
┆   ┆   maybe_declare = self.maybe_declare
┆   ┆   [maybe_declare(entity) for entity in declare]

┆   # handle autogenerated queue names for reply_to
┆   reply_to = properties.get('reply_to')
┆   if isinstance(reply_to, Queue):
┆   ┆   properties['reply_to'] = reply_to.name
┆   return channel.basic_publish(
┆   ┆   message,
┆   ┆   exchange=exchange, routing_key=routing_key,
┆   ┆   mandatory=mandatory, immediate=immediate,
┆   )

Channel组装消息prepare_message,而且发送消息basic_publish

Channel又是Transport建立的:

chan = self.transport.create_channel(self.connection)

Transport

当建立Connection时,须要传入hostname,相似于:

amqp://guest:guest@localhost:5672//

而后获取hostnamescheme,好比redis:

transport = transport or urlparse(hostname).scheme

以此来区分建立的Transport的类型。

建立过程:

self.transport_cls = transport

transport_cls = get_transport_cls(transport_cls)

def get_transport_cls(transport=None):
    """Get transport class by name.

    The transport string is the full path to a transport class, e.g.::

    ┆   "kombu.transport.pyamqp:Transport"

    If the name does not include `"."` (is not fully qualified),
    the alias table will be consulted.
    """
    if transport not in _transport_cache:
    ┆   _transport_cache[transport] = resolve_transport(transport)
    return _transport_cache[transport]

transport = TRANSPORT_ALIASES[transport]

TRANSPORT_ALIASES = {
    ...

    'redis': 'kombu.transport.redis:Transport',
    
    ...
}

Redis为例,Transport类在/kombu/transport/redis.py文件,继承自/kombu/transport/virtual/base.py中的Transport类。

建立Channel:

channel = self.Channel(connection)

而后Channel组装消息prepare_message,而且发送消息basic_publish

Channel

Channel实例有几个属性关联着Consumer、Queue等,virtual.Channel

class Channel(AbstractChannel, base.StdChannel):
    def __init__(self, connection, **kwargs):
        self.connection = connection
        self._consumers = set()
        self._cycle = None 
        self._tag_to_queue = {} 
        self._active_queues = [] 
        ... 

其中,_consumers是相关联的消费者标签集合,_active_queues是相关联的Queue列表,_tag_to_queue则是消费者标签与Queue的映射:

self._tag_to_queue[consumer_tag] = queue
self._consumers.add(consumer_tag)
self._active_queues.append(queue)

Channel对于不一样的底层消息队列,也有不一样的实现,以Redis为例:

class Channel(virtual.Channel):
    """Redis Channel."""

继承自virtual.Channel

组装消息函数prepare_message:

def prepare_message(self, body, priority=None, content_type=None,
┆   ┆   ┆   ┆   ┆   content_encoding=None, headers=None, properties=None):
┆   """Prepare message data."""
┆   properties = properties or {}
┆   properties.setdefault('delivery_info', {})
┆   properties.setdefault('priority', priority or self.default_priority)

┆   return {'body': body,
┆   ┆   ┆   'content-encoding': content_encoding,
┆   ┆   ┆   'content-type': content_type,
┆   ┆   ┆   'headers': headers or {},
┆   ┆   ┆   'properties': properties or {}}

基本上是为消息添加各类属性。

发送消息basic_publish方法是调用_put方法:

def _put(self, queue, message, **kwargs):
┆   """Deliver message."""
┆   pri = self._get_message_priority(message, reverse=False)

┆   with self.conn_or_acquire() as client:
┆   ┆   client.lpush(self._q_for_pri(queue, pri), dumps(message))

client是一个redis.StrictRedis链接:

def _create_client(self, asynchronous=False):
┆   if asynchronous:
┆   ┆   return self.Client(connection_pool=self.async_pool)
┆   return self.Client(connection_pool=self.pool)

self.Client = self._get_client()

def _get_client(self):
┆   if redis.VERSION < (3, 2, 0):
┆   ┆   raise VersionMismatch(
┆   ┆   ┆   'Redis transport requires redis-py versions 3.2.0 or later. '
┆   ┆   ┆   'You have {0.__version__}'.format(redis))
┆   return redis.StrictRedis

Redis将消息置于某个列表(lpush)中。还会根据是否异步的选项选择不一样的connection_pool

Consumer

如今消息已经被放置与队列中,那么消息又被如何使用呢?

Consumer初始化须要声明Channel和要消费的队列列表以及处理消息的回调函数列表:

with Consumer(connection, queues, callbacks=[process_media], accept=['json']):
    connection.drain_events(timeout=1)

Consumer实例被当作上下文管理器使用时,会调用consume方法:

def __enter__(self):
    self.consume()
    return self

consume方法代码:

def consume(self, no_ack=None):
    """Start consuming messages.

    Can be called multiple times, but note that while it
    will consume from new queues added since the last call,
    it will not cancel consuming from removed queues (
    use :meth:`cancel_by_queue`).

    Arguments:
        no_ack (bool): See :attr:`no_ack`.
    """
    queues = list(values(self._queues))
    if queues:
        no_ack = self.no_ack if no_ack is None else no_ack

        H, T = queues[:-1], queues[-1]
        for queue in H:
            self._basic_consume(queue, no_ack=no_ack, nowait=True)
        self._basic_consume(T, no_ack=no_ack, nowait=False)

使用_basic_consume方法处理相关的队列列表中的每一项,其中处理最后一个Queue时设置标志nowait=False

_basic_consume方法代码:

def _basic_consume(self, queue, consumer_tag=None,
                   no_ack=no_ack, nowait=True):
    tag = self._active_tags.get(queue.name)
    if tag is None:
        tag = self._add_tag(queue, consumer_tag)
        queue.consume(tag, self._receive_callback,
                      no_ack=no_ack, nowait=nowait)
    return tag

是将消费者标签以及回调函数传给Queueconsume方法。

Queueconsume方法代码:

def consume(self, consumer_tag='', callback=None, 
            no_ack=None, nowait=False):     
    """Start a queue consumer.      

    Consumers last as long as the channel they were created on, or
    until the client cancels them.

    Arguments:             
        consumer_tag (str): Unique identifier for the consumer.
            The consumer tag is local to a connection, so two clients
            can use the same consumer tags. If this field is empty
            the server will generate a unique tag.

        no_ack (bool): If enabled the broker will automatically
            ack messages.

        nowait (bool): Do not wait for a reply.

        callback (Callable): callback called for each delivered message.
    """
    if no_ack is None:
        no_ack = self.no_ack            
    return self.channel.basic_consume(
        queue=self.name,
        no_ack=no_ack,
        consumer_tag=consumer_tag or '',
        callback=callback,
        nowait=nowait,
        arguments=self.consumer_arguments)

又回到了ChannelChannelbasic_consume代码:

def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
    """Consume from `queue`."""     
    self._tag_to_queue[consumer_tag] = queue
    self._active_queues.append(queue)

    def _callback(raw_message):         
        message = self.Message(raw_message, channel=self)
        if not no_ack:
            self.qos.append(message, message.delivery_tag)
        return callback(message)

    self.connection._callbacks[queue] = _callback
    self._consumers.add(consumer_tag)

    self._reset_cycle()   

ChannelConsumer标签,Consumer要消费的队列,以及标签与队列的映射关系都记录下来,等待循环调用。另外,还经过Transport将队列与回调函数列表的映射关系记录下来,以便于从队列中取出消息后执行回调函数。

真正的调用是下面这行代码实现的:

connection.drain_events(timeout=1)

如今来到Transportdrain_events方法:

def drain_events(self, connection, timeout=None):
    time_start = monotonic()
    get = self.cycle.get
    polling_interval = self.polling_interval
    if timeout and polling_interval and polling_interval > timeout:
        polling_interval = timeout
    while 1:
        try: 
            get(self._deliver, timeout=timeout)
        except Empty:
            if timeout is not None and monotonic() - time_start >= timeout:
                raise socket.timeout()
            if polling_interval is not None:
                sleep(polling_interval)
        else:
            break

看上去是在无限执行get(self._deliver, timeout=timeout)

getself.cycle的一个方法,cycle是一个FairCycle实例:

self.cycle = self.Cycle(self._drain_channel, self.channels, Empty)

@python_2_unicode_compatible
class FairCycle(object):
    """Cycle between resources.

    Consume from a set of resources, where each resource gets
    an equal chance to be consumed from.

    Arguments: 
        fun (Callable): Callback to call.
        resources (Sequence[Any]): List of resources.
        predicate (type): Exception predicate.
    """

    def __init__(self, fun, resources, predicate=Exception):
        self.fun = fun
        self.resources = resources
        self.predicate = predicate
        self.pos = 0

    def _next(self):
        while 1:
            try:
                resource = self.resources[self.pos]
                self.pos += 1
                return resource
            except IndexError:
                self.pos = 0
                if not self.resources:
                    raise self.predicate()

    def get(self, callback, **kwargs):
        """Get from next resource."""
        for tried in count(0):  # for infinity
            resource = self._next()
            try:
                return self.fun(resource, callback, **kwargs)
            except self.predicate:
                # reraise when retries exchausted.
                if tried >= len(self.resources) - 1:
                    raise

FairCycle接受两个参数,fun是要执行的函数fun,而resources做为一个迭代器,每次提供一个item供fun调用。

此处的fun_drain_channelresourceschannels:

def _drain_channel(self, channel, callback, timeout=None):
    return channel.drain_events(callback=callback, timeout=timeout)

Transport相关联的每个channel都要执行drain_events

Channeldrain_events代码:

def drain_events(self, timeout=None, callback=None):
    callback = callback or self.connection._deliver
    if self._consumers and self.qos.can_consume():
        if hasattr(self, '_get_many'):
            return self._get_many(self._active_queues, timeout=timeout)
        return self._poll(self.cycle, callback, timeout=timeout)
    raise Empty()

_poll代码:

def _poll(self, cycle, callback, timeout=None):
    """Poll a list of queues for available messages."""
    return cycle.get(callback)

又回到了FairCycleChannelFairCycle实例:

def _reset_cycle(self):
    self._cycle = FairCycle(
        self._get_and_deliver, self._active_queues, Empty)

_get_and_deliver方法从队列中取出消息,而后调用Transport传递过来的_deliver方法:

def _get_and_deliver(self, queue, callback):
    message = self._get(queue)
    callback(message, queue)

_deliver代码:

def _deliver(self, message, queue):
    if not queue:
        raise KeyError(
            'Received message without destination queue: {0}'.format(
                message))
    try:
        callback = self._callbacks[queue]
    except KeyError:
        logger.warning(W_NO_CONSUMERS, queue)
        self._reject_inbound_message(message)
    else:
        callback(message)

作的事情是根据队列取出注册到此队列的回调函数列表,而后对消息执行列表中的全部回调函数。

回顾

可见,Kombu中ChannelTransport很是重要,Channel记录了队列列表、消费者列表以及二者的映射关系,而Transport记录了队列与回调函数的映射关系。Kombu对全部须要监听的队列_active_queues都查询一遍,直到查询完毕或者遇到一个能够使用的Queue,而后就获取消息,回调此队列对应的callback。

相关文章
相关标签/搜索