celery的使用

介绍

Celery 是一个基于python开发的分布式异步消息任务队列,经过它能够轻松的实现任务的异步处理, 若是你的业务场景中须要用到异步任务,就能够考虑使用celery。
异步任务介绍
在写项目过程当中常常会遇到一些耗时的任务, 好比:发送邮件、发送短信等等~。这些操做若是都同步执行耗时长对用户体验不友好,在这种状况下就能够把任务放在后台异步执行
celery就是用于处理异步任务的框架,celery能完成的功能远不止异步任务,还有一个很经常使用的功能定时任务.python

架构
 
image.png
Celery包含以下组件:

Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期须要执行的任务发送给任务队列。
Celery Worker:执行任务的消费者,一般会在多台服务器运行多个消费者来提升执行效率。
Broker:消息代理,或者叫做消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(一般是消息队列或者数据库)。
Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。redis

特色
  • 简单:一单熟悉了celery的工做流程后,配置和使用仍是比较简单的
  • 高可用:当任务执行失败或执行过程当中发生链接中断,celery 会自动尝试从新执行任务
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活: 几乎celery的各个组件均可以被扩展及自定制
多任务执行方式

后台多任务方式:
启动:celery multi start w1 -A proj -l info
重启:celery multi restart w1 -A proj -l info
中止:celery multi stop w1 -A proj -l info
任务执行完毕后才退出:celery multi stopwait w1 -A proj -l info数据库

定时任务

celery支持定时任务,设定好任务的执行时间,celery就会定时自动帮你执行, 这个定时任务模块叫celery beatdjango

方式一:函数方式
from celery import Celery from celery.schedules import crontab app = Celery() @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # Calls test('world') every 30 seconds sender.add_periodic_task(30.0, test.s('world'), expires=10) # Executes every Monday morning at 7:30 a.m. sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Happy Mondays!'), ) @app.task def test(arg): print(arg) 
方式二:配置文件形式
app.conf.beat_schedule = {
    'add-every-30-seconds': { 'task': 'tasks.add', 'schedule': 30.0, 'args': (16, 16) }, } app.conf.timezone = 'UTC' 

任务添加好了,须要让celery单独启动一个进程来定时发起这些任务, 注意, 这里是发起任务,不是执行,这个进程只会不断的去检查你的任务计划, 每发现有任务须要执行了,就发起一个任务调用消息,交给celery worker去执行
启动任务调度器celery beat:celery -A periodic_task beat
此时还差一步,就是还须要启动一个worker,负责执行celery beat发起的任务
启动celery worker来执行任务:celery -A periodic_task workerbash

经常使用的定时任务配置
Example Meaning
crontab() 默认是每分钟
crontab(minute=0, hour=0) 天天0点执行
crontab(minute=0, hour='*/3') 每隔3个小时执行
crontab(minute=0,hour='0,3,6') 0点、3点、6点执行
crontab(minute='*/15') 每隔15分钟
crontab(day_of_week='sunday') 每逢周日的每一分钟执行
crontab(minute='',hour='',day_of_week='sun') 与上一个意义一致
crontab(minute='*/10',hour='3,17,22',day_of_week='fri') 在周五3-4点、17-18点、22-23点之间每隔10分钟执行
crontab(minute=0,hour='/2,/3') 能被2,3整除的小时执行
crontab(minute=0, hour='*/5') 能被5整除的小时执行
crontab(minute=0, hour='*/3,8-17') 能被3整除或者8-17之间的小时执行
crontab(0, 0,day_of_month='2') 每月的次日
crontab(0, 0,day_of_month='2-30/3') 偶数天执行
crontab(0, 0,day_of_month='1-7,15-21') 每月的第一周和第三周
crontab(0, 0,day_of_month='11',month_of_year='5') 每一年的5月11日执行
crontab(0, 0,month_of_year='*/3') 在每一个季度的第一个月执行

上面能知足你绝大多数定时任务需求了,甚至还能根据潮起潮落来配置定时任务服务器

最佳实践之与django结合

django 能够轻松跟celery结合实现异步任务,只需简单配置便可
项目目录架构

- proj /
   - proj / __init__ . py
   - proj / settings . py
   - proj / urls . py
- manage . py

proj/proj/celery.pyapp

from __future__ import absolute_import, unicode_literals import os from celery import Celery # 为celery程序设置默认的Django设置模块 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') app = Celery('proj') # 从配置文件中引入celery相关参数,每一个参数使用CELERY_开头 app.config_from_object('django.conf:settings', namespace='CELERY') # 从每一个app中的tasks.py文件加载异步任务 app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request)) 

proj/proj/__init__.py框架

from __future__ import absolute_import, unicode_literals # 确保celery文件必定被引用 from .celery import app as celery_app __all__ = ['celery_app'] 

proj/xxx/tasks.py异步

from __future__ import absolute_import, unicode_literals from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers) 

proj/xxx/views.py

from django.shortcuts import render,HttpResponse from bernard import tasks def task_test(request): res = tasks.add.delay(228,24) print("start running task") print("async task res",res.get() ) return HttpResponse('res %s'%res.get()) 
在django中使用计划任务功能

安装:pip install django-celery-beat
加载:在settings.py中INSTALLED_APPS加入django_celery_beat
生成相关数据库:python manage.py migrate
开启定时任务:celery -A proj beat -l info -S django
结果:

 
admin中的3张表

 
image.png

此时启动你的celery beat 和worker,会发现每隔2分钟,beat会发起一个任务消息让worker执行scp_task任务
注意,经测试,每添加或修改一个任务,celery beat都须要重启一次,要否则新的配置不会被celery beat进程读到

 

celery flower

(1).查看任务历史,任务具体参数,开始时间等信息。
(2).提供图表和统计数据。
(3).实现全面的远程控制功能, 包括但不限于 撤销/终止任务, 关闭重启 worker, 查看正在运行任务。
(4).提供一个 HTTP API , 方便集成。
终端执行:celery flower --broker=redis://localhost:6379/6

相关文章
相关标签/搜索