#!/usr/bin/env python3 # coding: utf-8 from celery import Celery import settings pw = settings.SESSION_REDIS['password'] celery_broker = 'redis://:%s@localhost:6379/0' % pw celery_backend = celery_broker app = Celery('tasks', broker=celery_broker, backend=celery_backend) @app.task def analysis_main_12(current_id_str, q_num_str): pass @app.task def analysis_main_3(current_id_str, q_num_str): pass
from celery_tasks import analysis_main_12, analysis_main_3 def main(): ...... q = get_q3_from_db() ret = analysis_main_3.apply_async(args=(str(current_test.id), str(q_num)), queue='for_q_type3') q = get_q12_from_db() ret = analysis_main_12.apply_async(args=(str(current_test.id), str(q_num)), queue='for_q_type12') # ret是 "AsyncResult"对象, id 可由 ret.id取得 ...... if __name__ == '__main__': main()
注: 该文件中使用了mytaskfunction.apply_async(...)
而非mytaskfunction.delay(...)
:后者是前者的包装(使用更方便),而直接使用前者则可以使用更多参数,好比queue。
这里的queue正是要在消费者中配置使用的queue,注意名字要对应,不要写错。
python
import config from kombu import Queue, Exchange from celery import Celery app = Celery('tasks', broker=config.Celery_broker, backend=config.Celery_backend) logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s:\t%(message)s') # 配置队列 CELERY_QUEUES = ( Queue('for_q_type3', Exchange('for_q_type3'), routing_key='for_q_type3'), # consumer_arguments={'x-priority': 10}), Queue('for_q_type12', Exchange('for_q_type12'), routing_key='for_q_type12'), # consumer_arguments={'x-priority': 1}), Queue('default', Exchange('default'), routing_key='default'), ) # consumer_arguments={'x-priority': 5} 数字越大,优先级越高 - only for rabbitmq? CELERY_DEFAULT_QUEUE = 'default' CELERY_DEFAULT_EXCHANGE = 'default' CELERY_DEFAULT_ROUTING_KEY = 'default' CELERY_ROUTES = { # -- HIGH PRIORITY QUEUE -- # 'app.tasks.analysis_main_3': {'queue': 'for_q_type3'}, # -- LOW PRIORITY QUEUE -- # 'app.tasks.analysis_main_12': {'queue': 'for_q_type12'}, 'app.tasks.analysis_main': {'queue': 'default'}, } @app.task def analysis_main_12(current_id, q_num): ...... your code here ...... @app.task def analysis_main_3(current_id, q_num): ...... your code here ......
#!/bin/sh echo executing entrypoint.sh ... celery worker -A celery_tasks.app -n worker_Qtype12 -Q for_q_type12 --loglevel=info --concurrency=12 & celery worker -A celery_tasks.app -n worker_Qtype3 -Q for_q_type3 --loglevel=info --concurrency=8 & celery flower -A celery_tasks.app --address=0.0.0.0 --port=50080
上述配置中须要注意生产者、消费者和启动命令三者所用的queue是相对应的,不要写错。redis
上述配置只验证了多个任务队列,至于优先队列功能是否有效未作验证。 - priority queue 参考:
https://stackoverflow.com/questions/15809811/celery-tasks-that-need-to-run-in-prioritydocker