1.1 Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操做和维护分布式系统所需的工具(它自己不是一个任务队列, 它是 任务队列管理的工具, 它提供的接口能够帮助咱们实现分布式任务队列)。python
1.2 Celery 专一于实时任务处理,支持任务调度(跟rabbitMQ可实现多种exchange。)redis
说白了,它是一个分布式队列的管理工具,咱们能够用 Celery 提供的接口快速实现并管理一个分布式的任务队列。mongodb
1.3 Celery 架构 shell
消息中间件(message broker)(邮箱, 邮局): 自己不提供消息服务,能够和第三方消息中间件集成,经常使用的有 redis mongodb rabbitMQdjango
任务执行单元(worker)(寄件人): 是Celery提供的任务执行单元, worker并发的运行在分布式的系统节点中bash
任务执行结果存储(task result store)(收件人):用来存储Worker执行的任务的结果,Celery支持以不一样方式存储任务的结果,包括Redis,MongoDB,Django ORM,AMQP等服务器
1.4 任务队列和消息队列架构
任务队列是一种在线或机器分发任务的机制并发
消息队列输入是工做的一个单元, 能够认为是一个任务,独立的职程(Worker)进程持续监视队列中是否有须要处理的新任务。app
图解
import time
from celery import Celery
app = Celery('tasks', broker='redis:////127.0.0.1:6379/6', backend='redis:////127.0.0.1:6379/7')
@app.task
def add(x, y):
time.sleep(10)
return x + y
复制代码
ps: tasks为任务名称 设置reids为中间件
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from tasks import add, app
from celery.result import AsyncResult
import time
# 当即告知celery去执行add任务,并传入两个参数
result = add.delay(4, 4)
print(result.id)
async = AsyncResult(id=result.id, app=app)
time.sleep(3)
if async.successful():
result = async.get()
print(result, "执行成功")
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
复制代码
A参数指定celery对象的位置,该app.celery_tasks.celery指的 是app包下面的celery_tasks.py模块的celery实例,注意必定是初始化后的实例,
Q参数指的是该worker接收指定的队列的任务,这是为了当多个队列有不一样的任务时能够独立;若是不设会接收全部的队列的任务;
l参数指定worker的日志级别;
执行完毕后结果存储在redis中,查看redis中的数据,发现存在一个string类型的键值对 celery-task-meta-064e4262-e1ba-4e87-b4a1-52dd1418188f:data 该键值对的失效时间为24小时
{
"body": "gAJ9cQAoWAQAAAB0YXNrcQFYCQAAAHRhc2tzLmFkZHECWAIAAABpZHEDWCQAAABjNDMwMzZkMi03Yzc3LTQ0MDUtOTYwNC1iZDc3ZTcyNzNlN2FxBFgEAAAAYXJnc3EFSwRLBIZxBlgGAAAAa3dhcmdzcQd9cQhYBwAAAHJldHJpZXNxCUsAWAMAAABldGFxCk5YBwAAAGV4cGlyZXNxC05YAwAAAHV0Y3EMiFgJAAAAY2FsbGJhY2tzcQ1OWAgAAABlcnJiYWNrc3EOTlgJAAAAdGltZWxpbWl0cQ9OToZxEFgHAAAAdGFza3NldHERTlgFAAAAY2hvcmRxEk51Lg==",
"content-encoding": "binary",
"content-type": "application/x-python-serialize",
"headers": {},
"properties": {
"reply_to": "caa78c3a-618a-31f0-84a9-b79db708af02",
"correlation_id": "c43036d2-7c77-4405-9604-bd77e7273e7a",
"delivery_mode": 2,
"delivery_info": {
"priority": 0,
"exchange": "celery",
"routing_key": "celery"
},
"body_encoding": "base64",
"delivery_tag": "e7e288b5-ecbb-4ec6-912c-f42eb92dbd72"
}
}
复制代码
CELERY_DEFAULT_QUEUE:默认队列
BROKER_URL : 代理人的网址
CELERY_RESULT_BACKEND:结果存储地址
CELERY_TASK_SERIALIZER:任务序列化方式
CELERY_RESULT_SERIALIZER:任务执行结果序列化方式
CELERY_TASK_RESULT_EXPIRES:任务过时时间
CELERY_ACCEPT_CONTENT:指定任务接受的内容序列化类型(序列化),一个列表;
复制代码
r = func.delay(...)
r.ready() # 查看任务状态,返回布尔值, 任务执行完成, 返回 True, 不然返回 False.
r.wait() # 等待任务完成, 返回任务执行结果,不多使用;
r.get(timeout=1) # 获取任务执行结果,能够设置等待时间
r.result # 任务执行结果.
r.state # PENDING, START, SUCCESS,任务当前的状态
r.status # PENDING, START, SUCCESS,任务当前的状态
r.successful # 任务成功返回true
r.traceback # 若是任务抛出了一个异常,你也能够获取原始的回溯信息
复制代码
@celery.task()
def func():
# do something
pass
复制代码
class MyTask(celery.Task):
# 任务失败时执行
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
# 任务成功时执行
def on_success(self, retval, task_id, args, kwargs):
pass
# 任务重试时执行
def on_retry(self, exc, task_id, args, kwargs, einfo):
pass
复制代码
参数
# 注意,celery4版本后,CELERY_BROKER_URL改成BROKER_URL
BROKER_URL = 'amqp://username:passwd@host:port/虚拟主机名'
# 指定结果的接受地址
CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
# 指定任务序列化方式
CELERY_TASK_SERIALIZER = 'msgpack'
# 指定结果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
# 任务过时时间,celery任务执行结果的超时时间
CELERY_TASK_RESULT_EXPIRES = 60 * 20
# 指定任务接受的序列化类型.
CELERY_ACCEPT_CONTENT = ["msgpack"]
# 任务发送完成是否须要确认,这一项对性能有一点影响
CELERY_ACKS_LATE = True
# 压缩方案选择,能够是zlib, bzip2,默认是发送没有压缩的数据
CELERY_MESSAGE_COMPRESSION = 'zlib'
# 规定完成任务的时间
CELERYD_TASK_TIME_LIMIT = 5 # 在5s内完成任务,不然执行该任务的worker将被杀死,任务移交给父进程
# celery worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目
CELERYD_CONCURRENCY = 4
# celery worker 每次去rabbitmq预取任务的数量
CELERYD_PREFETCH_MULTIPLIER = 4
# 每一个worker执行了多少任务就会死掉,默认是无限的
CELERYD_MAX_TASKS_PER_CHILD = 40
# 设置默认的队列名称,若是一个消息不符合其余的队列就会放在默认队列里面,若是什么都不设置的话,数据都会发送到默认的队列中
CELERY_DEFAULT_QUEUE = "default"
# 设置详细的队列
CELERY_QUEUES = {
"default": { # 这是上面指定的默认队列
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default"
},
"topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
"routing_key": "topic.#",
"exchange": "topic_exchange",
"exchange_type": "topic",
},
"task_eeg": { # 设置扇形交换机
"exchange": "tasks",
"exchange_type": "fanout",
"binding_key": "tasks",
},
}
复制代码
# config.py
from datetime import timedelta
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'ptask': {
'task': 'tasks.period_task',
'schedule': timedelta(seconds=5),
},
}
# 添加定时任务
@app.task(bind=True)
def period_task(self):
print 'period task done: {0}'.format(self.request.id)
复制代码
PS:时间若是涉及到datatime最好设置为UTC时间
celery -A task beat
复制代码
链式任务就是异步或者定时执行的任务由多个子任务执行完成
def update_page_info(url):
# fetch_page -> parse_page -> store_page
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain()
@app.task()
def fetch_page(url):
return myhttplib.get(url)
@app.task()
def parse_page(page):
return myparser.parse_document(page)
@app.task(ignore_result=True)
def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)
fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])
复制代码
当咱们执行的任务须要根据特定的须要进行分类时,咱们能够对任务建立多个队列进行, 每个队列交换方式能够指定,须要注意的是:redis只能提供 direct exchange 方式, 也是默认指定的方式,因此咱们把中间人换成了rabbitmq。
首先咱们来了解一下交换模式有哪些?
Direct Exchange 模式
这种模式是rabbitmq(redis)自带的一种模式,因此咱们在实际使用过程当中只要指定routing_key就能够了, 或者是指定队列名称便可。
ps: 若是咱们指定的队列名称不在配置里面,那咱们建立的这条消息任务会被自动废除,因此须要检查下配置里的队列是否正确,由于rabbitmq只具有存储队列的能力,不能存储消息信息。
Fanout Exchange 模式
Topic Exchange 模式
任何发送到Topic Exchange的消息都会被转发到全部关心RouteKey中指定话题的Queue上
这种模式较为复杂,简单来讲,就是每一个队列都有其关心的主题,全部的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到全部关注主题能与RouteKey模糊匹配的队列。
这种模式须要RouteKey,也许要提早绑定Exchange与Queue。
在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心全部涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
“#”表示0个或若干个关键字,“”表示一个关键字。如“log.”能与“log.warn”匹配,没法与“log.warn.timeout”匹配;可是“log.#”能与上述二者匹配。
一样,若是Exchange没有发现可以与RouteKey匹配的Queue,则会抛弃此消息。
首先要安装rabitmq而且启动 rabbitmq-server
建立rabbitmq_config.py 文件, 而且把以前在tasks.py中引用的配置修改成rabbitmq_config,代码以下
#coding:utf-8
from celery.schedules import crontab
import sys
import os
sys.path.insert(0, os.getcwd())
CELERY_IMPORTS = ("tasks", )
CELERY_RESULT_BACKEND = "amqp"
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "guest"
BROKER_PASSWORD = "guest"
BROKER_VHOST = "/"
复制代码
default_exchange = Exchange('dedfault', type='direct')
# 定义一个媒体交换机,类型是直连交换机
media_exchange = Exchange('media', type='direct')
# 定义一个image交换机,类型是fanout交换机
image_exchange = Exchange('media', type='direct')
# 建立三个队列,一个是默认队列,一个是video、一个image
CELERY_QUEUES = (
Queue('default', default_exchange, routing_key='default'),
Queue('videos', media_exchange, routing_key='media.video'),
Queue('images', media_exchange, routing_key='media.image')
)
# 定义默认队列和默认的交换机routing_key
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'
复制代码
# 视频压缩
@app.task
def video_compress(video_name):
time.sleep(10)
print('Compressing the:', video_name)
return 'success'
@app.task
def video_upload(video_name):
time.sleep(5)
print( u'正在上传视频')
return 'success'
# 压缩照片
@app.task
def image_compress(image_name):
time.sleep(10)
print('Compressing the:', image_name)
return 'success'
# 其余任务
@app.task
def other(str):
time.sleep(10)
print ('Do other things')
return 'success'
复制代码
CELERY_ROUTES = ({'tasks.image_compress': {
'queue': 'images',
'routing_key': 'media.image'
}},{'tasks.video_upload': {
'queue': 'videos',
'routing_key': 'media.video'
}},{'tasks.video_compress': {
'queue': 'videos',
'routing_key': 'media.video'
}}, )
复制代码
在启动worker的时候能够分两种启动方式
第一种: 指定Queue
第二种 : 不指定(所有执行)
ps 为了更好的看到咱们添加的队列,还有相应的交换模式,启动所有的队列
启动worker
[queue]中包含了建立的队列,其余参数本文前面能够对照 [tasks]中显示了咱们全部的任务
---- **** -----
--- * *** * -- Darwin-18.2.0-x86_64-i386-64bit 2018-12-28 15:38:00
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x104e78d68
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: amqp://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> default exchange=dedfault(direct) key=default
.> images exchange=media(direct) key=media.image
.> others exchange=other(fanout) key=other.others
.> videos exchange=media(direct) key=media.video
[tasks]
. tasks.add
. tasks.dr
. tasks.image_compress
. tasks.other
. tasks.period_task
. tasks.task
. tasks.video_compress
. tasks.video_upload
[2018-12-28 15:38:00,906: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
复制代码
执行结果(能够在rabbimq后台管理中看相关执行结果):
[2018-12-28 15:38:00,906: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2018-12-28 15:38:00,933: INFO/MainProcess] mingle: searching for neighbors
[2018-12-28 15:38:02,013: INFO/MainProcess] mingle: all alone
[2018-12-28 15:38:02,091: INFO/MainProcess] celery@zhanlingjiedeMacBook-Pro.local ready.
[2018-12-28 15:38:42,386: INFO/MainProcess] Received task: tasks.add[1fdfbc23-e106-49ab-ac25-d46c2b5e8960]
[2018-12-28 15:38:42,429: INFO/ForkPoolWorker-3] Task tasks.add[1fdfbc23-e106-49ab-ac25-d46c2b5e8960] succeeded in 0.040455893002217636s: 5
[2018-12-28 15:38:46,397: INFO/MainProcess] Received task: tasks.image_compress[cab797c5-eaae-4f11-b55c-041f4256ead9]
[2018-12-28 15:38:46,410: INFO/MainProcess] Received task: tasks.other[0b00fd52-2251-42ef-9743-49df3f2906ed]
[2018-12-28 15:38:56,401: WARNING/ForkPoolWorker-4] Compressing the:
[2018-12-28 15:38:56,402: WARNING/ForkPoolWorker-4] 这是我上传的图片
[2018-12-28 15:38:56,412: WARNING/ForkPoolWorker-3] Do other things
[2018-12-28 15:38:56,447: INFO/ForkPoolWorker-3] Task tasks.other[0b00fd52-2251-42ef-9743-49df3f2906ed] succeeded in 10.036200570997607s: 'success'
[2018-12-28 15:38:56,461: INFO/ForkPoolWorker-4] Task tasks.image_compress[cab797c5-eaae-4f11-b55c-041f4256ead9] succeeded in 10.061314186998061s: 'success'
复制代码