Celery-4.1 用户指南: Application(应用)

Application


Celery 库在使用以前必须初始化,一个celery实例被称为一个应用(或者缩写 app)。python

Celery 应用是线程安全的,因此多个不一样配置、不一样组件、不一样任务的 应用能够在一个进程空间里共存。正则表达式

下面建立一个 celery 应用:安全

>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>

最后一行显示的是 celery 应用的文本表示: 包含应用类的名称(Celery),当前主模块的名称(main),以及应用对象的内存地址(0x100469fd0)。app

Main Name


上述文本表示中只有一部分是重要的,那就是主模块名称。下面分析下它为什么重要。async

当你发送一个消息给 Celery,消息中不会包含任何源码,而只有你想要执行的任务的名称。这就好像因特网上的域名映射原理通常:每一个执行单元维护着一个任务名称到实际任务函数的映射,这个映射被称为任务注册表。函数

当你定义一个任务,这个任务就会被添加到本地注册表:工具

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks['__main__.add']
<@task: __main__.add>

如今,你又看到显示 __main__ 模块名称;当Celery不能探查到这个任务函数属于哪一个模块时,它将使用主模块名称来产生任务名称的前缀。lua

这在有些状况下会产生问题:
1. 定义任务的主模块做为一个程序运行。
2. 应用在python交互终端建立。spa

例以下面程序,定义任务的模块还调用 app.worker_main() 来启动一个工做单元:.net

tasks.py:

from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == '__main__':
    app.worker_main()

当这个模块运行,任务将之前缀 __main__ 命名,可是当该模块被其余进程引入来运行一个任务,这个任务的名称将之前缀 tasks 命名(即这个模块的真实名称)。

>>> from tasks import add
>>> add.name
tasks.add

你能够给主模块声明另一个名称:

>>> app = Celery('tasks')
>>> app.main
'tasks'

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.name
tasks.add

具体能够查看名称这节。

配置


你能够设置一些选项来改变 Celery 的工做方式。这些选项能够直接在 app 实例上进行设置,或者也可使用一个指定的配置模块。

配置使用 app.conf 变量保存:

>>> app.conf.timezone
'Europe/London'

你能够直接设置配置值:

>>> app.conf.enable_utc = True

或者使用 update 方法同时更新多个配置项。

>>> app.conf.update(
...     enable_utc=True,
...     timezone='Europe/London',
...)

实际的配置对象由多个字典决定,配置项按如下顺序查询:
1. 运行时的配置修改
2. 配置模块(若是声明)
3. 默认配置(celery.app.defaults)

你还可使用 app.add_defaults() 方法添加新的默认配置源。

另外: 全部可用配置的完整列表及其默认值请参照 Configuration reference。

config_from_object


app.config_from_object() 方法从一个配置对象加载配置。

它能够是一个配置模块,或者任意包含配置属性的对象。

注意任何前面设置的配置在调用 config_from_object 方法后都将被重置。若是你想设置附加的配置应该在调用这个方法以后。

示例1: 使用模块名
app.config_from_object() 方法的参数能够是一个 python 模块的全限定名称,或者一个 python 属性名,例如:”celeryconfig”,”myproj.config.celery”,或者 “myproj.config:CeleryConfig”:

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

celeryconfig 模块内容以下形式:
celeryconfig.py

enable_utc = True
timezone = 'Europe/London'

只要 import celeryconfig 能正常运行,应用实例就能加载它。

示例2:传递一个模块对象
你还能够传递一个已经加载的模块对象,可是不做为常规建议。


提示:
建议使用模块名的方式加载,由于这种状况下当prefork池使用时,配置模块没必要序列化。若是遇到配置问题或者序列化错误,能够尝试使用模块名的方式加载配置。

import celeryconfig

from celery import Celery

app = Celery()
app.config_from_object(celeryconfig)

示例3:使用配置类/对象

from celery import Celery

app = Celery()

class Config:
    enable_utc = True
    timezone = 'Europe/London'

app.config_from_object(Config)
# or using the fully qualified name of the object:
#   app.config_from_object('module:Config')

config_from_envvar


app.config_from_envvar() 从环境变量中获取配置模块名称。

例如,从环境变量 CELERY_CONFIG_MODULE 所声明的模块加载配置:

import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

你能够经过环境变量声明要使用的模块:

$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info

敏感配置


若是你想打印配置信息,做为调试信息或者相似,你也许不想暴露密码和API秘钥这类信息。

Celery 提供了一些有用的工具函数来展现这些配置信息,其中一个是 humanize() 函数:

>>> app.conf.humanize(with_defaults=False, censored=True)

请注意Celery不会移除全部的敏感信息,由于它只是仅仅使用一个正则表达式来匹配配置项键名。若是你添加包含敏感信息的定制化配置,你应该使用 celery 能识别为敏感信息的键名。

若是一个配置项键名包含如下字符串,它将被看做是敏感的: API,TOKEN,KEY,SECRET,PASS,SIGNATURE,DATABASE

延迟加载

一个应用实例是延迟加载的,意味着它只有在实际调用的时候才会被求值。
建立一个celery 实例只会作以下事情:
1. 建立一个用于事件的逻辑时钟实例
2. 建立一个任务注册表
3. 将本身设置为当前应用实例(若是 set_as_current 参数被禁用将不会作此设置)
4. 调用 app.on_init() 回调函数(默认不作任何事情)

app.task() 装饰器在任务定义时不会建立任务,而是延迟任务的建立到任务使用时,或者应用被终止时。

下面这个例子说明了直到你使用任务时或者访问任务对象的属性时(这里是 repr())任务才会被建立:

>>> @app.task
>>> def add(x, y):
...    return x + y

>>> type(add)
<class 'celery.local.PromiseProxy'>

>>> add.__evaluated__()
False

>>> add        # <-- causes repr(add) to happen
<@task: __main__.add>

>>> add.__evaluated__()
True

应用的终止有两种状况,显示调用 app.finalize() 终止,或者经过访问 app.tasks 属性隐示终止。

终止应用对象将会执行:
1. 应用间必须共享的任务的拷贝
任务默认是被共享的,可是若是任务装饰器的共享参数被设置为禁用时任务会为被绑定的应用所私有。

  1. 对全部未求值的任务求值

  2. 确认全部任务都绑定到当前应用实例
    任务绑定到了应用实例,因此能够读取配置的默认值。

note:
“默认应用实例”
celery 并非一开始有应用实例这个概念,最先只有一个模块级别的API,为了向后兼容老的API,这个模块级别API会保留直到celery 5.0发布。

celery 会建立一个特殊的应用实例 - 默认应用实例,若是没有自定义的应用实例被初始化,这个默认应用实例将会被启用。

例如,老的任务基类使用了许多兼容特性,其中一些与新的特性不兼容,好比任务方法。

from celery.task import Task   # << OLD Task base class.

from celery import Task        # << NEW base class.

即便你使用老的模块级别的API,也推荐使用新的基类。

打破链式操做


虽然能够依赖于当前设置的应用实例,可是将应用实例做为参数传递给全部须要它的对象仍然是最佳操做实践。

称这种操做为“应用实例链”的缘由是由于它依赖所传递的应用实例建立了一个链。

下面这个例子被认为是差的实践:

from celery import current_app

class Scheduler(object):

    def run(self):
        app = current_app

它应用将 app 做为一个参数传递:

class Scheduler(object):

    def __init__(self, app):
        self.app = app

在celery内部实现中,使用 celery.app_or_default() 函数使得模块级别的 API 也能正常使用。

from celery.app import app_or_default

class Scheduler(object):
    def __init__(self, app=None):
        self.app = app_or_default(app)

在开发环境中,能够经过设置 CELERY_TRACE_APP 环境变量在应用实例链被打破时抛出一个异常:

$ CELERY_TRACE_APP=1 celery worker -l info

note:
API 的演化
celery 项目从开始建立到如今的七年多时间里已经改变了不少。
例如,最开始可使用任何一个可调用对象做为一个任务:

def hello(to):
    return 'hello {0}'.format(to)

>>> from celery.execute import apply_async

>>> apply_async(hello, ('world!',))

能够建立一个任务类,设置特定属性,或者覆盖其余行为

from celery.task import Task
from celery.registry import tasks

class Hello(Task):
    queue = 'hipri'

    def run(self, to):
        return 'hello {0}'.format(to)
tasks.register(Hello)

>>> Hello.delay('world!')

后来,开发者以为传递任意可调用对象是反模式,由于它使得很难使用除了 pickle 以外的序列化方案,所以这个特性在 2.0 就被踢除了,取而代之的是任务装饰器:

from celery.task import task

@task(queue='hipri')
def hello(to):
    return 'hello {0}'.format(to)

抽象任务


全部使用 task() 装饰器建立的任务都会继承应用的基础 Task 类。

你可使用装饰器的 base 参数给任务声明一个不一样的基类:

@app.task(base=OtherTask):
def add(x, y):
    return x + y

建立一个自定义的任务类,你应该继承这个中性类:celery.Task

from celery import Task

class DebugTask(Task):

    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return super(DebugTask, self).__call__(*args, **kwargs)

提示:
若是你覆盖了任务的 __call__ 方法,那么很是重要的一点是你还须要调用父类的方法使得在任务被直接调用时基类call方法能设置好默认请求。

这个中性类比较特殊,由于它不会绑定到任意特殊应用实例。一旦任务绑定到一个应用实例,它将读取应用的配置信息来设置默认值等等。

使用一个基类,你须要使用 app.task() 装饰器建立一个任务:

@app.task(base=DebugTask)
def add(x, y):
    return x + y

还能够经过修改 app.Task 属性来修改一个应用实例的默认基类:

>>> from celery import Celery, Task

>>> app = Celery()

>>> class MyBaseTask(Task):
...    queue = 'hipri'

>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
 <unbound MyBaseTask>,
 <unbound Task>,
 <type 'object'>]

 转自:https://blog.csdn.net/libing_thinking/article/details/78541171

相关文章
相关标签/搜索