Official Celery getting-started docs: http://docs.celeryproject.org/en/master/getting-started/index.html
Celery is a distributed task queue written in Python. It does not include a message broker of its own; you need to plug in a third-party one such as RabbitMQ or Redis (other transports exist but are not recommended for production).
The examples below use Redis as both the broker and the result backend.
pip install celery
When using Redis as the broker, you also need to install celery-with-redis:
pip install celery-with-redis
Create tasks.py:
from celery import Celery

# redis URL: redis://:password@host:port/db_number
app = Celery('tasks',
             broker='redis://:cds-china@172.20.3.3:6379/0',
             backend='redis://:cds-china@172.20.3.3:6379/1')

@app.task
def add(x, y):
    return x + y
cd into the directory containing tasks.py and start a Celery worker to process tasks:
celery -A tasks worker --loglevel=info
If it fails to start with the error below, that is because running the worker as root is discouraged:
root@ubuntu:~/zq# celery -A tasks worker --loglevel=info
Running a worker with superuser privileges when the
worker accepts messages serialized with pickle is
a very bad idea!

If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).

User information: uid=0 euid=0 gid=0 egid=0
If you really must run as root, there are two ways to work around this:
1. Set the flag in code:

   from celery import platforms
   platforms.C_FORCE_ROOT = True

2. Set the C_FORCE_ROOT environment variable:

   export C_FORCE_ROOT="true"
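Both workarounds set the same override that Celery consults at startup. A rough conceptual sketch of that check (`root_allowed` is a hypothetical name, not Celery's internal function):

```python
import os

def root_allowed(platforms_flag=False):
    """Roughly how the override is consulted: either the in-code
    platforms.C_FORCE_ROOT flag or the C_FORCE_ROOT environment
    variable permits running the worker as root."""
    env = os.environ.get('C_FORCE_ROOT', '').lower() in ('true', '1', 'yes')
    return platforms_flag or env

os.environ['C_FORCE_ROOT'] = 'true'
print(root_allowed())   # True
```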
Start again; if the output looks like the following, the worker came up normally:
root@ubuntu:~/zq# celery -A tasks worker --loglevel=info
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:812: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
[2017-07-05 14:18:07,713: WARNING/MainProcess] /usr/local/lib/python2.7/dist-packages/celery/apps/worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@ubuntu v3.1.23 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f58957a9f50
- ** ---------- .> transport:   redis://:**@172.20.3.3:6379/0
- ** ---------- .> results:     redis://:**@172.20.3.3:6379/1
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add

[2017-07-05 14:18:07,895: INFO/MainProcess] Connected to redis://:**@172.20.3.3:6379/0
[2017-07-05 14:18:07,910: INFO/MainProcess] mingle: searching for neighbors
[2017-07-05 14:18:08,917: INFO/MainProcess] mingle: all alone
[2017-07-05 14:18:08,928: WARNING/MainProcess] celery@ubuntu ready.
Open another terminal:
root@ubuntu:~/zq# python
Python 2.7.6 (default, Jun 22 2015, 17:58:13)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> r = add.delay(1, 1)
>>> r.ready()
True
>>> r.get()
2
>>> r.id        # task id
'bf779b9e-c779-4004-9864-68763090d560'
>>> r = add.delay(2)
>>> r.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 175, in get
    raise meta['result']
TypeError: add() takes exactly 2 arguments (1 given)
>>> r.get(propagate=False)
TypeError('add() takes exactly 2 arguments (1 given)',)
>>> r.failed()
True
>>> r.successful()
False
>>> r.state
'FAILURE'

# Tasks are invoked with delay().
# delay() is a shortcut for apply_async(), which gives you finer control over task execution.
# Calling a task returns an AsyncResult instance, which can be used to check the task's state, wait for it to finish, or fetch its return value (or the exception and traceback if it failed). This is not enabled by default: you must configure a Celery result backend.
# State transitions: PENDING -> STARTED -> SUCCESS
Meanwhile the worker prints logs like:
[2017-07-05 14:29:20,165: INFO/MainProcess] Received task: tasks.add[e5737959-410f-4506-86f3-0b03ed359f00]
[2017-07-05 14:29:22,170: INFO/MainProcess] Task tasks.add[e5737959-410f-4506-86f3-0b03ed359f00] succeeded in 2.00330784172s: 2

The first line shows the message was received; the second shows the task ran and returned 2.
If no result backend is configured, calling .ready() or .get() raises:
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
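That error makes sense once you see what a result backend provides: somewhere the worker records each task's state and return value, and from which the caller's AsyncResult handle reads them back. A purely illustrative, in-memory sketch of that contract (the ToyBackend/ToyAsyncResult names are invented; this is not Celery's implementation):

```python
class ToyBackend(object):
    """Toy stand-in for a result backend: maps task_id -> state/result."""
    def __init__(self):
        self._meta = {}

    def store_result(self, task_id, state, result):
        # The worker side calls this when a task finishes
        self._meta[task_id] = {'state': state, 'result': result}

    def get_task_meta(self, task_id):
        # With no stored entry, the task is still PENDING
        return self._meta.get(task_id, {'state': 'PENDING', 'result': None})


class ToyAsyncResult(object):
    """Toy stand-in for AsyncResult: polls the backend for task state."""
    def __init__(self, task_id, backend):
        self.id = task_id
        self.backend = backend

    @property
    def state(self):
        return self.backend.get_task_meta(self.id)['state']

    def ready(self):
        return self.state in ('SUCCESS', 'FAILURE')

    def successful(self):
        return self.state == 'SUCCESS'

    def failed(self):
        return self.state == 'FAILURE'

    def get(self, propagate=True):
        meta = self.backend.get_task_meta(self.id)
        if meta['state'] == 'FAILURE' and propagate:
            raise meta['result']
        return meta['result']


backend = ToyBackend()
r = ToyAsyncResult('bf779b9e', backend)
print(r.state)                                  # 'PENDING': nothing stored yet
backend.store_result('bf779b9e', 'SUCCESS', 2)  # the "worker" side
print(r.ready(), r.get())                       # True 2
```

Without the store (Celery's `DisabledBackend`), there is simply nothing for `.ready()`/`.get()` to read, hence the AttributeError above.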
Celery has many configurable settings, although the defaults are good enough for most cases.
There are two ways to configure it:
1. Set options directly in code
Set a single option:
app.conf.CELERY_TASK_SERIALIZER = 'json'
Set several options at once:
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Europe/Oslo',
    CELERY_ENABLE_UTC=True,
)
2. Keep the settings in a dedicated configuration module
app.config_from_object('celeryconfig')
The configuration module is conventionally named celeryconfig, but any module name works. config_from_object('celeryconfig') looks for a celeryconfig.py in the current directory or on the Python path.
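Under the hood this is essentially a module import plus collecting the module's uppercase attributes. A simplified sketch of the idea (load_config and the celeryconfig_demo module are invented for illustration; this is not Celery's actual code):

```python
import importlib
import sys
import types

def load_config(module_name):
    """Import a config module and collect its UPPERCASE attributes,
    roughly what app.config_from_object(module_name) does."""
    mod = importlib.import_module(module_name)  # searches sys.path / cwd
    return {k: v for k, v in vars(mod).items() if k.isupper()}

# Build a stand-in celeryconfig module so the sketch is self-contained
cfg = types.ModuleType('celeryconfig_demo')
cfg.BROKER_URL = 'amqp://'
cfg.CELERY_TASK_SERIALIZER = 'json'
cfg._private = 'ignored'  # lowercase names are not treated as settings
sys.modules['celeryconfig_demo'] = cfg

conf = load_config('celeryconfig_demo')
print(conf)   # {'BROKER_URL': 'amqp://', 'CELERY_TASK_SERIALIZER': 'json'}
```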
celeryconfig.py
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Europe/Oslo'
CELERY_ENABLE_UTC = True
You can run
python -m celeryconfig
to check the configuration file for syntax errors.
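That check works because `python -m` imports, and therefore byte-compiles, the module, so a syntax error surfaces immediately. The same idea in a few lines, using Python's built-in compile() on hypothetical config sources:

```python
# A SyntaxError raised by compile() is the same class of error that
# `python -m celeryconfig` would report for a malformed config file.
source_ok  = "BROKER_URL = 'amqp://'\nCELERY_ENABLE_UTC = True\n"
source_bad = "BROKER_URL = 'amqp://\n"   # unterminated string literal

def has_valid_syntax(source, filename='celeryconfig.py'):
    try:
        compile(source, filename, 'exec')
        return True
    except SyntaxError:
        return False

print(has_valid_syntax(source_ok))    # True
print(has_valid_syntax(source_bad))   # False
```

Note that importing the module (unlike compile()) also executes it, which is fine for a config file that only assigns constants.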
# Start in the background; by default the pidfile and logfile go to the current directory
$ celery multi start 1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
    --logfile=/var/log/celery/%n%I.log
# Restart
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid
# Stop
$ celery multi stop 1 -A proj -l info
# Stop safely: wait for all running tasks to complete before stopping
$ celery multi stopwait 1 -A proj -l info

Start:
root@ubuntu:~/zq# celery multi start 1 -A proj -l info -c4 --pidfile=%n.pid
celery multi v3.1.23 (Cipater)
> Starting nodes...
    > celery1@ubuntu: OK

After starting you can see the processes:
root@ubuntu:~/zq# ps aux | grep cel
root 24103 5.1 0.0 80364 24992 ? S 15:48 0:00 /usr/bin/python -m celery worker -c4 -A proj -l info --logfile=celery1.log --pidfile=1.pid --hostname=celery1@ubuntu
root 24110 0.0 0.0 78904 20796 ? S 15:48 0:00 /usr/bin/python -m celery worker -c4 -A proj -l info --logfile=celery1.log --pidfile=1.pid --hostname=celery1@ubuntu
root 24111 0.0 0.0 78904 20732 ? S 15:48 0:00 /usr/bin/python -m celery worker -c4 -A proj -l info --logfile=celery1.log --pidfile=1.pid --hostname=celery1@ubuntu
root 24112 0.0 0.0 78904 20740 ? S 15:48 0:00 /usr/bin/python -m celery worker -c4 -A proj -l info --logfile=celery1.log --pidfile=1.pid --hostname=celery1@ubuntu
root 24113 0.0 0.0 78904 20816 ? S 15:48 0:00 /usr/bin/python -m celery worker -c4 -A proj -l info --logfile=celery1.log --pidfile=1.pid --hostname=celery1@ubuntu

Restart:
root@ubuntu:~/zq# celery multi restart 1 --pidfile=1.pid
celery multi v3.1.23 (Cipater)
> Stopping nodes...
    > celery1@ubuntu: TERM -> 24103
> Waiting for 1 node -> 24103.....
    > celery1@ubuntu: OK
> Restarting node celery1@ubuntu: OK

Stop:
root@ubuntu:~/zq# celery multi stop 1 -A proj -l info
celery multi v3.1.23 (Cipater)
> celery1@ubuntu: DOWN
Inspecting runtime information
All of these commands accept a -d (--destination) option to target one or more specific workers; otherwise the command is sent to all workers.
Workers:
For example, the earlier celery -A tasks worker --loglevel=info started a single worker.
You can also start several workers on one machine, naming each one with --hostname:
celery -A tasks worker --loglevel=info -n worker1@%h
celery -A tasks worker --loglevel=info -n worker2@%h
celery -A tasks worker --loglevel=info -n worker3@%h

The hostname argument supports the following variable expansions:
- %h: hostname, including the domain name
- %n: hostname only
- %d: domain name only

Examples:
Variable  Template     Result
%h        worker1@%h   worker1@george.example.com
%n        worker1@%n   worker1@george
%d        worker1@%d   worker1@example.com
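Assuming a full hostname of george.example.com, the expansion in the table above amounts to plain string substitution; a hypothetical helper (not Celery's internal one) makes it concrete:

```python
def expand_nodename(template, fqdn):
    """Expand %h (full host), %n (short host) and %d (domain)
    in a -n/--hostname template, given the fully qualified name."""
    shortname, _, domain = fqdn.partition('.')
    return (template.replace('%h', fqdn)
                    .replace('%n', shortname)
                    .replace('%d', domain))

fqdn = 'george.example.com'
print(expand_nodename('worker1@%h', fqdn))  # worker1@george.example.com
print(expand_nodename('worker1@%n', fqdn))  # worker1@george
print(expand_nodename('worker1@%d', fqdn))  # worker1@example.com
```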
Query a specific worker for its active tasks:
celery -A tasks inspect -d worker1@ubuntu active
Full examples follow; none of these specify a particular worker:
root@ubuntu:~/zq# celery -A tasks inspect active
-> celery@ubuntu: OK
    - empty -
root@ubuntu:~/zq# celery -A tasks inspect scheduled
-> celery@ubuntu: OK
    - empty -
root@ubuntu:~/zq# celery -A tasks inspect reserved
-> celery@ubuntu: OK
    - empty -
root@ubuntu:~/zq# celery -A tasks inspect revoked
-> celery@ubuntu: OK
    - empty -
root@ubuntu:~/zq# celery -A tasks inspect registered   # only the add test task here
-> celery@ubuntu: OK
    * tasks.add
root@ubuntu:~/zq# celery -A tasks inspect stats        # current status: broker, pool, etc.
-> celery@ubuntu: OK
    {
        "broker": {
            "alternates": [],
            "connect_timeout": 4,
            "heartbeat": null,
            "hostname": "172.20.3.3",
            "insist": false,
            "login_method": null,
            "port": 6379,
            "ssl": false,
            "transport": "redis",
            "transport_options": {},
            "uri_prefix": null,
            "userid": null,
            "virtual_host": "0"
        },
        "clock": "2836",
        "pid": 25209,
        "pool": {
            "max-concurrency": 8,
            "max-tasks-per-child": "N/A",
            "processes": [25221, 25222, 25223, 25224, 25225, 25226, 25227, 25228],
            "put-guarded-by-semaphore": false,
            "timeouts": [0, 0],
            "writes": {
                "all": "33.33%, 33.33%, 33.33%",
                "avg": "33.33%",
                "inqueues": {"active": 0, "total": 8},
                "raw": "1, 1, 1",
                "total": 3
            }
        },
        "prefetch_count": 32,
        "rusage": {
            "idrss": 0,
            "inblock": 24,
            "isrss": 0,
            "ixrss": 0,
            "majflt": 0,
            "maxrss": 25016,
            "minflt": 24667,
            "msgrcv": 0,
            "msgsnd": 0,
            "nivcsw": 280,
            "nsignals": 0,
            "nswap": 0,
            "nvcsw": 6511,
            "oublock": 8,
            "stime": 0.483572,
            "utime": 3.573325
        },
        "total": {"tasks.add": 3}
    }
# The meaning of each field in stats is documented at http://docs.celeryproject.org/en/latest/userguide/workers.html#statistics
inspect conf shows the configuration; there is a lot in there:
root@ubuntu:~/zq# celery -A tasks inspect conf
-> celery@ubuntu: OK
    {
        "ADMINS": [],
        "BROKER_CONNECTION_MAX_RETRIES": 100,
        "BROKER_CONNECTION_RETRY": true,
        "BROKER_CONNECTION_TIMEOUT": 4,
        "BROKER_FAILOVER_STRATEGY": null,
        "BROKER_HEARTBEAT": null,
        "BROKER_HEARTBEAT_CHECKRATE": 3.0,
        "BROKER_HOST": null,
        "BROKER_LOGIN_METHOD": null,
        "BROKER_PASSWORD": "********",
        "BROKER_POOL_LIMIT": 10,
        "BROKER_PORT": null,
        "BROKER_TRANSPORT": null,
        "BROKER_TRANSPORT_OPTIONS": {},
        "BROKER_URL": "redis://:********@172.20.3.3:6379/0",
        "BROKER_USER": null,
        "BROKER_USE_SSL": false,
        "BROKER_VHOST": null,
        "CASSANDRA_COLUMN_FAMILY": null,
        "CASSANDRA_DETAILED_MODE": false,
        "CASSANDRA_KEYSPACE": "********",
        "CASSANDRA_READ_CONSISTENCY": null,
        "CASSANDRA_SERVERS": null,
        "CASSANDRA_WRITE_CONSISTENCY": null,
        "CELERYBEAT_LOG_FILE": null,
        "CELERYBEAT_LOG_LEVEL": "INFO",
        "CELERYBEAT_MAX_LOOP_INTERVAL": 0,
        "CELERYBEAT_SCHEDULE": {},
        "CELERYBEAT_SCHEDULER": "celery.beat:PersistentScheduler",
        "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
        "CELERYBEAT_SYNC_EVERY": 0,
        "CELERYD_AGENT": null,
        "CELERYD_AUTORELOADER": "celery.worker.autoreload:Autoreloader",
        "CELERYD_AUTOSCALER": "celery.worker.autoscale:Autoscaler",
        "CELERYD_CONCURRENCY": 0,
        "CELERYD_CONSUMER": "celery.worker.consumer:Consumer",
        "CELERYD_FORCE_EXECV": false,
        "CELERYD_HIJACK_ROOT_LOGGER": true,
        "CELERYD_LOG_COLOR": null,
        "CELERYD_LOG_FILE": null,
        "CELERYD_LOG_FORMAT": "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s",
        "CELERYD_LOG_LEVEL": "WARN",
        "CELERYD_MAX_TASKS_PER_CHILD": null,
        "CELERYD_POOL": "prefork",
        "CELERYD_POOL_PUTLOCKS": true,
        "CELERYD_POOL_RESTARTS": false,
        "CELERYD_PREFETCH_MULTIPLIER": 4,
        "CELERYD_STATE_DB": null,
        "CELERYD_TASK_LOG_FORMAT": "[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s",
        "CELERYD_TASK_SOFT_TIME_LIMIT": null,
        "CELERYD_TASK_TIME_LIMIT": null,
        "CELERYD_TIMER": null,
        "CELERYD_TIMER_PRECISION": 1.0,
        "CELERYD_WORKER_LOST_WAIT": 10.0,
        "CELERYMON_LOG_FILE": null,
        "CELERYMON_LOG_FORMAT": "[%(asctime)s: %(levelname)s] %(message)s",
        "CELERYMON_LOG_LEVEL": "INFO",
        "CELERY_ACCEPT_CONTENT": ["json", "pickle", "msgpack", "yaml"],
        "CELERY_ACKS_LATE": false,
        "CELERY_ALWAYS_EAGER": false,
        "CELERY_ANNOTATIONS": null,
        "CELERY_BROADCAST_EXCHANGE": "celeryctl",
        "CELERY_BROADCAST_EXCHANGE_TYPE": "fanout",
        "CELERY_BROADCAST_QUEUE": "celeryctl",
        "CELERY_CACHE_BACKEND": null,
        "CELERY_CACHE_BACKEND_OPTIONS": {},
        "CELERY_CHORD_PROPAGATES": true,
        "CELERY_COUCHBASE_BACKEND_SETTINGS": null,
        "CELERY_CREATE_MISSING_QUEUES": true,
        "CELERY_DEFAULT_DELIVERY_MODE": 2,
        "CELERY_DEFAULT_EXCHANGE": "celery",
        "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
        "CELERY_DEFAULT_QUEUE": "celery",
        "CELERY_DEFAULT_RATE_LIMIT": null,
        "CELERY_DEFAULT_ROUTING_KEY": "********",
        "CELERY_DISABLE_RATE_LIMITS": false,
        "CELERY_EAGER_PROPAGATES_EXCEPTIONS": false,
        "CELERY_ENABLE_REMOTE_CONTROL": true,
        "CELERY_ENABLE_UTC": true,
        "CELERY_EVENT_QUEUE_EXPIRES": null,
        "CELERY_EVENT_QUEUE_TTL": null,
        "CELERY_EVENT_SERIALIZER": "json",
        "CELERY_IGNORE_RESULT": false,
        "CELERY_IMPORTS": [],
        "CELERY_INCLUDE": ["celery.app.builtins", "tasks"],
        "CELERY_MAX_CACHED_RESULTS": 100,
        "CELERY_MESSAGE_COMPRESSION": null,
        "CELERY_MONGODB_BACKEND_SETTINGS": null,
        "CELERY_QUEUES": null,
        "CELERY_QUEUE_HA_POLICY": null,
        "CELERY_REDIRECT_STDOUTS": true,
        "CELERY_REDIRECT_STDOUTS_LEVEL": "WARNING",
        "CELERY_REDIS_DB": null,
        "CELERY_REDIS_HOST": null,
        "CELERY_REDIS_MAX_CONNECTIONS": null,
        "CELERY_REDIS_PASSWORD": "********",
        "CELERY_REDIS_PORT": null,
        "CELERY_RESULT_BACKEND": "redis://:********@172.20.3.3:6379/1",
        "CELERY_RESULT_DBURI": null,
        "CELERY_RESULT_DB_SHORT_LIVED_SESSIONS": false,
        "CELERY_RESULT_DB_TABLENAMES": null,
        "CELERY_RESULT_ENGINE_OPTIONS": null,
        "CELERY_RESULT_EXCHANGE": "celeryresults",
        "CELERY_RESULT_EXCHANGE_TYPE": "direct",
        "CELERY_RESULT_PERSISTENT": null,
        "CELERY_RESULT_SERIALIZER": "pickle",
        "CELERY_ROUTES": null,
        "CELERY_SECURITY_CERTIFICATE": null,
        "CELERY_SECURITY_CERT_STORE": null,
        "CELERY_SECURITY_KEY": "********",
        "CELERY_SEND_EVENTS": false,
        "CELERY_SEND_TASK_ERROR_EMAILS": false,
        "CELERY_SEND_TASK_SENT_EVENT": false,
        "CELERY_STORE_ERRORS_EVEN_IF_IGNORED": false,
        "CELERY_TASK_PUBLISH_RETRY": true,
        "CELERY_TASK_PUBLISH_RETRY_POLICY": {},
        "CELERY_TASK_RESULT_EXPIRES": "1 day, 0:00:00",
        "CELERY_TASK_SERIALIZER": "pickle",
        "CELERY_TIMEZONE": null,
        "CELERY_TRACK_STARTED": false,
        "CELERY_WORKER_DIRECT": false,
        "EMAIL_HOST": "localhost",
        "EMAIL_HOST_PASSWORD": "********",
        "EMAIL_HOST_USER": null,
        "EMAIL_PORT": 25,
        "EMAIL_TIMEOUT": 2,
        "EMAIL_USE_SSL": false,
        "EMAIL_USE_TLS": false,
        "SERVER_EMAIL": "celery@localhost"
    }
You can also control runtime behavior:
[Commands]
|    add_consumer <queue> [exchange [type [routing_key]]]
|        tell worker(s) to start consuming a queue
|    autoscale [max] [min]
|        change autoscale settings
|    cancel_consumer <queue>
|        tell worker(s) to stop consuming a queue
|    disable_events
|        tell worker(s) to disable events
|    enable_events
|        tell worker(s) to enable events
|    pool_grow [N=1]
|        start more pool processes
|    pool_shrink [N=1]
|        use less pool processes
|    rate_limit <task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)
|        tell worker(s) to modify the rate limit for a task type
|    time_limit <task_name> <soft_secs> [hard_secs]
|        tell worker(s) to modify the time limit for a task type.
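The rate_limit argument format shown above (5/s, 5/m, 5/h) reads as "N tasks per unit of time". A small hypothetical parser (not part of Celery's API) makes the semantics concrete:

```python
def parse_rate_limit(limit):
    """Convert a '5/s'-style rate limit string into tasks per second.
    Units: s = second, m = minute, h = hour."""
    count, _, unit = limit.partition('/')
    seconds_per_unit = {'s': 1, 'm': 60, 'h': 3600}[unit]
    return float(count) / seconds_per_unit

print(parse_rate_limit('5/s'))   # 5.0 tasks per second
print(parse_rate_limit('5/m'))   # 5 tasks per minute, i.e. one every 12s
```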
For example you can force workers to enable event messages (used for monitoring tasks and workers):
root@ubuntu:~/zq# celery -A proj control enable_events
-> celery@ubuntu: OK
    task events enabled

Worker log:
[2017-07-05 17:00:32,450: INFO/MainProcess] Events of group {task} enabled by remote.
Open a monitoring screen:
celery -A proj events
This opens a full-screen event monitor in the terminal (the screenshot is not reproduced here).
Stop monitoring:
root@ubuntu:~/zq# celery -A proj control disable_events
-> celery@ubuntu: OK
    task events disabled

Worker log:
[2017-07-05 17:03:39,981: INFO/MainProcess] Events of group {task} disabled by remote.
For more depth, see http://docs.celeryproject.org/en/master/userguide/workers.html#guide-workers