hello, 小伙伴们, 很久不更新了,这一次带来的是celery在python中的应用以及设置异步任务周期任务和定时任务的步骤,但愿能给入坑的你带来些许帮助.python
首先是对celery的介绍,Celery实际上是一个专一于实时处理和调度任务的分布式任务队列,同时提供操做和维护分布式系统所须要的所有数据, 所以能够用它提供的接口快速实现并管理一个分布式的任务队列,它自己不是任务队列,它是封装了操做常见任务队列的各类操做, 可使用它快速进行任务队列的使用与管理.在Python中的组成部分是 1.用户任务 app 2.管道 broker 用于存储任务 官方推荐的是 redis rabbitMQ / backend 用于存储任务执行结果的 3, 员工 worker 大体流程入下:linux
最左边的是用户, 用户发起1个请求给服务器, 要服务器执行10个任务,将这10个任务分给10个调度器,即开启10个线程进行任务处理,worker会一直监听调度器是否有任务, 一旦发现有新的任务, 就会当即执行新任务,一旦执行完就会返回给调度器, 即backend, backend会将请求发送给服务器, 服务器将结果返回给用户, 表现的结果就是,这10个任务同时完成,同时返回,,这就是Celery的整个工做流程, 其中的角色分别为,任务(app_work), 调度器(broker + backend), 将任务缓存的部分, 即将全部任务暂时存在的地方,至关于生产者, 消费者(worker 能够指定数量, 即在建立worker命令的时候能够指定数量), 在worker拿到任务后,人就控制不了了, 除非把worker杀死, 否则确定会执行完.redis
也即 任务来了之后, 调度器(broker)去缓存任务, worker去执行任务, 完成后返回backend,接着返回,windows
还有就是关于定时任务和周期任务在linux上为何不用自身所带着的去作,是由于linux周期定时任务是不可控的, 很差管理, 返回值保存也是个麻烦事, 而celery只要开启着调度器, 就能够随时把人物结果获取到,即便用celery控制起来是很是方便的.缓存
接下来就是实例代码:服务器
from celery import Celery import time # 建立一个Celery实例, 就是用户的应用app 第一个参数是任务名称, 能够随意起 后面的就是配置的broker和backend diaoduqi= Celery("mytask", broker="redis://127.0.0.1:6379", backend="redis:127.0.0.1:6379") # 接下来是为应用建立任务 ab @diaoduqi.task def ab(a,b): time.sleep(15) return a+b
from worker import ab # 将任务交给Celery的Worker执行 res = ab.delay(2,4) #返回任务ID print(res.id)
from celery.result import AsyncResult from worker import diaoduqi # 异步获取任务返回值 async_task = AsyncResult(id="31ec65e8-3995-4ee1-b3a8-1528400afd5a",app=diaoduqi) # 判断异步任务是否执行成功 if async_task.successful(): #获取异步任务的返回值 result = async_task.get() print(result) else: print("任务还未执行完成")
为了方便,如今直接将三个文件表明的部分命名在文件名称中.首先是启动workers.py app
启动方式是依据系统的不一样来启动的, 对于linux下 celery worker -A workers -l INFO 也能够指定开启的worker数量 即在后面添加的参数是 -c 5 表示指定5个worker 理论上指定的worker是无上限的,异步
在windows下须要安装一个eventlet模块进行运行, 否则不会运行成功 pip install eventlet 能够开启线程 不指定数量是默认6个worker, 理论上worker的数量能够开启无限个,可是celery worker -A s1 -l INFO -P eventlet -c 5 使用eventlet 开启5个worker 执行async
该命令后 处于就绪状态, 须要发布任务, 即brokers.py进行任务发布, 方法是使用delay的方式执行异步任务, 返回了一个任务id, 接着去backends.py中取这个任务id, 去查询任务是否完成,断定条件即任务.successful 判断是否执行完, 上面就是celery异步执行任务的用法与解释分布式
接下来就是celery在项目中的应用
在实际项目中应用celery是有必定规则的, 即目录结构应该以下.
结构说明 首先是建立一个CeleryTask的包,接着是在里面建立一个celery.py,必须是这个文件 关于重名的问题, 找寻模块的顺序是先从当前目录中去寻找, 根本找不到,接着是从内置模块中去找, 根本就找不到写的这个celery这个文件,
from celery import Celery DDQ = Celery("DDQ",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379", include=["CeleryTask.TaskOne","CeleryTask.TaskTwo"])
import time from CeleryTask.celery import DDQ @DDQ.task def one1(a,b): # time.sleep(3) return a+b @DDQ.task def one2(): time.sleep(2) return "one2"
import time from CeleryTask.celery import DDQ @DDQ.task def two1(): time.sleep(2) return "two1" @DDQ.task def two2(): time.sleep(3) return "two2"
from CeleryTask.TaskOne import one1 as one # one.delay(10,10) # two.delay(20,20) # 定时任务咱们不在使用delay这个方法了,delay是当即交给task 去执行 # 如今咱们使用apply_async定时执行 # 首先咱们要先给task一个执行任务的时间 import datetime, time # 获取当前时间 此时间为东八区时间 ctime = time.time() # 将当前的东八区时间改成 UTC时间 注意这里必定是UTC时间,没有其余说法 utc_time = datetime.datetime.utcfromtimestamp(ctime) # 为当前时间增长 10 秒 add_time = datetime.timedelta(seconds=10) action_time = utc_time + add_time # action_time 就是当前时间将来10秒以后的时间 # 如今咱们使用apply_async定时执行 res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) print(res.id) # 这样本来延迟5秒执行的One函数如今就要在10秒钟之后执行了
接着是在命令行cd到与CeleryTask同级目录下, 使用命令 celery worker -A CeleryTask -l INFO -P eventlet -c 50 这样 就开启了worker 接着去 发布任务, 在定时任务中再也不使用delay这个方法了,
delay是当即交给ttask去执行, 在这里使用 apply_async定时执行 指的是调度的时候去定时执行
须要设置的是UTC时间, 以及定时的时间(多长时间之后执行) 以后使用 celery worker -A CeleryTask -l INFO -P eventlet -c 50 命令开启worker, 以后运行 getR.py文件发布任务, 能够看到在定义的时间之后执行该任务
周期任务
周期任务 指的是在指定时间去执行任务 须要导入的一个模块有 crontab
文件结构以下
结构同定时任务差很少,只不过须要变更一下文件内容 GetR文件已经不须要了,能够删除.
from celery import Celery from celery.schedules import crontab DDQ = Celery("DDQ", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379", include=["CeleryTask.TaskOne", "CeleryTask.TaskTwo"]) # 我要要对beat任务生产作一个配置,这个配置的意思就是每10秒执行一次Celery_task.task_one任务参数是(10,10) DDQ.conf.beat_schedule = { "each10s_task": { "task": "CeleryTask.TaskOne.one1", "schedule": 10, # 每10秒钟执行一次 "args": (10, 10) }, "each1m_task": { "task": "CeleryTask.TaskOne.one2", "schedule": crontab(minute=1) # 每1分钟执行一次 也能够替换成 60 即 "schedule": 60 } }
import time from CeleryTask.celery import DDQ @DDQ.task def one1(a,b): # time.sleep(3) return a+b @DDQ.task def one2(): time.sleep(2) return "one2"
import time from CeleryTask.celery import DDQ @DDQ.task def two1(): time.sleep(2) return "two1" @DDQ.task def two2(): time.sleep(3) return "two2"
以上配置完成之后,这时候就不能直接建立worker了,由于要执行周期任务,须要首先有一个任务的生产方, 即 celery beat -A CeleryTask, 用来产生建立者, 接着是建立worker worker的建立命令仍是原来的命令, 即 celery worker -A CeleryTask -l INFO -P eventlet -c 50 , 建立完worker以后, 每10秒就会由beat建立一个任务给 worker去执行.至此, celery建立异步任务, 周期任务,定时任务完毕, 伙伴们本身拿去测试吧.