由于最近项目需求中须要提供对异步执行任务终止的功能,因此在寻找中止celery task任务的方法。这种需求之前没有碰到过,因此,只能求助于百度和google,可是找遍了资料,都没找到相关的能中止celery task任务的方法(网上找到的一个方法实测不能用,多是celery版本的缘由,个人项目目前使用的是celery 4.0.2)app
因为网上找不到解决办法,因而只能本身想办法了。
想到celery 管理工具flower里面好像有中止celery task的功能,因而去找flower的源码,找到接口的源码以下:异步
logger.info("Revoking task '%s'", taskid) terminate = self.get_argument('terminate', default=False, type=bool) self.capp.control.revoke(taskid, terminate=terminate) self.write(dict(message="Revoked '%s'" % taskid))
核心代码是self.capp.control.revoke
想到去celery里面找寻revoke
函数,发现有两处比较可疑,第一个是celery.worker.control.revoke
,第二个是celery.app.control.Control.revoke
,直觉来看,应该是第二个方法,可是第二个方法是在一个类里面的,要调用这个方法首先须要获取到celery app的实例,后来去celery 配置里面找,发如今__init__.py文件里面有__all__ = ['celery_app']
这么一句,因而找到突破点了,引用这个包就能获取到celery_app了。函数
from test.ceyery_proj import celery_app celery_app.control.revoke(task_id, terminate=True)
经过这个方法就能终止正在执行的task,至于task_id在执行任务的时候返回了,我将这个id存储在数据库中,这样就能够被拿来控制task的执行了。工具
写这篇文档的目的主要是帮助小伙伴们不要再踩这个坑了,也为celery提供一点文档补充吧。google