celery内置了kombu库,看了一下kombu的源码,从官网最简单的一个例子来分析---消息发布,源码以下:python
from __future__ import absolute_import, unicode_literals import datetime from kombu import Connection with Connection('redis://localhost:6379/0') as conn: simple_queue = conn.SimpleQueue('simple_queue') message = 'helloworld, sent at {0}'.format(datetime.datetime.today()) simple_queue.put(message) print('Sent: {0}'.format(message)) simple_queue.close()
运行以前开启redis服务。这真是简单到不能到简单的例子-.-git
一步步分析画出以下类图:github
大概十七八个类。流程省略几百万个字。redis
记一下关键步骤:spa
一、建立生产者 messaging.Producer 时不会操做redis。设计
二、建立消息者 messaging.Consumer 时会建立exchange,及其对应的 routing_key、patter、queue(队列名称),具体格式像这样:3d
_kombu.binding.exchange_name => (routing_key\x06\x16pattern\x06\x16queue_name)
这是一个sadd操做,key是 _kombu.binding.exchange_name,前面是固定的,exchange_name是变化的;
value是 routing_key、pattern、和绑定的队列名。\x06\x16是分隔符。这能够从redis里面看出:
生产者在publish消息时,调用的是:
def _publish(self, body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory,
immediate, exchange, declare):
能够看到,生产者只须要知道exchange、routing_key就能够发消息到队列。发送到redis的消息内容以下:code
这是个lpush命令,key是队列名、value是消息内容连同元数据:orm
lpush queue_name => [message, ... ]
生产者producer发布消息到此结束。blog
其中kombu对redis库作了一下简单的封装,里面有个AsyncRedis类,不过貌似没什么卵用。
借鉴kombu里对redis封装的设计,我封装了一下redis,使用简单,绝对无公害。地址在这:Python RedisChannel