这篇文档描述了可用的配置选项。html
若是你使用默认的加载器,你必须建立 celeryconfig.py 模块而且保证它在python路径中。node
如下是配置示例,你能够从这个开始。它包括运行一个基本Celery应用的全部基础设置。python
## Broker settings. broker_url = 'amqp://guest:guest@localhost:5672//' # List of modules to import when the Celery worker starts. imports = ('myapp.tasks',) ## Using the database to store task state and results. result_backend = 'db+sqlite:///results.db' task_annotations = {'tasks.add': {'rate_limit': '10/s'}}
4.0 版本引入了新的小写设置名称和机构环境。mysql
与之前版本的不一样,除了设置项名称变为小写字母外,还有一个前缀的重命名,例如 celerybeat_ 变为 beat_,celeryd_ 变为 worker,以及不少顶级 celery_ 设置重命名成了 task_ 前缀。git
Celery 仍然能读取老的配置文件,因此并不仓促迁移到新的设置格式。github
Setting name | Replace with |
---|---|
CELERY_ACCEPT_CONTENT | accept_content |
CELERY_ENABLE_UTC | enable_utc |
CELERY_IMPORTS | imports |
CELERY_INCLUDE | include |
CELERY_TIMEZONE | timezone |
CELERYBEAT_MAX_LOOP_INTERVAL | beat_max_loop_interval |
CELERYBEAT_SCHEDULE | beat_schedule |
CELERYBEAT_SCHEDULER | beat_scheduler |
CELERYBEAT_SCHEDULE_FILENAME | beat_schedule_filename |
CELERYBEAT_SYNC_EVERY | beat_sync_every |
BROKER_URL | broker_url |
BROKER_TRANSPORT | broker_transport |
BROKER_TRANSPORT_OPTIONS | broker_transport_options |
BROKER_CONNECTION_TIMEOUT | broker_connection_timeout |
BROKER_CONNECTION_RETRY | broker_connection_retry |
BROKER_CONNECTION_MAX_RETRIES | broker_connection_max_retries |
BROKER_FAILOVER_STRATEGY | broker_failover_strategy |
BROKER_HEARTBEAT | broker_heartbeat |
BROKER_LOGIN_METHOD | broker_login_method |
BROKER_POOL_LIMIT | broker_pool_limit |
BROKER_USE_SSL | broker_use_ssl |
CELERY_CACHE_BACKEND | cache_backend |
CELERY_CACHE_BACKEND_OPTIONS | cache_backend_options |
CASSANDRA_COLUMN_FAMILY | cassandra_table |
CASSANDRA_ENTRY_TTL | cassandra_entry_ttl |
CASSANDRA_KEYSPACE | cassandra_keyspace |
CASSANDRA_PORT | cassandra_port |
CASSANDRA_READ_CONSISTENCY | cassandra_read_consistency |
CASSANDRA_SERVERS | cassandra_servers |
CASSANDRA_WRITE_CONSISTENCY | cassandra_write_consistency |
CELERY_COUCHBASE_BACKEND_SETTINGS | couchbase_backend_settings |
CELERY_MONGODB_BACKEND_SETTINGS | mongodb_backend_settings |
CELERY_EVENT_QUEUE_EXPIRES | event_queue_expires |
CELERY_EVENT_QUEUE_TTL | event_queue_ttl |
CELERY_EVENT_QUEUE_PREFIX | event_queue_prefix |
CELERY_EVENT_SERIALIZER | event_serializer |
CELERY_REDIS_DB | redis_db |
CELERY_REDIS_HOST | redis_host |
CELERY_REDIS_MAX_CONNECTIONS | redis_max_connections |
CELERY_REDIS_PASSWORD | redis_password |
CELERY_REDIS_PORT | redis_port |
CELERY_RESULT_BACKEND | result_backend |
CELERY_MAX_CACHED_RESULTS | result_cache_max |
CELERY_MESSAGE_COMPRESSION | result_compression |
CELERY_RESULT_EXCHANGE | result_exchange |
CELERY_RESULT_EXCHANGE_TYPE | result_exchange_type |
CELERY_TASK_RESULT_EXPIRES | result_expires |
CELERY_RESULT_PERSISTENT | result_persistent |
CELERY_RESULT_SERIALIZER | result_serializer |
CELERY_RESULT_DBURI | Use result_backend instead. |
CELERY_RESULT_ENGINE_OPTIONS | database_engine_options |
[…]_DB_SHORT_LIVED_SESSIONS | database_short_lived_sessions |
CELERY_RESULT_DB_TABLE_NAMES | database_db_names |
CELERY_SECURITY_CERTIFICATE | security_certificate |
CELERY_SECURITY_CERT_STORE | security_cert_store |
CELERY_SECURITY_KEY | security_key |
CELERY_TASK_ACKS_LATE | task_acks_late |
CELERY_TASK_ALWAYS_EAGER | task_always_eager |
CELERY_TASK_ANNOTATIONS | task_annotations |
CELERY_TASK_COMPRESSION | task_compression |
CELERY_TASK_CREATE_MISSING_QUEUES | task_create_missing_queues |
CELERY_TASK_DEFAULT_DELIVERY_MODE | task_default_delivery_mode |
CELERY_TASK_DEFAULT_EXCHANGE | task_default_exchange |
CELERY_TASK_DEFAULT_EXCHANGE_TYPE | task_default_exchange_type |
CELERY_TASK_DEFAULT_QUEUE | task_default_queue |
CELERY_TASK_DEFAULT_RATE_LIMIT | task_default_rate_limit |
CELERY_TASK_DEFAULT_ROUTING_KEY | task_default_routing_key |
CELERY_TASK_EAGER_PROPAGATES | task_eager_propagates |
CELERY_TASK_IGNORE_RESULT | task_ignore_result |
CELERY_TASK_PUBLISH_RETRY | task_publish_retry |
CELERY_TASK_PUBLISH_RETRY_POLICY | task_publish_retry_policy |
CELERY_TASK_QUEUES | task_queues |
CELERY_TASK_ROUTES | task_routes |
CELERY_TASK_SEND_SENT_EVENT | task_send_sent_event |
CELERY_TASK_SERIALIZER | task_serializer |
CELERYD_TASK_SOFT_TIME_LIMIT | task_soft_time_limit |
CELERYD_TASK_TIME_LIMIT | task_time_limit |
CELERY_TRACK_STARTED | task_track_started |
CELERYD_AGENT | worker_agent |
CELERYD_AUTOSCALER | worker_autoscaler |
CELERYD_CONCURRENCY | worker_concurrency |
CELERYD_CONSUMER | worker_consumer |
CELERY_WORKER_DIRECT | worker_direct |
CELERY_DISABLE_RATE_LIMITS | worker_disable_rate_limits |
CELERY_ENABLE_REMOTE_CONTROL | worker_enable_remote_control |
CELERYD_HIJACK_ROOT_LOGGER | worker_hijack_root_logger |
CELERYD_LOG_COLOR | worker_log_color |
CELERYD_LOG_FORMAT | worker_log_format |
CELERYD_WORKER_LOST_WAIT | worker_lost_wait |
CELERYD_MAX_TASKS_PER_CHILD | worker_max_tasks_per_child |
CELERYD_POOL | worker_pool |
CELERYD_POOL_PUTLOCKS | worker_pool_putlocks |
CELERYD_POOL_RESTARTS | worker_pool_restarts |
CELERYD_PREFETCH_MULTIPLIER | worker_prefetch_multiplier |
CELERYD_REDIRECT_STDOUTS | worker_redirect_stdouts |
CELERYD_REDIRECT_STDOUTS_LEVEL | worker_redirect_stdouts_level |
CELERYD_SEND_EVENTS | worker_send_task_events |
CELERYD_STATE_DB | worker_state_db |
CELERYD_TASK_LOG_FORMAT | worker_task_log_format |
CELERYD_TIMER | worker_timer |
CELERYD_TIMER_PRECISION | worker_timer_precision |
若是接收到一个消息,其内容类型再也不上述列表中,它将会被丢弃并抛出一个错误。redis
默认状况下,任意内容类型都是启用的,包括pickle以及yaml,因此确保不受信任的第三方不能访问你的消息中间件。查看安全这一节获取更多信息。算法
示例:sql
# using serializer name accept_content = ['json'] # or the actual content-type (MIME) accept_content = ['application/json']
一旦启用,消息中的日期和时间将会转化成 UTC 时区。mongodb
注意2.5版本如下的工做单元将会认为全部消息都使用的本地时区,因此只有在全部的工做单元都升级了的状况下再启用这个特性。
设置Celery使用一个自定义的时区。这个时区值能够是pytz库支持的任意时区。
若是没有设置,UTC时区将被使用。为了向后兼容,还提供了一个 enable_utc设置,若是他设置成假,将使用系统本地时区。
如下将更改 tasks.add 任务的 rate_limit 属性:
task_annotations = {‘tasks.add’: {‘rate_limit’: ‘10/s’}}
或者对全部的任务更改:
task_annotations = {‘*’: {‘rate_limit’: ‘10/s’}}
你还能够更改方法,例如 on_failure 处理函数:
def my_on_failure(self, exc, task_id, args, kwargs, einfo): print(‘Oh no! Task failed: {0!r}’.format(exc)) task_annotations = {‘*’: {‘on_failure’: my_on_failure}}
若是你须要更灵活的控制,那么你可使用对象而不是字典来选择任务来进行注解:
class MyAnnotate(object): def annotate(self, task): if task.name.startswith('tasks.'): return {'rate_limit': '10/s'} task_annotations = (MyAnnotate(), {other,})
默认发送未压缩的消息。
协议 2 在 3.1.24 以及 4.x+ 被支持
另见:
Serializers
决定当链接丢失或者其余链接错误时任务消息的发布是否会重试,查看 task_publish_retry_policy。
定义当链接丢失或者其余链接错误时任务消息的发布重试策略。
也就是说,任务将会在本地执行而不是发送到队列。
这与使用 apply() 带 throw=True 参数有一样的效果。
它须要 tblib 库,能够经过 pip 安装:
$ pip install celery[tblib]
查看 Bundles 获取关于组合多个扩展需求的信息。
task_ignore_result
默认值:禁用
是否存储任务返回值(tombstones)。若是你只是想在发生错误的时候记录返回值,能够设置:task_store_errors_even_if_ignored
task_store_errors_even_if_ignored
默认值:禁用
若是设置了,即便 Task.ignore_result 启用了,工做单元也会爱结果后端中存储全部的任务错误。
task_track_started
默认值:禁用
若是设置成真,当任务被工做单元执行时,任务将报告它的状态为started。默认值是假,由于一般行为是不作这种粒度级别的汇报。任务会处于 pending、finished 或者 waiting to be retried。当有长时间任务,而且须要知道当前正在运行什么任务时,有一个 started状态将会颇有用。
task_time_limit
默认值:没有时间限制
任务的硬时间限制,以秒为单位。若是这个时间限制被超过,处理任务的工做单元进程将会被杀死并使用一个新的替代。
task_soft_time_limit
默认值:没有时间限制
任务的软时间限制,以秒为单位
当这个时间限制超事后,SoftTimeLimitExceeded异常将会被抛出。例如,任务能够捕获这个异常在硬时间限制到达以前对环境进行清理:
from celery.exceptions import SoftTimeLimitExceeded @app.task def mytask(): try: return do_work() except SoftTimeLimitExceeded: cleanup_in_a_hurry()
另见:
FAQ: Shoud I use retry or acks_late
将这个设置成真可让消息从新入队,因此任务将会被再执行,在同一个工做单元或者另一个工做单元。
告警:
启用这个可能致使消息循环;确保你知道你在作什么
当任务没有一个自定义的速率限制时,这个值将被使用
另见:
worker_disable_rate_limits 设置能够禁用全部的速率限制
result_backend
默认值:默认不启用结果后端
用来存储结果的后端。能够是下列之一:
rpc
以 AMQP 消息形式发送结果。查看 RPC 后端设置
database
使用一个 SQLAlchemy 支持的结构化数据库。查看数据库后端设置
redis
使用 Redis 存储结果。查看 Redis 后端设置
cache
使用 Memcached 存储结果。查看 Cache 后端设置
cassandra
使用 Cassandra 存储结果。查看 Cassandra 后端设置
elasticsearch
使用 Elasticsearch 存储结果。查看 Elasticsearch 后端设置
ironcache
使用 IronCache 存储结果。查看 IronCache 后端设置
couchbase
使用 Couchbase 存储结果。查看 Couchbase 后端设置
couchdb
使用 CouchDB 存储结果。查看 CouchDB 后端设置
filesystem
使用共享文件夹存储结果。查看 File-system 后端设置
consul
使用 Consul K/V 存储结果。查看 Consul K/V 后端设置
result_serializer
默认值:从4.0版本开始使用 json(更早:pickle)
查看 Serializers 获取支持的序列化格式的信息。
result_compression
默认值:无压缩
结果值得可选压缩方法。支持 task_seralizer 设置相同的选项。
result_expires
默认值:1天后过时
存储的结果被删除的时间(秒数,或者一个 timedelta 对象)
(有一个内建的周期性任务将删除过时的任务结果(celery.backend_cleanup),前提是 celery beat 已经被启用。这个任务天天上午4点运行。
值 None 或者 0 意思是结果永不删除(取决于后端声明))
注意:
当前这个特性只支持 AMQP, database, cache, Redis 这些存储后端。当使用 database 存储后端,celery beat必须执行使得过时结果被删除。
对于老的 amqp 后端,存储结果一旦被消费它将再也不可用,此时这个特性将起到做用。
这是老的结果被删除以前总的结果缓存的数量。值 0 或者 None 意味着没有限制,而且值 -1 将禁用缓存。
Database URL 示例
使用一个数据库存储后端,你必须配置 result_backend 设置为一个链接的URL,而且带 db+ 前缀:
result_backend = 'db+scheme://user:password@host:port/dbname'
示例:
# sqlite (filename) result_backend = 'db+sqlite:///results.sqlite' # mysql result_backend = 'db+mysql://scott:tiger@localhost/foo' # postgresql result_backend = 'db+postgresql://scott:tiger@localhost/mydatabase' # oracle result_backend = 'db+oracle://scott:tiger@127.0.0.1:1521/sidname'
查看 Supported Databases 获取支持的数据库的一个表,查看 Connection String 获取相关的链接字符串(这是 db+ 前缀后带的URI的一部分)
# echo enables verbose logging from SQLAlchemy. app.conf.database_engine_options = {'echo': True}
database_short_lived_sessions
默认值:默认禁用
默认禁用短会话。若是启用了,他们会急剧的下降性能,特别是对于处理不少任务的系统。当工做单元的流量很低,缓存的数据库链接会因为空闲而变为无用,进而会致使工做单元出错,这种状况下这个选项是有用的。例如:间歇性的错误如(OperationalError)(2006, ‘MySQL server has gone away’)经过启用短会话能解决。这个选项只影响数据库后端。
database_table_names
默认值:{} (空映射)
当 SQLAlchemy 设置成结果后端, Celery 自动建立两个表来存储任务的元数据。这个设置容许你自定义表名称:
# use custom table names for the database result backend. database_table_names = { 'task': 'myapp_taskmeta', 'group': 'myapp_groupmeta', }
配置示例:
result_backend = 'rpc://' result_persistent = False
注意:
缓存后端支持 pylibmc 和 python-memcached 库。后者只有在 pylibmc 没有安装时才会被使用。
使用一个 Memcached 服务器:
result_backend = 'cache+memcached://127.0.0.1:11211/'
使用多个 Memcached 服务器:
result_backend = """ cache+memcached://172.19.26.240:11211;172.19.26.242:11211/ """.strip()
“memory” 后端只在内存中存储缓存:
result_backend = 'cache' cache_backend = 'memory'
cache_backend_options = { 'binary': True, 'behaviors': {'tcp_nodelay': True}, }
注意:
Redis 后端须要 Redis 库。
可使用 pip 安装这个包:
$ pip install celery[redis]
查看 Bundles 获取组合多个扩展需求的信息
后端须要 result_backend 设置成一个 Redis URL:
result_backend = 'redis://:password@host:port/db'
例如:
result_backend = 'redis://localhost/0'
等同于:
result_backend = 'redis://'
URL 的字段以下定义:
1. password
链接数据库的密码
2. host
Redis 服务器的主机名或者IP地址(例如:localhost)
3. port
Redis 服务器的端口。默认是 6379
4. db
使用的数据库编号。默认是0。db 能够包含一个可选的斜杠
redis_backend_us_ssl
默认值:禁用
Redis后端支持 SSL。这个选项的合法值与 broker_use_ssl 相同
redis_max_connections
默认值:无显示
Redis 链接池的最大可用链接数,这些链接用来发送和接收结果
redis_socket_connect_timeout
5.0.1版本新特性
默认值:None
从存储后端链接到Redis服务器的链接的Socket超时时间(以秒为单位,int/float)
Cassandra 后端设置
注意:
Cassandra 后端驱动 cassandra-driver。
使用 pip 安装:
$ pip install celery[cassandra]
查看 Bundles 获取关于组合扩展需求的信息。
后端须要配置下列配置指令
cassandra_servers = ['localhost']
cassandra_port
默认值:9042.
链接到Cassandra服务器的端口
cassandra_keyspace
默认值: None.
存储结果的 key-space。例如:
cassandra_keyspace = 'tasks_keyspace'
cassandra_table = 'tasks'
cassandra_read_consistency
默认值: None.
使用的读一致性。值能够是 ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE
cassandra_write_consistency
默认值: None.
使用的写一致性。值能够是 ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE
cassandra_entry_ttl
默认值: None.
状态项的 Time-to-live。添加事后一段时间他们将会过时而且被删除。值 None (默认) 意味着他们永不过时
cassandra_auth_provider
默认值: None.
使用的 cassandra.auth 模块中的 AuthProvider。 值能够是 PlainTextAuthProvider 或者 SaslAuthProvider
cassandra_auth_kwargs
默认值: {} (空映射)
传递给 authentication provider 的命名参数。例如:
cassandra_auth_kwargs = { username: 'cassandra', password: 'cassandra' }
配置示例:
cassandra_servers = ['localhost'] cassandra_keyspace = 'celery' cassandra_table = 'tasks' cassandra_read_consistency = 'ONE' cassandra_write_consistency = 'ONE' cassandra_entry_ttl = 86400
使用 Elasticsearch 做为结果后端,你只须要将result_backend设置成正确的 URL。
配置示例:
result_backend = 'elasticsearch://example.com:9200/index_name/doc_type'
elasticsearch_retry_on_timeout
默认值: False
超时后是否应该触发在另外一个节点重试?
elasticsearch_max_retries
默认值: 3
异常被传递前的最大重试次数
elasticsearch_timeout
默认值: 10.0 秒
elasticsearch 使用的全局超时时间
注意:
Riak 后端须要 riak 库
使用 pip 进行安装:
$ pip install celery[riak]
查看 Bundles 获取组合多扩展需求的信息。
后端须要result_backend设置成一个 Riak URL:
result_backend = 'riak://host:port/bucket'
例如:
result_backend = 'riak://localhost/celery
等同于:
result_backend = 'riak://'
URL 的字段定义以下:
1. host
Riak 服务器的主机名或者IP地址(例如 localhost)
2. port
使用 protobuf 协议的Riak 服务器端口,默认是 8087
3. bucket
使用的Bucket名称。默认是 celery。bucket 名称须要是一个只包含ASCII字符的字符串。
另外,这个后端可使用以下配置指令进行配置:
riak_backend_settings
默认值: {} (空映射)
这是一个支持以下键的映射:
host
Riak 服务器的主机名或者IP地址(例如 localhost)
port
Riak 服务器端口。默认是 8087
bucket
使用的Bucket名称。默认是 celery。bucket 名称须要是一个只包含ASCII字符的字符串。
protocol
链接到 Riak 服务器使用的协议。这不能够经过 result_backend 配置
注意:
Dynamodb 后端须要 boto3 库
使用 pip 进行安装:
$ pip install celery[dynamodb]
查看 Bundles 获取组合多扩展需求的信息。
后端须要 result_backend 设置成一个 DynamoDB URL:
result_backend = 'dynamodb://aws_access_key_id:aws_secret_access_key@region:port/table?read=n&write=m'
例如,声明 AWS 区域以及表名称:
result_backend = 'dynamodb://@us-east-1/celery_results
或者从环境中获取 AWS 配置参数,使用默认表名称(celery)以及声明读写吞吐量:
result_backend = 'dynamodb://@/?read=5&write=5'
或者在本地使用 DynamoDB 的可下载版本:
result_backend = 'dynamodb://@localhost:8000
URL 中的字段以下定义:
aws_access_key_id & aws_secret_access_key
访问 AWS API 资源的认证信息。这能够经过 boto3 从不一样的源获取到
region
AWS 区域,例如: us-east-1 或者本地版本的 localhost。查看 boto3 库文档获取更多的信息。
port
若是你使用的本地版本,这是本地DynamoDB示例监听的端口。若是你没有把区域设置成 localhost,这个设置选项将无效
table
使用的表名。默认是 celery。查看 DynamoDB 命名规则获取容许的字符以及表名长度的信息。
read & write
所建立的 DynamoBD 表的读写能力单元。默认的读写值都是 1。更多的细节能够从 Provisioned Throughput documentation 中获取到。
注意:
IronCache 后端须要 iron_celery 库:
使用 pip 进行安装:
$ pip install iron_celery
IronCache 经过在 result_backend 中配置的 URL 进行声明,例如:
result_backend = 'ironcache://project_id:token@'
或者更改缓存名称:
ironcache:://project_id:token@/awesomecache
更多的信息,查看 https://github.com/iron-io/iron_celery
注意:
Couchbase 后端须要 couchbase 库
使用 pip 进行安装:
$ pip install celery[couchbase]
查看 Bundle 获取组合多扩展需求的步骤。
后端能够经过 result_backend 设置成一个 Couchbase URL:
result_backend = 'couchbase://username:password@host:port/bucket'
默认值:{} (空映射)
这是一个支持以下键的映射:
注意:
CouchDB 后端须要 pycouchdb 库:
使用 pip 安装这个包:
$ pip install celery[couchdb]
查看 Bundles 获取更多关于组合多扩展需求的信息
后端能够经过 result_backend 配置成一个 CouchDB URL:
result_backend = 'couchdb://username:password@host:port/container'
URL 由如下部分组成:
后端能够经过一个文件 URL 配置,例如:
CELERY_RESULT_BACKEND = 'file:///var/celery/results'
配置的目录须要被共享,而且全部使用该后端的服务器均可写。
若是你在单独的一个系统上使用 Celery,你不须要任何进一步的配置就能够简单的使用这个后端。对于大型的集群,你可使用 NFS、GlusterFS、CIFS、HDFS(使用FUSE),或者其余文件系统。
Consul 后端能够经过 URL 配置:
CELERY_RESULT_BACKEND = ‘consul://localhost:8500/’
后端将在 Consul K/V 存储中做为单独键存储结果
后端使用Consul 中的 TTLs 支持结果的自动过时
若是你真的须要配置高级路由,这个设置应该是一个 kombu.Queue 对象的列表,工做单元能够从中消费。
注意工做单元能够经过 -Q 选项覆盖这个设置,或者这个列表中的单独队列能够经过 -X 选项进行排除。
查看 Basics 获取更多的信息。
默认值是 celery 队列的一个队列/消息交换器/绑定的键,消息交互类型是direct。
查看 task_routes
一个路由器能够经过以下方式声明:
(name, args, kwargs, options, task=None, **kwargs)
celery.routes.MapRoute
实例(pattern, route)
元组的列表,它将会转化成一个 celery.routes.MapRoute
实例 task_routes = { 'celery.ping': 'default', 'mytasks.add': 'cpu-bound', 'feed.tasks.*': 'feeds', # <-- glob pattern re.compile(r'(image|video)\.tasks\..*'): 'media', # <-- regex 'video.encode': { 'queue': 'video', 'exchange': 'media' 'routing_key': 'media.video.encode', }, } task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default}) 其中,myapp.tasks.route_task 能够是: def route_task(self, name, args, kwargs, options, task=None, **kw): if task == 'celery.ping': return {'queue': 'default'}
route_task 能够返回一个字符串或者一个字典。一个字符串表示 task_queues 中的一个队列名,而字典表示一个自定义的路由。
当发送消息,路由被按顺序询问。第一个返回非 None 值得路由将被使用。消息选项此时将与找到的路由设置合并,其中路由器的设置要优先。
例如: apply_async() 有这些参数:
Task.apply_async(immediate=False, exchange='video', routing_key='video.compress')
而且有一个路由器返回:
{'immediate': True, 'exchange': 'urgent'}
那么最终的消息选项将是:
immediate=True, exchange='urgent', routing_key='video.compress'
(以及Task类中定义的任意默认消息选项)
当进行合并时,task_routes 中定义的值会优先于 task_queues 中定义的值。
对于以下设置:
task_queues = { 'cpubound': { 'exchange': 'cpubound', 'routing_key': 'cpubound', }, } task_routes = { 'tasks.add': { 'queue': 'cpubound', 'routing_key': 'tasks.add', 'serializer': 'json', }, }
tasks.add 的最终路由选项将变为:
{'exchange': 'cpubound', 'routing_key': 'tasks.add', 'serializer': 'json'}
查看路由器获取更多的示例。
task_queue_ha_policy = 'all'
使用 all 将复制队列到全部的当前节点,或者你指定一个节点的列表:
task_queue_ha_policy = ['rabbit@host1', 'rabbit@host2']
使用一个列表将隐示设置 x-ha-policy为‘nodes,x-ha-policy-params` 为给定的节点列表
查看 http://www.rabbitmq.com/ha.html 获取更多的信息
task_queue_max_priority
消息中间件: RabbitMQ
默认值: None
查看 RabbitMQ Message Priorities
worker_direct
默认值: 禁用
这个选项使得每一个工做单元又一个专门的队列,因此任务能够路由到指定的工做单元。
每一个工做单元的队列名称是基于工做单元主机名和一个 .dq后缀自动产生的,使用 C.dq 消息交互器。
例如:节点名称为 w1@example.com 的工做单元的队列名称为:
w1@example.com.dq
此时,你能够经过指定主机名为路由键而且使用 C.dq 消息交互器来将任务路由到指定的节点。
task_routes = { 'tasks.add': {'exchange': 'C.dq', 'routing_key': 'w1@example.com'} }
task_create_missing_queues
默认值:启用
若是启用(默认),任何声明的未在 task_queues 中未定义的队列都将自动被建立。查看 Automaci routing。
task_default_queue
默认值: celery
若是消息没有声明路由或者自定义的队列,apply_async 默认使用的队列名称。
这个队列必须在 task_queues 中。若是 task_queues 没有声明,那么他将自动建立一个队列项,而这个设置值就做为队列的名称。
另见:
修改默认队列的名称
task_default_exchange
默认值:”celery”
当 task_queues 设置中指定键没有声明自定义的消息交互器,那么这个默认的消息交互器将被使用。
task_default_exchange_type
默认值:”direct”
当 task_queues 设置中指定键没有声明自定义的消息交互器类型,那么这个默认的消息交互器类型将被使用。
task_default_routing_key
默认值:”celery”
当 task_queues 设置中指定键没有声明自定义的路由键,那么这个默认的路由键将被使用。
task_default_delivery_mode
默认值:”presistent”
能够是瞬态的(消息不写硬盘),或者持久的(写硬盘)
transport://userid:password@hostname:port/virtual_host
其中只有模式部分是必须的,其他部分都是可选的,默认会设置为对应传输中间件的默认值。
传输部分是使用的消息中间件的实现,默认是 amqp,(若是安装了librabbitmq会使用这个库,不然使用pyamqp)。还有其余可用的选择,包括 redis://、 sqs://、 qpid://。
模式部分能够是你本身的传输中间件实现的全限定路径:
broker_url = 'proj.transports.MyTransport://localhost'
能够配置多个消息中间件,使用相同的传输协议也行。消息中间件能够经过当个字符串声明,不一样的消息中间件URL之间用冒号分隔:
broker_url = 'transport://userid:password@hostname:port//;transport://userid:password@hostname:port//'
或者做为一个列表:
broker_url = [ 'transport://userid:password@localhost:port//', 'transport://userid:password@hostname:port//' ]
这些消息中间件将被用于broker_failover_strategy
查看Kombu 文档中的 URLs 章节获取更多的信息。
示例:
broker_read_url = 'amqp://user:pass@broker.example.com:56721' broker_write_url = 'amqp://user:pass@broker.example.com:56722'
全部选项均可以声明成一个列表,做为故障恢复的可选值,查看 broker_url 获取更多的信息
示例:
# Random failover strategy def random_failover_strategy(servers): it = list(servers) # don't modify callers list shuffle = random.shuffle for _ in repeat(None): shuffle(it) yield it[0] broker_failover_strategy = random_failover_strategy
注意:这个值只被工做单元使用,客户端此时不使用心跳。
由于单纯使用 TCP/IP 并不老是及时探测到链接丢失,因此 AMQP 定义了心跳,客户端和消息中间件用来检测链接是否关闭。
心跳会被监控,若是心跳值是 10 秒,那么检测心跳的时间间隔是 10 除以broker_heartbeat_checkrate (默认状况下,这个值是心跳值的两倍,因此对于10秒心跳,心跳每隔5秒检测一次)
工做单元会间隔监控消息中间件没有丢失过多的心跳。这个检测的速率是用 broker_heartbeat 值除以这个设置值获得的,因此若是心跳是 10.0 而且这个设置值是默认的2.0,那么这个监控将每隔5秒钟执行一次(心跳发送速率的两倍)
在消息中间件链接上使用SSL
这个选项的合法值依据使用的传输协议的不一样而不一样
注意SSL套接字通常会在消息中间件的一个单独的端口上服务。
如下示例提供了客户端证书,而且使用一个自定义的认证受权来验证服务器证书:
import ssl broker_use_ssl = { 'keyfile': '/var/ssl/private/worker-key.pem', 'certfile': '/var/ssl/amqp-server-cert.pem', 'ca_certs': '/var/ssl/myca.pem', 'cert_reqs': ssl.CERT_REQUIRED }
告警:
使用 broker_use_ssl=True 时请当心。可能你的默认配置根本不会验证服务器证书。请阅读python的 ssl module security considerations。
ssl_cert_reqs (required): one of the SSLContext.verify_mode values: ssl.CERT_NONE ssl.CERT_OPTIONAL ssl.CERT_REQUIRED ssl_ca_certs (optional): path to the CA certificate ssl_certfile (optional): path to the client certificate ssl_keyfile (optional): path to the client key
默认值:10
链接池中能够打开最大链接数。
从2.5版本开始链接池被默认启用,默认限制是10个链接。这个数值能够依据使用一个链接的 threads/green-threads (eventlet/gevent) 数量进行更改。例如:运行 eventlet 启动 1000 个 greenlets,他们使用一个链接到消息中间件,若是发生竞态条件,那么你应该开始增长这个限制。
若是设置成None或者0,链接池将会被禁用,而且每次使用链接都会从新创建链接并关闭。
broker_connection_timeout
默认值:4.0
放弃与AMQP服务器创建链接以前默认等待的超时时间。当使用 gevent 时该设置被禁用。
broker_connection_retry
默认值:启用
若是与 AMQP 消息中间件的链接断开,将自动从新创建链接
每次重试中间等待的时间会递增,而且在 broker_connection_max_retries 未达到以前会一只重试
若是设置成 0 或者 None,将一直重试
broker_login_method
默认值:AMQPLAIN
设置自定义的 amqp 登录方法
broker_transport_options
2.2 版本新特性
默认值:{} (空映射)
传递给底层传输中间件的一个附加选项的字典
设置可见超时时间的示例以下(Redis 与 SQS 传输中间件支持):
broker_transport_options = {‘visibility_timeout’: 18000} # 5 hours
工做单元
这用来声明要导入的模块,可是它还可用来导入信号处理函数和附加的远程控制命令,等等。
这些模块将会以原来声明的顺序导入
这个设置中的模块是在 imports 设置中的模块导入以后才导入
若是你大部分操做是I/O操做,你能够设置更多的进程(线程),可是大部分状况下都是以CPU数做为定界,尝试让这个值接近你机器的CPU核数。若是没有设置,当前机器的 CPU核数将会被使用
禁用这个选项,只要将 worker_prefetch_multiplier 设置成 1。设置成 0 将容许工做单元持续消费它想要的尽量多的消息。
更详细的信息,请阅读 Prefetch Limits
注意:
带 ETA/countdown 的任务不会受 prefetch 限制的影响
worker_lost_wait
默认值:10.0 秒
有些状况下,工做单元可能在没有适当清理的状况下就被杀死,而且工做单元可能在终止前已经发布了一个结果。这个值声明了在抛出 WorkerLostError 异常以前咱们会在丢失的结果值上等待多久
worker_max_tasks_per_child
一个工做单元进程在被一个新的进程替代以前能够执行的最大任务数
worker_max_memory_per_child
默认值:没有限制。类型:int(kilobytes)
一个工做单元进程在被一个新的进程替代以前能够消耗的最大预留内存(单位KB)。若是单独一个任务就致使工做单元超过这个限制,当前的任务会执行完成,而且以后这个进程将会被更新替代。
示例:
worker_max_memory_per_child = 12000 # 12MB
worker_disable_rate_limits
默认值:禁用(启用速率限制)
即便任务显示设置了速率,仍然禁用全部速率限制
worker_state_db
默认值:None
存储工做单元状态的文件名称(如取消的任务)。能够是相对或者绝对路径,可是注意后缀.db 可能会被添加到文件名后(依赖于python 的版本)
也能够经过celery worker –statedb 参数设置
设置成1意味着调度器精度将为1秒。若是你须要毫秒精度,你能够设置成 0.1
worker_send_task_events
默认值:默认禁用
发送任务相关的事件,使得任务可使用相似flower 的工做监控到。为工做单元的 -E 参数设置默认值
task_send_sent_event
2.2 版本新特性
默认值:默认禁用
若是启用,对于每一个任务都将有一个 task-sent 事件被发送,所以任务在被消费前就能被追踪。
例如:若是这个值设置为10,被递送到这个队列的消息将会在10秒后被删除
event_queue_expires
支持的传输中间件: amqp
默认值:60.0 秒
一个监控客户端事件队列被删除前的过时时间(x-expires)。
event_queue_prefix
默认值: “celeryev”.
事件接收队列名称的前缀
event_serializer
默认值: “json”.
当发送事件消息时使用的消息序列化格式
control_queue_ttl
默认值: 300.0
control_queue_expires
默认值: 10.0
默认状况下,任意前面配置的根日志器的处理函数都将被移除。若是你想自定义日志处理函数,那么你能够经过设置 worker_hijack_root_logger = False 来禁用这个行为。
注意:
日志能够经过链接到 celery.signals.setup_logging 进行定制化
worker_log_color
默认值: 若是应用实例日志输出到一个终端,这个将启用
启用/禁用Celery 应用日志输出的颜色
worker_log_format
默认值:
[%(asctime)s: %(levelname)s/%(processName)s] %(message)s 日志信息的格式
查看python 日志模块获取更多关于日志的信息
[%(asctime)s: %(levelname)s/%(processName)s] [%(task_name)s(%(task_id)s)] %(message)s
任务中记录日志使用的格式。查看python 日志模块获取更多关于日志的信息
工做单元和 beat 将使用到
包含私钥的文件的相对或者绝对路径,私钥用来在使用消息签名时对消息进行签名。
包含X.509认证的文件的相对或者绝对路径,认证用来在使用消息签名时对消息进行签名。
包含用来进行消息签名的X.509认证的目录。可使用文件名模式匹配(例如:/etc/certs/*.pem)
自定义组件类 (高级)
worker_pool
默认值:”prefork” (celery.concurrency.prefork:TaskPool).
工做单元使用的池类的名称
Eventlet/Gevent
永远不要使用这个选项来选择用eventlet 仍是 gevent。你必须对工做单元使用-P选项,确保应急补丁不会应用过迟,致使出现奇怪的现象。
worker_pool_restarts
默认值:默认禁用
若是启用,工做单元池可使用 pool_restart 远程控制命令进行重启
使用的自动扩展类的名称
worker_consumer
默认值:”celery.worker.consumer:Consumer”.
工做单元使用的消费类的名称
worker_timer
默认值:”kombu.async.hub.timer:Timer”.
工做单元使用的 ETA 调度器类的名称。默认值是被池具体实现设置。
beat_schedule
默认值: {} (空映射)
beat调度的周期性任务。查看Entries
beat_scheduler
默认值:”celery.beat:PersistentScheduler”.
默认的调度器类。若是同时使用django-celery-beat扩展,能够设置成 “django_celery_beat.schedulers:DatabaseScheduler”
也能够经过celery beat 的 -S 参数进行设置
也能够经过 celery beat 的 –schedule 参数进行设置
beat_sync_every
默认值:0.
另外一个数据库同步发起前能够执行的周期性任务的数量。值0(默认)表示基于时间同步 - 默认是3分钟,由scheduler.sync_every肯定。若是设置成1,beat将在每一个任务消息发送后发起同步。
beat_max_loop_interval
默认值: 0.
转自:https://blog.csdn.net/libing_thinking/article/details/78812472