celery最佳实践

做为一个Celery使用重度用户,看到Celery Best Practices这篇文章,不禁得菊花一紧。干脆翻译出来,同时也会加入咱们项目中celery的实战经验。html

至于Celery为什么物,看这里Celerymysql

一般在使用Django的时候,你可能须要执行一些长时间的后台任务,没准你可能须要使用一些能排序的任务队列,那么Celery将会是一个很是好的选择。web

当把Celery做为一个任务队列用于不少项目中后,做者积累了一些最佳实践方式,譬如如何用合适的方式使用Celery,以及一些Celery提供的可是还未充分使用的特性。redis

1,不要使用数据库做为你的AMQP Broker

数据库并非天生设计成能用于AMQP broker的,在生产环境下,它颇有可能在某时候当机(PS,当掉这点我以为任何系统都不能保证不当吧!!!)。sql

做者猜测为啥不少人使用数据库做为broker主要是由于他们已经有一个数据库用来给web app提供数据存储了,因而干脆直接拿来使用,设置成Celery的broker是很容易的,而且不须要再安装其余组件(譬如RabbitMQ)。docker

假设有以下场景:你有4个后端workers去获取并处理放入到数据库里面的任务,这意味着你有4个进程为了获取最新任务,须要频繁地去轮询数据库,没准每一个worker同时还有多个本身的并发线程在干这事情。数据库

某一天,你发现由于太多的任务产生,4个worker不够用了,处理任务的速度已经大大落后于生产任务的速度,因而你不停去增长worker的数量。忽然,你的数据库由于大量进程轮询任务而变得响应缓慢,磁盘IO一直处于高峰值状态,你的web应用也开始受到影响。这一切,都由于workers在不停地对数据库进行DDOS。后端

而当你使用一个合适的AMQP(譬如RabbitMQ)的时候,这一切都不会发生,以RabbitMQ为例,首先,它将任务队列放到内存里面,你不须要去访问硬盘。其次,consumers(也就是上面的worker)并不须要频繁地去轮询由于RabbitMQ能将新的任务推送给consumers。固然,若是RabbitMQ真出现问题了,至少也不会影响到你的web应用。网络

这也就是做者说的不用数据库做为broker的缘由,并且不少地方都提供了编译好的RabbitMQ镜像,你都能直接使用,譬如这些并发

对于这点,我是深表赞同的。咱们系统大量使用Celery处理异步任务,大概平均一天几百万的异步任务,之前咱们使用的mysql,而后总会出现任务处理延时太严重的问题,即便增长了worker也无论用。因而咱们使用了redis,性能提高了不少。至于为啥使用mysql很慢,咱们没去深究,没准也还真出现了DDOS的问题。

2,使用更多的queue(不要只用默认的)

Celery很是容易设置,一般它会使用默认的queue用来存听任务(除非你显示指定其余queue)。一般写法以下:

@app.task()
def my_taskA(a, b, c):
    print("doing something here...")

@app.task()
def my_taskB(x, y):
    print("doing something here...")

这两个任务都会在同一个queue里面执行,这样写其实颇有吸引力的,由于你只须要使用一个decorator就能实现一个异步任务。做者关心的是taskA和taskB没准是彻底两个不一样的东西,或者一个可能比另外一个更加剧要,那么为何要把它们放到一个篮子里面呢?(鸡蛋都不能放到一个篮子里面,是吧!)没准taskB其实不怎么重要,可是量太多,以致于重要的taskA反而不能快速地被worker进行处理。增长workers也解决不了这个问题,由于taskA和taskB仍然在一个queue里面执行。

3,使用具备优先级的workers

为了解决2里面出现的问题,咱们须要让taskA在一个队列Q1,而taskB在另外一个队列Q2执行。同时指定x workers去处理队列Q1的任务,而后使用其它的workers去处理队列Q2的任务。使用这种方式,taskB可以得到足够的workers去处理,同时一些优先级workers也能很好地处理taskA而不须要进行长时间的等待。

首先手动定义queue

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'),
    Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'),
)

而后定义routes用来决定不一样的任务去哪个queue

CELERY_ROUTES = {
    'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'},
    'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'},
}

最后再为每一个task启动不一样的workerscelery worker -E -l INFO -n workerA -Q for_task_A celery worker -E -l INFO -n workerB -Q for_task_B

在咱们项目中,会涉及到大量文件转换问题,有大量小于1mb的文件转换,同时也有少许将近20mb的文件转换,小文件转换的优先级是最高的,同时不用占用不少时间,但大文件的转换很耗时。若是将转换任务放到一个队列里面,那么颇有可能由于出现转换大文件,致使耗时太严重形成小文件转换延时的问题。

因此咱们按照文件大小设置了3个优先队列,而且每一个队列设置了不一样的workers,很好地解决了咱们文件转换的问题。

4,使用Celery的错误处理机制

大多数任务并无使用错误处理,若是任务失败,那就失败了。在一些状况下这很不错,可是做者见到的多数失败任务都是去调用第三方API而后出现了网络错误,或者资源不可用这些错误,而对于这些错误,最简单的方式就是重试一下,也许就是第三方API临时服务或者网络出现问题,没准立刻就行了,那么为何不试着重试一下呢?

@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A():
    try:
        print("doing stuff here...")
    except SomeNetworkException as e:
        print("maybe do some clenup here....")
        self.retry(e)

做者喜欢给每个任务定义一个等待多久重试的时间,以及最大的重试次数。固然还有更详细的参数设置,本身看文档去。

对于错误处理,咱们由于使用场景特殊,例如一个文件转换失败,那么不管多少次重试都会失败,因此没有加入重试机制。

5,使用Flower

Flower是一个很是强大的工具,用来监控celery的tasks和works。

这玩意咱们也没怎么使用,由于多数时候咱们都是直接链接redis去查看celery相关状况了。貌似挺傻逼的对不,尤为是celery在redis里面存放的数据并不能方便的取出来。

6,没事别太关注任务退出状态

一个任务状态就是该任务结束的时候成功仍是失败信息,没准在一些统计场合,这颇有用。但咱们须要知道,任务退出的状态并非该任务执行的结果,该任务执行的一些结果由于会对程序有影响,一般会被写入数据库(例如更新一个用户的朋友列表)。

做者见过的多数项目都将任务结束的状态存放到sqlite或者本身的数据库,可是存这些真有必要吗,没准可能影响到你的web服务的,因此做者一般设置CELERY_IGNORE_RESULT = True去丢弃。

对于咱们来讲,由于是异步任务,知道任务执行完成以后的状态真没啥用,因此果断丢弃。

7,不要给任务传递 Database/ORM 对象

这个其实就是不要传递Database对象(例如一个用户的实例)给任务,由于没准序列化以后的数据已是过时的数据了。因此最好仍是直接传递一个user id,而后在任务执行的时候实时的从数据库获取。

对于这个,咱们也是如此,给任务只传递相关id数据,譬如文件转换的时候,咱们只会传递文件的id,而其它文件信息的获取咱们都是直接经过该id从数据库里面取得。

最后

后面就是咱们本身的感触了,上面做者提到的Celery的使用,真的能够算是很好地实践方式,至少如今咱们的Celery没出过太大的问题,固然小坑仍是有的。至于RabbitMQ,这玩意咱们是真没用过,效果怎么样不知道,至少比mysql好用吧。

最后,附上做者的一个Celery Talk https://denibertovic.com/talks/celery-best-practices/

相关文章
相关标签/搜索