Celery-4.1 用户指南: Configuration and defaults (配置和默认值)

这篇文档描述了可用的配置选项。html

若是你使用默认的加载器,你必须建立 celeryconfig.py 模块而且保证它在python路径中。node

配置文件示例

如下是配置示例,你能够从这个开始。它包括运行一个基本Celery应用的全部基础设置。python

## Broker settings.
broker_url = 'amqp://guest:guest@localhost:5672//'

# List of modules to import when the Celery worker starts.
imports = ('myapp.tasks',)

## Using the database to store task state and results.
result_backend = 'db+sqlite:///results.db'

task_annotations = {'tasks.add': {'rate_limit': '10/s'}}

新的小写设置

4.0 版本引入了新的小写设置名称和机构环境。mysql

与之前版本的不一样,除了设置项名称变为小写字母外,还有一个前缀的重命名,例如 celerybeat_ 变为 beat_,celeryd_ 变为 worker,以及不少顶级 celery_ 设置重命名成了 task_ 前缀。git

Celery 仍然能读取老的配置文件,因此并不仓促迁移到新的设置格式。github

Setting name Replace with
CELERY_ACCEPT_CONTENT accept_content
CELERY_ENABLE_UTC enable_utc
CELERY_IMPORTS imports
CELERY_INCLUDE include
CELERY_TIMEZONE timezone
CELERYBEAT_MAX_LOOP_INTERVAL beat_max_loop_interval
CELERYBEAT_SCHEDULE beat_schedule
CELERYBEAT_SCHEDULER beat_scheduler
CELERYBEAT_SCHEDULE_FILENAME beat_schedule_filename
CELERYBEAT_SYNC_EVERY beat_sync_every
BROKER_URL broker_url
BROKER_TRANSPORT broker_transport
BROKER_TRANSPORT_OPTIONS broker_transport_options
BROKER_CONNECTION_TIMEOUT broker_connection_timeout
BROKER_CONNECTION_RETRY broker_connection_retry
BROKER_CONNECTION_MAX_RETRIES broker_connection_max_retries
BROKER_FAILOVER_STRATEGY broker_failover_strategy
BROKER_HEARTBEAT broker_heartbeat
BROKER_LOGIN_METHOD broker_login_method
BROKER_POOL_LIMIT broker_pool_limit
BROKER_USE_SSL broker_use_ssl
CELERY_CACHE_BACKEND cache_backend
CELERY_CACHE_BACKEND_OPTIONS cache_backend_options
CASSANDRA_COLUMN_FAMILY cassandra_table
CASSANDRA_ENTRY_TTL cassandra_entry_ttl
CASSANDRA_KEYSPACE cassandra_keyspace
CASSANDRA_PORT cassandra_port
CASSANDRA_READ_CONSISTENCY cassandra_read_consistency
CASSANDRA_SERVERS cassandra_servers
CASSANDRA_WRITE_CONSISTENCY cassandra_write_consistency
CELERY_COUCHBASE_BACKEND_SETTINGS couchbase_backend_settings
CELERY_MONGODB_BACKEND_SETTINGS mongodb_backend_settings
CELERY_EVENT_QUEUE_EXPIRES event_queue_expires
CELERY_EVENT_QUEUE_TTL event_queue_ttl
CELERY_EVENT_QUEUE_PREFIX event_queue_prefix
CELERY_EVENT_SERIALIZER event_serializer
CELERY_REDIS_DB redis_db
CELERY_REDIS_HOST redis_host
CELERY_REDIS_MAX_CONNECTIONS redis_max_connections
CELERY_REDIS_PASSWORD redis_password
CELERY_REDIS_PORT redis_port
CELERY_RESULT_BACKEND result_backend
CELERY_MAX_CACHED_RESULTS result_cache_max
CELERY_MESSAGE_COMPRESSION result_compression
CELERY_RESULT_EXCHANGE result_exchange
CELERY_RESULT_EXCHANGE_TYPE result_exchange_type
CELERY_TASK_RESULT_EXPIRES result_expires
CELERY_RESULT_PERSISTENT result_persistent
CELERY_RESULT_SERIALIZER result_serializer
CELERY_RESULT_DBURI Use result_backend instead.
CELERY_RESULT_ENGINE_OPTIONS database_engine_options
[…]_DB_SHORT_LIVED_SESSIONS database_short_lived_sessions
CELERY_RESULT_DB_TABLE_NAMES database_db_names
CELERY_SECURITY_CERTIFICATE security_certificate
CELERY_SECURITY_CERT_STORE security_cert_store
CELERY_SECURITY_KEY security_key
CELERY_TASK_ACKS_LATE task_acks_late
CELERY_TASK_ALWAYS_EAGER task_always_eager
CELERY_TASK_ANNOTATIONS task_annotations
CELERY_TASK_COMPRESSION task_compression
CELERY_TASK_CREATE_MISSING_QUEUES task_create_missing_queues
CELERY_TASK_DEFAULT_DELIVERY_MODE task_default_delivery_mode
CELERY_TASK_DEFAULT_EXCHANGE task_default_exchange
CELERY_TASK_DEFAULT_EXCHANGE_TYPE task_default_exchange_type
CELERY_TASK_DEFAULT_QUEUE task_default_queue
CELERY_TASK_DEFAULT_RATE_LIMIT task_default_rate_limit
CELERY_TASK_DEFAULT_ROUTING_KEY task_default_routing_key
CELERY_TASK_EAGER_PROPAGATES task_eager_propagates
CELERY_TASK_IGNORE_RESULT task_ignore_result
CELERY_TASK_PUBLISH_RETRY task_publish_retry
CELERY_TASK_PUBLISH_RETRY_POLICY task_publish_retry_policy
CELERY_TASK_QUEUES task_queues
CELERY_TASK_ROUTES task_routes
CELERY_TASK_SEND_SENT_EVENT task_send_sent_event
CELERY_TASK_SERIALIZER task_serializer
CELERYD_TASK_SOFT_TIME_LIMIT task_soft_time_limit
CELERYD_TASK_TIME_LIMIT task_time_limit
CELERY_TRACK_STARTED task_track_started
CELERYD_AGENT worker_agent
CELERYD_AUTOSCALER worker_autoscaler
CELERYD_CONCURRENCY worker_concurrency
CELERYD_CONSUMER worker_consumer
CELERY_WORKER_DIRECT worker_direct
CELERY_DISABLE_RATE_LIMITS worker_disable_rate_limits
CELERY_ENABLE_REMOTE_CONTROL worker_enable_remote_control
CELERYD_HIJACK_ROOT_LOGGER worker_hijack_root_logger
CELERYD_LOG_COLOR worker_log_color
CELERYD_LOG_FORMAT worker_log_format
CELERYD_WORKER_LOST_WAIT worker_lost_wait
CELERYD_MAX_TASKS_PER_CHILD worker_max_tasks_per_child
CELERYD_POOL worker_pool
CELERYD_POOL_PUTLOCKS worker_pool_putlocks
CELERYD_POOL_RESTARTS worker_pool_restarts
CELERYD_PREFETCH_MULTIPLIER worker_prefetch_multiplier
CELERYD_REDIRECT_STDOUTS worker_redirect_stdouts
CELERYD_REDIRECT_STDOUTS_LEVEL worker_redirect_stdouts_level
CELERYD_SEND_EVENTS worker_send_task_events
CELERYD_STATE_DB worker_state_db
CELERYD_TASK_LOG_FORMAT worker_task_log_format
CELERYD_TIMER worker_timer
CELERYD_TIMER_PRECISION worker_timer_precision

配置指示

通用设置

  • accept_content
    默认值: {‘json’} (set, list, or tuple).
    容许的内容类型/序列化器的白名单

若是接收到一个消息,其内容类型再也不上述列表中,它将会被丢弃并抛出一个错误。redis

默认状况下,任意内容类型都是启用的,包括pickle以及yaml,因此确保不受信任的第三方不能访问你的消息中间件。查看安全这一节获取更多信息。算法

示例:sql

# using serializer name
accept_content = ['json']

# or the actual content-type (MIME)
accept_content = ['application/json']

时间与日期设置

  • enable_utc
    2.5 版本新特性。
    默认值:从 3.0 版本开始默认启用

  一旦启用,消息中的日期和时间将会转化成 UTC 时区。mongodb

  注意2.5版本如下的工做单元将会认为全部消息都使用的本地时区,因此只有在全部的工做单元都升级了的状况下再启用这个特性。

  • timezone
    2.5版本新特性
    默认值: “UTC”

  设置Celery使用一个自定义的时区。这个时区值能够是pytz库支持的任意时区。

  若是没有设置,UTC时区将被使用。为了向后兼容,还提供了一个 enable_utc设置,若是他设置成假,将使用系统本地时区。

任务设置

  • task_annotations
    这个设置能够用来在配置文件中重写任意任务属性。这个设置能够是一个字典,获取一个annotation对象的列表,这个列表对任务进行过滤,对匹配的任务名称起做用,并返回待更改属性的一个映射。

如下将更改 tasks.add 任务的 rate_limit 属性:

task_annotations = {‘tasks.add’: {‘rate_limit’: ‘10/s’}} 

或者对全部的任务更改:

task_annotations = {‘*’: {‘rate_limit’: ‘10/s’}} 


你还能够更改方法,例如 on_failure 处理函数:

def my_on_failure(self, exc, task_id, args, kwargs, einfo):
  print(‘Oh no! Task failed: {0!r}’.format(exc))

task_annotations = {‘*’: {‘on_failure’: my_on_failure}} 

若是你须要更灵活的控制,那么你可使用对象而不是字典来选择任务来进行注解:

class MyAnnotate(object):

    def annotate(self, task):
        if task.name.startswith('tasks.'):
            return {'rate_limit': '10/s'}

task_annotations = (MyAnnotate(), {other,})
  • task_compression
    默认值: None
    任务消息的默认压缩算法。能够是 gzip、bzip2(若是可用),或者任意在 Kombu 压缩模式注册表中注册的自定义压缩算法。

   默认发送未压缩的消息。

  • task_protocol
    默认值:2(从4.0版本开始)
    设置默认的任务消息协议版本。支持的协议:1 和 2

   协议 2 在 3.1.24 以及 4.x+ 被支持

  • task_serializer
    默认值:“json”(从4.0版本开始,更早:pickle)
    一个表示使用的默认序列化方法的字符串。能够是 json(默认)、pickle、 yaml、msgpack,或者任意在 kombu.serialization.registry 中注册过的自定义序列化方法。

  另见:
    Serializers

  • task_publish_retry
    2.2版本新特性
    默认值:启用

   决定当链接丢失或者其余链接错误时任务消息的发布是否会重试,查看 task_publish_retry_policy。

  • task_publish_retry_policy
    2.2版本新特性
    默认值:查看 Message Sending Retry。

   定义当链接丢失或者其余链接错误时任务消息的发布重试策略。

任务执行设置

  • task_always_eager
    默认值:禁用
    若是设置成 True,全部的任务都将在本地执行知道任务返回。apply_async() 以及Task.delay()将返回一个 EagerResult 实例,模拟AsyncResult实例的API和行为,除了这个结果是已经计算过的以外。

   也就是说,任务将会在本地执行而不是发送到队列。

  • task_eager_propagates
    默认值:禁用
    若是设置成 True,本地执行的任务(使用 task.apply(),或者 task_always_eager 被启用)将传递异常。

   这与使用 apply()throw=True 参数有一样的效果。

  • task_remote_tracebacks
    默认值:禁用
    若是启用了,当从新抛出任务错误时,任务结果将会包括工做单元的堆栈信息。

   它须要 tblib 库,能够经过 pip 安装:

    $ pip install celery[tblib] 


    查看 Bundles 获取关于组合多个扩展需求的信息。

  • task_ignore_result
    默认值:禁用
    是否存储任务返回值(tombstones)。若是你只是想在发生错误的时候记录返回值,能够设置:task_store_errors_even_if_ignored

  • task_store_errors_even_if_ignored
    默认值:禁用
    若是设置了,即便 Task.ignore_result 启用了,工做单元也会爱结果后端中存储全部的任务错误。

  • task_track_started
    默认值:禁用
    若是设置成真,当任务被工做单元执行时,任务将报告它的状态为started。默认值是假,由于一般行为是不作这种粒度级别的汇报。任务会处于 pending、finished 或者 waiting to be retried。当有长时间任务,而且须要知道当前正在运行什么任务时,有一个 started状态将会颇有用。

  • task_time_limit
    默认值:没有时间限制
    任务的硬时间限制,以秒为单位。若是这个时间限制被超过,处理任务的工做单元进程将会被杀死并使用一个新的替代。

  • task_soft_time_limit
    默认值:没有时间限制
    任务的软时间限制,以秒为单位

  当这个时间限制超事后,SoftTimeLimitExceeded异常将会被抛出。例如,任务能够捕获这个异常在硬时间限制到达以前对环境进行清理:

from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        return do_work()
    except SoftTimeLimitExceeded:
        cleanup_in_a_hurry()
  • task_acks_late
    默认值:禁用
    延迟确认意味着任务消息将在任务执行完成以后再进行确认,而不是刚开始时(默认行为)。

  另见:
    FAQ: Shoud I use retry or acks_late

  • task_reject_on_worker_lost
    默认值:禁用
    即便 task_acks_late 被启用,当处理任务的工做单元异常退出或者收到信号而退出时工做单元将会确认任务消息。

   将这个设置成真可让消息从新入队,因此任务将会被再执行,在同一个工做单元或者另一个工做单元。

   告警:
    启用这个可能致使消息循环;确保你知道你在作什么

  • task_default_rate_limit
    默认值:没有速率限制
    任务的全局默认速率限制

   当任务没有一个自定义的速率限制时,这个值将被使用

   另见:
     worker_disable_rate_limits 设置能够禁用全部的速率限制

任务结果后端设置

  • result_backend
    默认值:默认不启用结果后端
    用来存储结果的后端。能够是下列之一:

    1. rpc
      以 AMQP 消息形式发送结果。查看 RPC 后端设置

    2. database
      使用一个 SQLAlchemy 支持的结构化数据库。查看数据库后端设置

    3. redis
      使用 Redis 存储结果。查看 Redis 后端设置

    4. cache
      使用 Memcached 存储结果。查看 Cache 后端设置

    5. cassandra
      使用 Cassandra 存储结果。查看 Cassandra 后端设置

    6. elasticsearch
      使用 Elasticsearch 存储结果。查看 Elasticsearch 后端设置

    7. ironcache
      使用 IronCache 存储结果。查看 IronCache 后端设置

    8. couchbase
      使用 Couchbase 存储结果。查看 Couchbase 后端设置

    9. couchdb
      使用 CouchDB 存储结果。查看 CouchDB 后端设置

    10. filesystem
      使用共享文件夹存储结果。查看 File-system 后端设置

    11. consul
      使用 Consul K/V 存储结果。查看 Consul K/V 后端设置

  • result_serializer
    默认值:从4.0版本开始使用 json(更早:pickle)
    查看 Serializers 获取支持的序列化格式的信息。

  • result_compression
    默认值:无压缩
    结果值得可选压缩方法。支持 task_seralizer 设置相同的选项。

  • result_expires
    默认值:1天后过时
    存储的结果被删除的时间(秒数,或者一个 timedelta 对象)

   (有一个内建的周期性任务将删除过时的任务结果(celery.backend_cleanup),前提是 celery beat 已经被启用。这个任务天天上午4点运行。

   值 None 或者 0 意思是结果永不删除(取决于后端声明))

   注意:
    当前这个特性只支持 AMQP, database, cache, Redis 这些存储后端。当使用 database 存储后端,celery beat必须执行使得过时结果被删除。

  • result_cache_max
    默认值:默认禁用
    启用结果的客户端缓存。

   对于老的 amqp 后端,存储结果一旦被消费它将再也不可用,此时这个特性将起到做用。

   这是老的结果被删除以前总的结果缓存的数量。值 0 或者 None 意味着没有限制,而且值 -1 将禁用缓存。

Database 后端设置

Database URL 示例

使用一个数据库存储后端,你必须配置 result_backend 设置为一个链接的URL,而且带 db+ 前缀:

result_backend = 'db+scheme://user:password@host:port/dbname'

示例:

# sqlite (filename)
result_backend = 'db+sqlite:///results.sqlite'

# mysql
result_backend = 'db+mysql://scott:tiger@localhost/foo'

# postgresql
result_backend = 'db+postgresql://scott:tiger@localhost/mydatabase'

# oracle
result_backend = 'db+oracle://scott:tiger@127.0.0.1:1521/sidname'

查看 Supported Databases 获取支持的数据库的一个表,查看 Connection String 获取相关的链接字符串(这是 db+ 前缀后带的URI的一部分)

  • database_engine_options
    默认值:{} (空映射)
    你可使用 sqlalchmey_engine_options 设置声明额外的 SQLAchemy 数据库引擎选项:
# echo enables verbose logging from SQLAlchemy.
app.conf.database_engine_options = {'echo': True}
  • database_short_lived_sessions
    默认值:默认禁用
    默认禁用短会话。若是启用了,他们会急剧的下降性能,特别是对于处理不少任务的系统。当工做单元的流量很低,缓存的数据库链接会因为空闲而变为无用,进而会致使工做单元出错,这种状况下这个选项是有用的。例如:间歇性的错误如(OperationalError)(2006, ‘MySQL server has gone away’)经过启用短会话能解决。这个选项只影响数据库后端。

  • database_table_names
    默认值:{} (空映射)
    当 SQLAlchemy 设置成结果后端, Celery 自动建立两个表来存储任务的元数据。这个设置容许你自定义表名称:

# use custom table names for the database result backend.
database_table_names = {
    'task': 'myapp_taskmeta',
    'group': 'myapp_groupmeta',
}

RPC 后端设置

  • result_persistent
    默认值:默认被禁用(瞬态消息)
    若是设置成 True,结果消息将被持久化。这意味着消息中间件重启后消息不会丢失。

  配置示例:

result_backend = 'rpc://'
result_persistent = False

Cache 后端设置

注意:
  缓存后端支持 pylibmc 和 python-memcached 库。后者只有在 pylibmc 没有安装时才会被使用。

使用一个 Memcached 服务器:

result_backend = 'cache+memcached://127.0.0.1:11211/'

使用多个 Memcached 服务器:

result_backend = """
    cache+memcached://172.19.26.240:11211;172.19.26.242:11211/
""".strip()

“memory” 后端只在内存中存储缓存:

result_backend = 'cache'
cache_backend = 'memory'
  • cache_backend_options
    默认值:{} (空映射)
    你可使用 cache_backend_options 设置 pylibmc 选项:
cache_backend_options = {
    'binary': True,
    'behaviors': {'tcp_nodelay': True},
}
  • cache_backend
    这个设置再也不使用了,由于如今能够直接在 result_backend 中设置后端存储。

Redis 后端设置

配置后端 URL

注意:
  Redis 后端须要 Redis 库。

可使用 pip 安装这个包:

$ pip install celery[redis]

查看 Bundles 获取组合多个扩展需求的信息

后端须要 result_backend 设置成一个 Redis URL:

result_backend = 'redis://:password@host:port/db'

例如:

result_backend = 'redis://localhost/0'

等同于:

result_backend = 'redis://'

URL 的字段以下定义:
1. password
  链接数据库的密码
2. host
  Redis 服务器的主机名或者IP地址(例如:localhost)
3. port
  Redis 服务器的端口。默认是 6379
4. db
  使用的数据库编号。默认是0。db 能够包含一个可选的斜杠

  • redis_backend_us_ssl
    默认值:禁用
    Redis后端支持 SSL。这个选项的合法值与 broker_use_ssl 相同

  • redis_max_connections
    默认值:无显示
    Redis 链接池的最大可用链接数,这些链接用来发送和接收结果

  • redis_socket_connect_timeout
    5.0.1版本新特性
    默认值:None

从存储后端链接到Redis服务器的链接的Socket超时时间(以秒为单位,int/float)

  • redis_socket_timeout
    默认值:120秒
    对 Redis 服务器的读写操做的 Socket 超时时间(以秒为单位,int/float),由存储后端使用

Cassandra 后端设置

注意:
  Cassandra 后端驱动 cassandra-driver。

使用 pip 安装:

$ pip install celery[cassandra]

查看 Bundles 获取关于组合扩展需求的信息。

后端须要配置下列配置指令

  • cassandra_servers
    默认值: [] (空列表)
    Cassandra 服务器列表。例如:
cassandra_servers = ['localhost']
  • cassandra_port
    默认值:9042.
    链接到Cassandra服务器的端口

  • cassandra_keyspace
    默认值: None.
    存储结果的 key-space。例如:

cassandra_keyspace = 'tasks_keyspace'
  • cassandra_table
    默认值: None.
    存储结果的表(列族)。例如:
cassandra_table = 'tasks'
  • cassandra_read_consistency
    默认值: None.
    使用的读一致性。值能够是 ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE

  • cassandra_write_consistency
    默认值: None.
    使用的写一致性。值能够是 ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE

  • cassandra_entry_ttl
    默认值: None.
    状态项的 Time-to-live。添加事后一段时间他们将会过时而且被删除。值 None (默认) 意味着他们永不过时

  • cassandra_auth_provider
    默认值: None.
    使用的 cassandra.auth 模块中的 AuthProvider。 值能够是 PlainTextAuthProvider 或者 SaslAuthProvider

  • cassandra_auth_kwargs
    默认值: {} (空映射)
    传递给 authentication provider 的命名参数。例如:

cassandra_auth_kwargs = {
    username: 'cassandra',
    password: 'cassandra'
}

配置示例:

cassandra_servers = ['localhost']
cassandra_keyspace = 'celery'
cassandra_table = 'tasks'
cassandra_read_consistency = 'ONE'
cassandra_write_consistency = 'ONE'
cassandra_entry_ttl = 86400

Elasticsearch 后端设置

使用 Elasticsearch 做为结果后端,你只须要将result_backend设置成正确的 URL。

配置示例:

result_backend = 'elasticsearch://example.com:9200/index_name/doc_type'
  • elasticsearch_retry_on_timeout
    默认值: False
    超时后是否应该触发在另外一个节点重试?

  • elasticsearch_max_retries
    默认值: 3
    异常被传递前的最大重试次数

  • elasticsearch_timeout
    默认值: 10.0 秒
    elasticsearch 使用的全局超时时间

Riak 后端设置

注意:
Riak 后端须要 riak 库

使用 pip 进行安装:

$ pip install celery[riak]

查看 Bundles 获取组合多扩展需求的信息。

后端须要result_backend设置成一个 Riak URL:

result_backend = 'riak://host:port/bucket'

例如:

result_backend = 'riak://localhost/celery

等同于:

result_backend = 'riak://'

URL 的字段定义以下:
1. host
  Riak 服务器的主机名或者IP地址(例如 localhost)
2. port
  使用 protobuf 协议的Riak 服务器端口,默认是 8087
3. bucket
  使用的Bucket名称。默认是 celery。bucket 名称须要是一个只包含ASCII字符的字符串。

另外,这个后端可使用以下配置指令进行配置:

  • riak_backend_settings
    默认值: {} (空映射)
    这是一个支持以下键的映射:

    1. host
      Riak 服务器的主机名或者IP地址(例如 localhost)

    2. port
      Riak 服务器端口。默认是 8087

    3. bucket
      使用的Bucket名称。默认是 celery。bucket 名称须要是一个只包含ASCII字符的字符串。

    4. protocol
      链接到 Riak 服务器使用的协议。这不能够经过 result_backend 配置

AWS DynamoDB 后端设置

注意:
  Dynamodb 后端须要 boto3 库

使用 pip 进行安装:

$ pip install celery[dynamodb]

查看 Bundles 获取组合多扩展需求的信息。

后端须要 result_backend 设置成一个 DynamoDB URL:

result_backend = 'dynamodb://aws_access_key_id:aws_secret_access_key@region:port/table?read=n&write=m'

例如,声明 AWS 区域以及表名称:

result_backend = 'dynamodb://@us-east-1/celery_results

或者从环境中获取 AWS 配置参数,使用默认表名称(celery)以及声明读写吞吐量:

result_backend = 'dynamodb://@/?read=5&write=5'

或者在本地使用 DynamoDB 的可下载版本:

result_backend = 'dynamodb://@localhost:8000

URL 中的字段以下定义:

  1. aws_access_key_id & aws_secret_access_key
    访问 AWS API 资源的认证信息。这能够经过 boto3 从不一样的源获取到

  2. region
    AWS 区域,例如: us-east-1 或者本地版本的 localhost。查看 boto3 库文档获取更多的信息。

  3. port
    若是你使用的本地版本,这是本地DynamoDB示例监听的端口。若是你没有把区域设置成 localhost,这个设置选项将无效

  4. table
    使用的表名。默认是 celery。查看 DynamoDB 命名规则获取容许的字符以及表名长度的信息。

  5. read & write
    所建立的 DynamoBD 表的读写能力单元。默认的读写值都是 1。更多的细节能够从 Provisioned Throughput documentation 中获取到。

IronCache 后端设置

注意:
IronCache 后端须要 iron_celery 库:

使用 pip 进行安装:

$ pip install iron_celery

IronCache 经过在 result_backend 中配置的 URL 进行声明,例如:

result_backend = 'ironcache://project_id:token@'

或者更改缓存名称:

ironcache:://project_id:token@/awesomecache

更多的信息,查看 https://github.com/iron-io/iron_celery

Couchbase 后端设置

注意:
Couchbase 后端须要 couchbase 库

使用 pip 进行安装:

$ pip install celery[couchbase]

查看 Bundle 获取组合多扩展需求的步骤。

后端能够经过 result_backend 设置成一个 Couchbase URL:

result_backend = 'couchbase://username:password@host:port/bucket'
  • couchbase_backend_settings

   默认值:{} (空映射)

  这是一个支持以下键的映射:

  1. host
    Couchbase 服务器的主机名。默认是 localhost
  2. port
    Couchbase 服务器监听的端口。默认是 8091
  3. bucket
    Couchbase 服务器默认写入的桶。默认是default
  4. username
    Couchbase 服务器认证的用户名(可选)
  5. password
    Couchbase 服务器认证的密码(可选)

CouchDB 后端设置

注意:
CouchDB 后端须要 pycouchdb 库:
使用 pip 安装这个包:

$ pip install celery[couchdb]

查看 Bundles 获取更多关于组合多扩展需求的信息

后端能够经过 result_backend 配置成一个 CouchDB URL:

result_backend = 'couchdb://username:password@host:port/container'

URL 由如下部分组成:

  1. username
    Couchbase 服务器认证的用户名(可选)
  2. password
    Couchbase 服务器认证的密码(可选)
  3. host
    Couchbase 服务器的主机名。默认是 localhost
  4. port
    Couchbase 服务器监听的端口。默认是 8091
  5. container
    CouchDB 服务器写入的默认容器。默认是 default

File-system 后端设置

后端能够经过一个文件 URL 配置,例如:

CELERY_RESULT_BACKEND = 'file:///var/celery/results'

配置的目录须要被共享,而且全部使用该后端的服务器均可写。

若是你在单独的一个系统上使用 Celery,你不须要任何进一步的配置就能够简单的使用这个后端。对于大型的集群,你可使用 NFS、GlusterFS、CIFS、HDFS(使用FUSE),或者其余文件系统。

Consul K/V 存储后端设置

Consul 后端能够经过 URL 配置:

CELERY_RESULT_BACKEND = ‘consul://localhost:8500/’

后端将在 Consul K/V 存储中做为单独键存储结果

后端使用Consul 中的 TTLs 支持结果的自动过时

消息路由

  • task_queues
    默认值: None (默认队列的配置)
    多数用户不肯声明这个配置,而是使用 automatic routing facilites。

若是你真的须要配置高级路由,这个设置应该是一个 kombu.Queue 对象的列表,工做单元能够从中消费。

注意工做单元能够经过 -Q 选项覆盖这个设置,或者这个列表中的单独队列能够经过 -X 选项进行排除。

查看 Basics 获取更多的信息。

默认值是 celery 队列的一个队列/消息交换器/绑定的键,消息交互类型是direct。

查看 task_routes

  • task_routes
    默认值: None
    一个路由器的列表,或者单个路路由,用来路由任务到相应的队列。当决定一个任务的最终目的,路由器将按声明顺序进行轮询。

一个路由器能够经过以下方式声明:

  1. 函数,签名格式为 (name, args, kwargs, options, task=None, **kwargs)
  2. 字符串,提供到路由函数的路径
  3. 字典,包含路由声明,它将会转化成一个 celery.routes.MapRoute 实例
  4. 一个 (pattern, route) 元组的列表,它将会转化成一个 celery.routes.MapRoute 实例
    示例:
task_routes = {
    'celery.ping': 'default',
    'mytasks.add': 'cpu-bound',
    'feed.tasks.*': 'feeds',                           # <-- glob pattern
    re.compile(r'(image|video)\.tasks\..*'): 'media',  # <-- regex
    'video.encode': {
        'queue': 'video',
        'exchange': 'media'
        'routing_key': 'media.video.encode',
    },
}

task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default})
其中,myapp.tasks.route_task 能够是:

def route_task(self, name, args, kwargs, options, task=None, **kw):
        if task == 'celery.ping':
            return {'queue': 'default'}

route_task 能够返回一个字符串或者一个字典。一个字符串表示 task_queues 中的一个队列名,而字典表示一个自定义的路由。

当发送消息,路由被按顺序询问。第一个返回非 None 值得路由将被使用。消息选项此时将与找到的路由设置合并,其中路由器的设置要优先。

例如: apply_async() 有这些参数:

Task.apply_async(immediate=False, exchange='video',
                 routing_key='video.compress')

而且有一个路由器返回:

{'immediate': True, 'exchange': 'urgent'}

那么最终的消息选项将是:

immediate=True, exchange='urgent', routing_key='video.compress'

(以及Task类中定义的任意默认消息选项)

当进行合并时,task_routes 中定义的值会优先于 task_queues 中定义的值。

对于以下设置:

task_queues = {
    'cpubound': {
        'exchange': 'cpubound',
        'routing_key': 'cpubound',
    },
}

task_routes = {
    'tasks.add': {
        'queue': 'cpubound',
        'routing_key': 'tasks.add',
        'serializer': 'json',
    },
}

tasks.add 的最终路由选项将变为:

{'exchange': 'cpubound',
 'routing_key': 'tasks.add',
 'serializer': 'json'}

查看路由器获取更多的示例。

  • task_queue_ha_policy
    消息中间件: RabbitMQ
    默认值:None
    这将设置一个队列的HA策略,而且值能够是一个字符串(一般是 all)
task_queue_ha_policy = 'all'

使用 all 将复制队列到全部的当前节点,或者你指定一个节点的列表:

task_queue_ha_policy = ['rabbit@host1', 'rabbit@host2']

使用一个列表将隐示设置 x-ha-policy为‘nodes,x-ha-policy-params` 为给定的节点列表

查看 http://www.rabbitmq.com/ha.html 获取更多的信息

  • task_queue_max_priority
    消息中间件: RabbitMQ
    默认值: None
    查看 RabbitMQ Message Priorities

  • worker_direct
    默认值: 禁用

这个选项使得每一个工做单元又一个专门的队列,因此任务能够路由到指定的工做单元。

每一个工做单元的队列名称是基于工做单元主机名和一个 .dq后缀自动产生的,使用 C.dq 消息交互器。

例如:节点名称为 w1@example.com 的工做单元的队列名称为:

w1@example.com.dq

此时,你能够经过指定主机名为路由键而且使用 C.dq 消息交互器来将任务路由到指定的节点。

task_routes = {
    'tasks.add': {'exchange': 'C.dq', 'routing_key': 'w1@example.com'}
}
  • task_create_missing_queues
    默认值:启用
    若是启用(默认),任何声明的未在 task_queues 中未定义的队列都将自动被建立。查看 Automaci routing。

  • task_default_queue
    默认值: celery
    若是消息没有声明路由或者自定义的队列,apply_async 默认使用的队列名称。

这个队列必须在 task_queues 中。若是 task_queues 没有声明,那么他将自动建立一个队列项,而这个设置值就做为队列的名称。

另见:
修改默认队列的名称

  • task_default_exchange
    默认值:”celery”
    当 task_queues 设置中指定键没有声明自定义的消息交互器,那么这个默认的消息交互器将被使用。

  • task_default_exchange_type
    默认值:”direct”
    当 task_queues 设置中指定键没有声明自定义的消息交互器类型,那么这个默认的消息交互器类型将被使用。

  • task_default_routing_key
    默认值:”celery”
    当 task_queues 设置中指定键没有声明自定义的路由键,那么这个默认的路由键将被使用。

  • task_default_delivery_mode
    默认值:”presistent”

  能够是瞬态的(消息不写硬盘),或者持久的(写硬盘)

消息中间件设置

  • broker_url
    默认值:”amqp://”
    默认的消息中间件URL。这必须是一个以下形式的URL:
transport://userid:password@hostname:port/virtual_host

其中只有模式部分是必须的,其他部分都是可选的,默认会设置为对应传输中间件的默认值。

传输部分是使用的消息中间件的实现,默认是 amqp,(若是安装了librabbitmq会使用这个库,不然使用pyamqp)。还有其余可用的选择,包括 redis://、 sqs://、 qpid://。

模式部分能够是你本身的传输中间件实现的全限定路径:

broker_url = 'proj.transports.MyTransport://localhost'

能够配置多个消息中间件,使用相同的传输协议也行。消息中间件能够经过当个字符串声明,不一样的消息中间件URL之间用冒号分隔:

broker_url = 'transport://userid:password@hostname:port//;transport://userid:password@hostname:port//'

或者做为一个列表:

broker_url = [
    'transport://userid:password@localhost:port//',
    'transport://userid:password@hostname:port//'
]

这些消息中间件将被用于broker_failover_strategy

查看Kombu 文档中的 URLs 章节获取更多的信息。

  • broker_read_url / broker_write_url
    默认值:broker_url的设置值
    这些设置能够配置而不用 broker_url 的设置,能够为消息中间件声明不一样的链接参数,用来消费和生成消息。

示例:

broker_read_url = 'amqp://user:pass@broker.example.com:56721'
broker_write_url = 'amqp://user:pass@broker.example.com:56722'

全部选项均可以声明成一个列表,做为故障恢复的可选值,查看 broker_url 获取更多的信息

  • broker_failover_strategy
    默认值:“round-robin”
    消息中间件链接对象的默认故障恢复策略。若是提供了,将映射到 kombu.connection.failover_strategies 中的一个键,或者引用任何方法,从给定的列表中产生一个项。

示例:

# Random failover strategy
def random_failover_strategy(servers):
    it = list(servers)  # don't modify callers list
    shuffle = random.shuffle
    for _ in repeat(None):
        shuffle(it)
        yield it[0]

broker_failover_strategy = random_failover_strategy
  • broker_heartbeat
    支持的传输层协议:pyamqp
    默认值:120.0(与服务器协商)

  注意:这个值只被工做单元使用,客户端此时不使用心跳。

  由于单纯使用 TCP/IP 并不老是及时探测到链接丢失,因此 AMQP 定义了心跳,客户端和消息中间件用来检测链接是否关闭。

  心跳会被监控,若是心跳值是 10 秒,那么检测心跳的时间间隔是 10 除以broker_heartbeat_checkrate (默认状况下,这个值是心跳值的两倍,因此对于10秒心跳,心跳每隔5秒检测一次)

  • broker_heartbeat_checkrate
    支持的传输层协议:pyamqp
    默认值:2.0

工做单元会间隔监控消息中间件没有丢失过多的心跳。这个检测的速率是用 broker_heartbeat 值除以这个设置值获得的,因此若是心跳是 10.0 而且这个设置值是默认的2.0,那么这个监控将每隔5秒钟执行一次(心跳发送速率的两倍)

  • broker_use_ssl
    支持的传输层协议: pyamqp, redis
    默认值: 禁用

在消息中间件链接上使用SSL

这个选项的合法值依据使用的传输协议的不一样而不一样

  • pyamqp
    若是设置成True,链接将依据默认的SSL设置启用SSL。若是设置成一个字典,将依据给定的策略配置SSL链接。使用的格式是 python 的 ssl.wrap_socket() 选项。

注意SSL套接字通常会在消息中间件的一个单独的端口上服务。

如下示例提供了客户端证书,而且使用一个自定义的认证受权来验证服务器证书:

import ssl

broker_use_ssl = {
  'keyfile': '/var/ssl/private/worker-key.pem',
  'certfile': '/var/ssl/amqp-server-cert.pem',
  'ca_certs': '/var/ssl/myca.pem',
  'cert_reqs': ssl.CERT_REQUIRED
}

告警:
  使用 broker_use_ssl=True 时请当心。可能你的默认配置根本不会验证服务器证书。请阅读python的 ssl module security considerations。

  • redis
    设置必须是一个字典,包括以下键:
ssl_cert_reqs (required): one of the SSLContext.verify_mode values: 
ssl.CERT_NONE
ssl.CERT_OPTIONAL
ssl.CERT_REQUIRED
ssl_ca_certs (optional): path to the CA certificate
ssl_certfile (optional): path to the client certificate
ssl_keyfile (optional): path to the client key
  • broker_pool_limit
    2.3版本新特性

    默认值:10

    链接池中能够打开最大链接数。

    从2.5版本开始链接池被默认启用,默认限制是10个链接。这个数值能够依据使用一个链接的 threads/green-threads (eventlet/gevent) 数量进行更改。例如:运行 eventlet 启动 1000 个 greenlets,他们使用一个链接到消息中间件,若是发生竞态条件,那么你应该开始增长这个限制。

    若是设置成None或者0,链接池将会被禁用,而且每次使用链接都会从新创建链接并关闭。

  • broker_connection_timeout
    默认值:4.0
    放弃与AMQP服务器创建链接以前默认等待的超时时间。当使用 gevent 时该设置被禁用。

  • broker_connection_retry
    默认值:启用
    若是与 AMQP 消息中间件的链接断开,将自动从新创建链接

   每次重试中间等待的时间会递增,而且在 broker_connection_max_retries 未达到以前会一只重试

  • broker_connection_max_retries
    默认值:100
    放弃与 AMQP 服务器从新创建链接以前的最大重试次数

   若是设置成 0 或者 None,将一直重试

  • broker_login_method
    默认值:AMQPLAIN
    设置自定义的 amqp 登录方法

  • broker_transport_options
    2.2 版本新特性
    默认值:{} (空映射)

   传递给底层传输中间件的一个附加选项的字典

   设置可见超时时间的示例以下(Redis 与 SQS 传输中间件支持):

  broker_transport_options = {‘visibility_timeout’: 18000} # 5 hours 


工做单元

  • imports
    默认值:[] (空列表)
    当工做单元启动时导入的一系列模块

   这用来声明要导入的模块,可是它还可用来导入信号处理函数和附加的远程控制命令,等等。

   这些模块将会以原来声明的顺序导入

  • include
    默认值:[] (空列表)
    语义上与 imports 相同,可是能够做为将不一样导入分类的一种手段

这个设置中的模块是在 imports 设置中的模块导入以后才导入

  • worker_concurrency
    默认值:CPU核数
    执行任务的并发工做单元 process/threads/green 数量

   若是你大部分操做是I/O操做,你能够设置更多的进程(线程),可是大部分状况下都是以CPU数做为定界,尝试让这个值接近你机器的CPU核数。若是没有设置,当前机器的 CPU核数将会被使用

  • worker_prefetch_multiplier
    默认值:4
    工做单元一次预获取多少个消息是这个设置值乘以并发进程的数量。默认值是 4(每一个进程4条消息)。可是,默认设置一般是好的选择 - 若是你有长时间任务等待在队列中,而且你必须启动工做单元,注意第一个工做单元初始时将收到4倍的消息量。所以任务可能在工做单元间不会平均分布

   禁用这个选项,只要将 worker_prefetch_multiplier 设置成 1设置成 0 将容许工做单元持续消费它想要的尽量多的消息

   更详细的信息,请阅读 Prefetch Limits

   注意:
    带 ETA/countdown 的任务不会受 prefetch 限制的影响

  • worker_lost_wait
    默认值:10.0 秒
    有些状况下,工做单元可能在没有适当清理的状况下就被杀死,而且工做单元可能在终止前已经发布了一个结果。这个值声明了在抛出 WorkerLostError 异常以前咱们会在丢失的结果值上等待多久

  • worker_max_tasks_per_child
    一个工做单元进程在被一个新的进程替代以前能够执行的最大任务数

  • worker_max_memory_per_child

    默认值:没有限制。类型:int(kilobytes)

    一个工做单元进程在被一个新的进程替代以前能够消耗的最大预留内存(单位KB)。若是单独一个任务就致使工做单元超过这个限制,当前的任务会执行完成,而且以后这个进程将会被更新替代。

    示例:

  worker_max_memory_per_child = 12000  # 12MB
  • worker_disable_rate_limits
    默认值:禁用(启用速率限制)
    即便任务显示设置了速率,仍然禁用全部速率限制

  • worker_state_db
    默认值:None
    存储工做单元状态的文件名称(如取消的任务)。能够是相对或者绝对路径,可是注意后缀.db 可能会被添加到文件名后(依赖于python 的版本)

   也能够经过celery worker –statedb 参数设置

  • worker_timer_precision
    默认值:1.0秒
    设置从新检测调度器以前ETA调度器能够休息的最大秒数

   设置成1意味着调度器精度将为1秒。若是你须要毫秒精度,你能够设置成 0.1

  • worker_enable_remote_control
    默认值:默认启用
    声明工做单元的远程控制是否启用

事件

  • worker_send_task_events
    默认值:默认禁用
    发送任务相关的事件,使得任务可使用相似flower 的工做监控到。为工做单元的 -E 参数设置默认值

  • task_send_sent_event
    2.2 版本新特性
    默认值:默认禁用

   若是启用,对于每一个任务都将有一个 task-sent 事件被发送,所以任务在被消费前就能被追踪。

  • event_queue_ttl
    支持的传输中间件: amqp
    默认值:5.0 秒
    发送到一个监控客户端事件队列的消息的过时时间(x-message-ttl),以秒为单位(int/float)。

   例如:若是这个值设置为10,被递送到这个队列的消息将会在10秒后被删除

  • event_queue_expires
    支持的传输中间件: amqp
    默认值:60.0 秒
    一个监控客户端事件队列被删除前的过时时间(x-expires)。

  • event_queue_prefix
    默认值: “celeryev”.
    事件接收队列名称的前缀

  • event_serializer
    默认值: “json”.
    当发送事件消息时使用的消息序列化格式

远程控制命令

  • control_queue_ttl
    默认值: 300.0

  • control_queue_expires
    默认值: 10.0

日志

  • worker_hijack_root_logger
    2.2 版本新特性
    默认值: 默认启用 (hijack root logger).

   默认状况下,任意前面配置的根日志器的处理函数都将被移除。若是你想自定义日志处理函数,那么你能够经过设置 worker_hijack_root_logger = False 来禁用这个行为。

   注意:
     日志能够经过链接到 celery.signals.setup_logging 进行定制化

  • worker_log_color
    默认值: 若是应用实例日志输出到一个终端,这个将启用
    启用/禁用Celery 应用日志输出的颜色

  • worker_log_format
    默认值:

    [%(asctime)s: %(levelname)s/%(processName)s] %(message)s  日志信息的格式
    

  查看python 日志模块获取更多关于日志的信息

  • worker_task_log_format
    默认值:
[%(asctime)s: %(levelname)s/%(processName)s]
    [%(task_name)s(%(task_id)s)] %(message)s

任务中记录日志使用的格式。查看python 日志模块获取更多关于日志的信息

  • worker_redirect_stdouts
    默认值: 默认启用
    若是启用来,标准输出和标准错误输出将重定向到当前日志器

工做单元和 beat 将使用到

  • worker_redirect_stdouts_level
    默认值:WARNING
    标准输出和标准错误输出的日志级别。能够是DEBUG, INFO, WARNING, ERROR, or CRITICAL

安全

  • security_key
    默认值: None.
    2.5 版本新特性

包含私钥的文件的相对或者绝对路径,私钥用来在使用消息签名时对消息进行签名。

  • security_certificate
    默认值:None.
    2.5 版本新特性

  包含X.509认证的文件的相对或者绝对路径,认证用来在使用消息签名时对消息进行签名。

  • security_cert_store
    默认值:None.
    2.5 版本新特性

  包含用来进行消息签名的X.509认证的目录。可使用文件名模式匹配(例如:/etc/certs/*.pem)

自定义组件类 (高级)

  • worker_pool
    默认值:”prefork” (celery.concurrency.prefork:TaskPool).
    工做单元使用的池类的名称

  • Eventlet/Gevent
    永远不要使用这个选项来选择用eventlet 仍是 gevent。你必须对工做单元使用-P选项,确保应急补丁不会应用过迟,致使出现奇怪的现象。

  • worker_pool_restarts
    默认值:默认禁用

   若是启用,工做单元池可使用 pool_restart 远程控制命令进行重启

  • worker_autoscaler
    2.2 版本新特性
    默认值: “celery.worker.autoscale:Autoscaler”.

使用的自动扩展类的名称

  • worker_consumer
    默认值:”celery.worker.consumer:Consumer”.
    工做单元使用的消费类的名称

  • worker_timer
    默认值:”kombu.async.hub.timer:Timer”.
    工做单元使用的 ETA 调度器类的名称。默认值是被池具体实现设置。

Beat 设置 (celery beat)

  • beat_schedule
    默认值: {} (空映射)
    beat调度的周期性任务。查看Entries

  • beat_scheduler
    默认值:”celery.beat:PersistentScheduler”.
    默认的调度器类。若是同时使用django-celery-beat扩展,能够设置成 “django_celery_beat.schedulers:DatabaseScheduler”

也能够经过celery beat 的 -S 参数进行设置

  • beat_schedule_filename
    默认值: “celerybeat-schedule”.
    存储周期性任务最后运行时间的文件的名称,这个文件被PersistentScheduler使用。能够是相对或者绝对路径,可是注意后缀.db可能添加到文件名后(依赖于python版本)

   也能够经过 celery beat 的 –schedule 参数进行设置

  beat_sync_every
  默认值:0.
  另外一个数据库同步发起前能够执行的周期性任务的数量。值0(默认)表示基于时间同步 - 默认是3分钟,由scheduler.sync_every肯定。若是设置成1,beat将在每一个任务消息发送后发起同步。

  beat_max_loop_interval
   默认值: 0.

 

转自:https://blog.csdn.net/libing_thinking/article/details/78812472

相关文章
相关标签/搜索