最近看到celery文档task部分,作一下小结
python
实际处理时,咱们能够使用一个相似于logging的模块生成日志。数据库
对于某些任务,你能够设置当知足某些条件时,重试任务、拒绝任务或忽略任务json
在定义task时,@app.task(bind=True)中的bind参数能够让你在task中访问请求中的内容,好比id,group之类的信息app
@app.task(AGRS=VALUE),ARGS有好几个参数能够设置,好比name,有些和全局设置(CELERY_xxx_xxx之类的)是同样的配置内容异步
能够自定义任务状态(默认有pending,started,success,failure,retry,revoked)工具
当你使用pickle做为序列化工具时,你应该定义那些能够被pickle的异常(我用json,直接忽略)fetch
实例化。你能够继承Task类定义新类,定义的__init__方法只会被调用一次,此后将持续存在。当你的task以此新类为基类,后面对此task的调用中,__init__的做用还会存在。(用途:自定义类中维持一个对数据库的链接,task能够不用每次都建立链接,而是对那个存在的属性进行操做)。url
e.g:日志
from celery import Task class DatabaseTask(Task): abstract = True _db = None @property def db(self): if self._db is None: self._db = Database.connect() return self._db
@app.task(base=DatabaseTask) def process_rows(): for row in process_rows.db.table.all(): …
7.定义新类时,设置为抽象类,能够做为task的基类。其中又有四种handle方法(after_return,on_failure,on_retry,on_success)code
e.g:
from celery import Task class DebugTask(Task): abstract = True def after_return(self, *args, **kwargs): print('Task returned: {0!r}'.format(self.request) @app.task(base=DebugTask) def add(x, y): return x + y
8.最佳实践:
*忽略不须要的结果,或者设置CELERY_TASK_RESULT_EXPIRES
*若不须要,关闭rate limits
*避免写出同步的task(阻塞),同时执行的同步的task,前面的会令后面的低效,应该写成异步的方法或使用callback
e.g:
#Bad: @app.task def update_page_info(url): page = fetch_page.delay(url).get() info = parse_page.delay(url, page).get() store_page_info.delay(url, info) @app.task def fetch_page(url): return myhttplib.get(url) @app.task def parse_page(url, page): return myparser.parse_document(page) @app.task def store_page_info(url, info): return PageInfo.objects.create(url, info) #Good: def update_page_info(url): # fetch_page -> parse_page -> store_page chain = fetch_page.s() | parse_page.s() | store_page_info.s(url) chain() @app.task() def fetch_page(url): return myhttplib.get(url) @app.task() def parse_page(page): return myparser.parse_document(page) @app.task(ignore_result=True) def store_page_info(info, url): PageInfo.objects.create(url=url, info=info)
*使用细粒度的任务,而不是又长又臭的long task
*尽可能把须要操做的数据放在本地,或使worker靠近数据所在,下降因IO延迟的影响
暂时如此,之后有须要再补