Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling. The goal of this article is to examine what a Celery task really is and, if we wanted to build a task mechanism from scratch, which details we would need to watch out for and how to handle them.
Because tasks are closely tied to Consumer-side consumption, this article overlaps in places with the previous one for the sake of clarity; please bear with the repetition.
Let us first sketch a few questions; they are the starting points and focal points of the analysis below.
We will answer these questions one by one in what follows.
The server-side sample code is as follows; a decorator is used to wrap the task to be executed.
A task is simply user-defined business code; the task here is an addition function.
from celery import Celery

app = Celery('myTest', broker='redis://localhost:6379')

@app.task
def add(x, y):
    print(x + y)
    return x + y

if __name__ == '__main__':
    app.worker_main(argv=['worker'])
The code that sends the task is as follows:
from myTest import add

re = add.apply_async((2, 17))
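Incidentally, add.delay(2, 17) is the standard Celery shorthand for the same call: delay() simply wraps apply_async().

re = add.delay(2, 17)   # equivalent to add.apply_async((2, 17))
print(re.id)            # id of the message queued on the broker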
To understand what a task is, we first print the runtime variables; only the main member variables are shown here:
self = {add} <@task: myTest.add of myTest at 0x7faf35f0a208>
 Request = {str} 'celery.worker.request:Request'
 Strategy = {str} 'celery.worker.strategy:default'
 app = {Celery} <Celery myTest at 0x7faf35f0a208>
 backend = {DisabledBackend} <celery.backends.base.DisabledBackend object at 0x7faf364aea20>
 from_config = {tuple: 9} (('serializer', 'task_serializer'), ('rate_limit', 'task_default_rate_limit'), ('priority', 'task_default_priority'), ('track_started', 'task_track_started'), ('acks_late', 'task_acks_late'), ('acks_on_failure_or_timeout', 'task_acks_on_failure_or_timeout'), ('reject_on_worker_lost', 'task_reject_on_worker_lost'), ('ignore_result', 'task_ignore_result'), ('store_errors_even_if_ignored', 'task_store_errors_even_if_ignored'))
 name = {str} 'myTest.add'
 priority = {NoneType} None
 request = {Context} <Context: {}>
 request_stack = {_LocalStack: 0} <celery.utils.threads._LocalStack object at 0x7faf36405e48>
 serializer = {str} 'json'
As we can see, 'myTest.add' is a Task instance.
So we need to look at what Task is. You will find two implementations of Task in Celery:
one in celery/app/task.py;
the other in celery/task/base.py.
The two are related: you can think of the first as the externally exposed interface and the second as the concrete implementation.
Tasks are an indispensable part of Celery; a task can be any callable object. Each task is identified by a unique name, and workers look tasks up by that name. Tasks are registered via the app.task decorator. Note that when a function has several decorators, the app.task decorator must be the outermost one for Celery to work correctly.
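For instance, with a hypothetical extra decorator (log_calls is ours, only for illustration), the ordering looks like this:

import functools

def log_calls(fun):
    # a hypothetical decorator, used only to illustrate ordering
    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
        print('calling', fun.__name__)
        return fun(*args, **kwargs)
    return wrapper

@app.task       # app.task must stay outermost
@log_calls
def mul(x, y):
    return x * y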
Within a Celery application, the functionality a Task carries is executed by starting the corresponding message consumer.
In its most basic form, a task is a function. The most straightforward idea for publishing a task is for the client to package up the code of the function to be executed and publish it to the broker. The distributed computing framework Spark uses this approach (Spark's philosophy is simple: move the computation, not the data). Celery before 2.0 also supported publishing tasks this way.
An obvious downside of this approach is that the amount of data passed to the broker can be large. The solution is also easy to come up with: tell the worker about the task code ahead of time. This is exactly what the global set and decorator-based registration are for.
When we adopt the "tell the worker about our custom tasks ahead of time" approach, a task is defined as follows:
@app.task(name='hello_task')
def hello():
    print('hello')
Here app is the application inside the worker; the decorator registers the task function with it.
The app maintains a dictionary whose key is the task name (hello_task here) and whose value is a reference to the function. Task names must be unique, but the name parameter itself is optional: if it is not given, Celery automatically generates a task name from the package path and the function name.
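To make the idea concrete, here is a minimal sketch, independent of Celery's actual code, of decorator-based registration into a name-to-callable dictionary:

registry = {}

def task(name=None):
    def decorator(fun):
        # fall back to module path plus function name, as Celery does
        key = name or f'{fun.__module__}.{fun.__name__}'
        registry[key] = fun
        return fun
    return decorator

@task(name='hello_task')
def hello():
    print('hello')

registry['hello_task']()   # the "client" only needs the name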
With this approach, the client only needs to provide the task name and its arguments when publishing a task; it does not need to ship any task code:
# client side
app.send_task('hello_task')
Note: after the client publishes a task, the task is written to the broker queue as a message carrying the task name and related parameters, waiting for a worker to pick it up. Publishing is completely independent of the worker side; even if no worker is running, the message is still written to the queue.
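For example (standard Celery API; hello_task is the task defined above):

result = app.send_task('hello_task', args=(), kwargs={})
print(result.id)   # the message is already on the queue, even with no worker running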
This approach also has an obvious drawback: all task code must be registered on the worker side ahead of time, which couples the client and the worker more tightly.
Therefore, we need to start from Celery application startup.
Celery startup first brings us to celery/_state.py.
Here a global set is created to collect all tasks.
#: Global set of functions to call whenever a new app is finalized.
#: Shared tasks, and built-in tasks are created by adding callbacks here.
_on_app_finalizers = set()
At startup, the system adds tasks by calling the following function:
def connect_on_app_finalize(callback):
    """Connect callback to be called when any app is finalized."""
    _on_app_finalizers.add(callback)
    return callback
First, celery/app/builtins.py defines many built-in tasks, each of which must be added to the global callback set.
@connect_on_app_finalize
def add_map_task(app):
    from celery.canvas import signature

    @app.task(name='celery.map', shared=False, lazy=False)
    def xmap(task, it):
        task = signature(task, app=app).type
        return [task(item) for item in it]
    return xmap
Next, the flow reaches our custom task and registers it in the global callback set as well.
In other words: after Celery starts, it finds which classes or functions in the code use the @task decorator, and registers those classes or functions in the global callback set.
@app.task
def add(x, y):
    print(x + y)
    return x + y
Following @app.task, we arrive at the Celery application itself.
The code is in celery/app/base.py.
@app.task returns _create_task_cls, which builds a task proxy, appends it to the application's pending queue, and adds it to the global callback set via connect_on_app_finalize(cons).
_create_task_cls = {function} <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls at 0x7ff1a7b118c8>
The code is as follows:
def task(self, *args, **opts):
    if USING_EXECV and opts.get('lazy', True):
        from . import shared_task
        return shared_task(*args, lazy=False, **opts)

    def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
        _filt = filter

        def _create_task_cls(fun):
            if shared:
                def cons(app):
                    return app._task_from_fun(fun, **opts)
                cons.__name__ = fun.__name__
                connect_on_app_finalize(cons)  # the key point: add to the global callback set
            if not lazy or self.finalized:
                ret = self._task_from_fun(fun, **opts)
            else:
                # return a proxy object that evaluates on first use
                ret = PromiseProxy(self._task_from_fun, (fun,), opts,
                                   __doc__=fun.__doc__)
                self._pending.append(ret)  # append to the app's pending deque
            if _filt:
                return _filt(ret)
            return ret
        return _create_task_cls

    if len(args) == 1:
        if callable(args[0]):
            return inner_create_task_cls(**opts)(*args)
    return inner_create_task_cls(**opts)
For the call in our example, Celery returns a PromiseProxy instance whose evaluation target is self._task_from_fun.
Let us now look at the implementation of the Proxy class, which lives in celery/local.py.
class Proxy(object):
    """Proxy to another object."""

    # Code stolen from werkzeug.local.Proxy.
    __slots__ = ('__local', '__args', '__kwargs', '__dict__')

    def __init__(self, local,
                 args=None, kwargs=None, name=None, __doc__=None):
        object.__setattr__(self, '_Proxy__local', local)      # store the target in _Proxy__local
        object.__setattr__(self, '_Proxy__args', args or ())  # positional arguments
        object.__setattr__(self, '_Proxy__kwargs', kwargs or {})  # keyword arguments
        if name is not None:
            object.__setattr__(self, '__custom_name__', name)
        if __doc__ is not None:
            object.__setattr__(self, '__doc__', __doc__)

    ...

    def _get_current_object(self):
        """Get current object.

        This is useful if you want the real
        object behind the proxy at a time for performance reasons
        or because you want to pass the object into a different context.
        """
        loc = object.__getattribute__(self, '_Proxy__local')  # fetch the target passed to __init__
        if not hasattr(loc, '__release_local__'):             # no __release_local__ attribute
            return loc(*self.__args, **self.__kwargs)         # treat it as a callable and call it
        try:  # pragma: no cover
            # not sure what this is about
            return getattr(loc, self.__name__)                # otherwise look up by __name__
        except AttributeError:  # pragma: no cover
            raise RuntimeError('no object bound to {0.__name__}'.format(self))

    ...

    def __getattr__(self, name):
        if name == '__members__':
            return dir(self._get_current_object())
        return getattr(self._get_current_object(), name)      # delegate attribute access

    def __setitem__(self, key, value):
        self._get_current_object()[key] = value               # delegate item assignment

    def __delitem__(self, key):
        del self._get_current_object()[key]                   # delegate item deletion

    def __setslice__(self, i, j, seq):
        self._get_current_object()[i:j] = seq                 # delegate slice assignment

    def __delslice__(self, i, j):
        del self._get_current_object()[i:j]

    def __setattr__(self, name, value):
        setattr(self._get_current_object(), name, value)      # delegate attribute assignment

    def __delattr__(self, name):
        delattr(self._get_current_object(), name)             # delegate attribute deletion
Only part of the class is shown. As the analysis above indicates, the proxy mainly decides, based on whether the local passed in is a callable or has __release_local__, whether to call it as a function or to look it up as an attribute.
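A minimal sketch of the same lazy-evaluation idea (not Celery's code) may help: the real object is created only on first use, which mirrors what PromiseProxy does for pending tasks.

class LazyProxy:
    def __init__(self, factory, *args, **kwargs):
        object.__setattr__(self, '_factory', (factory, args, kwargs))
        object.__setattr__(self, '_obj', None)

    def _get_current_object(self):
        obj = object.__getattribute__(self, '_obj')
        if obj is None:
            factory, args, kwargs = object.__getattribute__(self, '_factory')
            obj = factory(*args, **kwargs)        # evaluate on first use
            object.__setattr__(self, '_obj', obj)
        return obj

    def __getattr__(self, name):
        return getattr(self._get_current_object(), name)

p = LazyProxy(dict, a=1)   # no dict has been created yet
print(p.keys())            # first access triggers creation: dict_keys(['a'])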
In the code above, the following line appends the task to the Celery application's pending queue:
self._pending.append(ret)
_pending is defined as follows; it is just a deque:
class Celery:
    """Celery application."""

    def __init__(self, main=None, loader=None, backend=None,
                 amqp=None, events=None, log=None, control=None,
                 set_as_current=True, tasks=None, broker=None, include=None,
                 changes=None, config_source=None, fixups=None, task_cls=None,
                 autofinalize=True, namespace=None, strict_typing=True,
                 **kwargs):
        self._pending = deque()
At this point the global set looks like this:
_on_app_finalizers = {set: 10}
 {function} <function add_chunk_task at 0x7fc200a81400>
 {function} <function add_backend_cleanup_task at 0x7fc200a81048>
 {function} <function add_starmap_task at 0x7fc200a81488>
 {function} <function add_group_task at 0x7fc200a812f0>
 {function} <function add_map_task at 0x7fc200a81510>
 {function} <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons at 0x7fc200af4510>
 {function} <function add_accumulate_task at 0x7fc200aa0158>
 {function} <function add_chain_task at 0x7fc200a81378>
 {function} <function add_unlock_chord_task at 0x7fc200a81598>
 {function} <function add_chord_task at 0x7fc200aa01e0>
The logic is shown in the following diagram:
                 +------------------------------+
                 | _on_app_finalizers = set()   |
                 +--------------+---------------+
                                |
+------------+                  |
| builtins.py| +--------------> +   connect_on_app_finalize
+------------+                  |
+-------------+                 |
|User Function| +-------------> +   connect_on_app_finalize
+-------------+                 |
                                v
+---------------------------------------------------------------------------------------------------+
| _on_app_finalizers                                                                                 |
|                                                                                                    |
|  <function add_chunk_task>                                                                         |
|  <function add_backend_cleanup_task>                                                               |
|  <function add_starmap_task>                                                                       |
|  <function add_group_task>                                                                         |
|  <function add_map_task>                                                                           |
|  <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons>     |
|  <function add_accumulate_task>                                                                    |
|  <function add_chain_task>                                                                         |
|  <function add_unlock_chord_task>                                                                  |
|  <function add_chord_task>                                                                         |
+---------------------------------------------------------------------------------------------------+
At this point we have obtained a global set, _on_app_finalizers, that collects all the tasks.
At this point Celery knows which tasks exist and has collected them, but it does not yet know their logical meaning. Put differently: Celery only knows which classes there are, but has no instances of those classes.
Since consuming tasks is Celery's core function, we inevitably have to revisit worker startup; here, however, we focus on the parts of the worker that relate to tasks.
In essence, this means processing the global set _on_app_finalizers described above: associating these not-yet-meaningful tasks with the Celery application.
Concretely:
The Worker here is the worker instance Celery uses for consumption.
So let us go straight to the worker.
The code is in celery/bin/worker.py:
@click.pass_context
@handle_preload_options
def worker(ctx, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
           loglevel=None, logfile=None, pidfile=None, statedb=None,
           **kwargs):
    """Start worker instance."""
    app = ctx.obj.app
    worker = app.Worker(
        hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
        logfile=logfile,  # node format handled by celery.app.log.setup
        pidfile=node_format(pidfile, hostname),
        statedb=node_format(statedb, hostname),
        no_color=ctx.obj.no_color,
        **kwargs)  # execution reaches here
    worker.start()
    return worker.exitcode
Inside worker = app.Worker, we find that WorkController is reached indirectly.
Execution now arrives at celery/worker/worker.py.
Some initialization work is done here; let us keep digging.
class WorkController:
    """Unmanaged worker instance."""

    def __init__(self, app=None, hostname=None, **kwargs):
        self.app = app or self.app
        self.hostname = default_nodename(hostname)
        self.startup_time = datetime.utcnow()
        self.app.loader.init_worker()
        self.on_before_init(**kwargs)  # execution reaches here
Execution then reaches celery/apps/worker.py.
Here trace.setup_worker_optimizations is called, and we are about to see tasks.
class Worker(WorkController):
    """Worker as a program."""

    def on_before_init(self, quiet=False, **kwargs):
        self.quiet = quiet
        trace.setup_worker_optimizations(self.app, self.hostname)
Execution then reaches celery/app/trace.py.
app.finalize() is called; the goal is to settle all tasks before startup.
def setup_worker_optimizations(app, hostname=None):
    """Setup worker related optimizations."""
    global trace_task_ret

    hostname = hostname or gethostname()

    # make sure custom Task.__call__ methods that calls super
    # won't mess up the request/task stack.
    _install_stack_protection()

    app.set_default()

    # evaluate all task classes by finalizing the app.
    app.finalize()
After all this effort, we finally arrive at the key logic.
app.finalize() adds the tasks to the Celery application.
That is: so far the system has collected all tasks into the global set _on_app_finalizers, but the tasks in that set carry no logical meaning yet. They must be tied to the Celery application, and that association is established here.
The call stack is:
_task_from_fun, base.py:450
_create_task_cls, base.py:425
add_chunk_task, builtins.py:128
_announce_app_finalized, _state.py:52
finalize, base.py:511
setup_worker_optimizations, trace.py:643
on_before_init, worker.py:90
__init__, worker.py:95
worker, worker.py:326
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374
The code is:
def finalize(self, auto=False):
    """Finalize the app.

    This loads built-in tasks, evaluates pending task decorators,
    reads configuration, etc.
    """
    with self._finalize_mutex:
        if not self.finalized:
            if auto and not self.autofinalize:
                raise RuntimeError('Contract breach: app not finalized')
            self.finalized = True
            _announce_app_finalized(self)  # the key point: establish the association

            pending = self._pending
            while pending:
                maybe_evaluate(pending.popleft())

            for task in self._tasks.values():
                task.bind(self)

            self.on_after_finalize.send(sender=self)
_announce_app_finalized(self) runs every callback in the global callback set _on_app_finalizers to obtain the task instances, then adds them to Celery's task registry, so that a task instance can later be looked up by its name.
def _announce_app_finalized(app):
    callbacks = set(_on_app_finalizers)
    for callback in callbacks:
        callback(app)
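Putting the pieces together, here is a stripped-down sketch (assumed names, not Celery's code) of the collect-then-finalize pattern:

_on_app_finalizers = set()

def connect_on_app_finalize(callback):
    _on_app_finalizers.add(callback)
    return callback

class App:
    def __init__(self):
        self.tasks = {}

    def finalize(self):
        for callback in set(_on_app_finalizers):
            callback(self)            # each callback registers one task

@connect_on_app_finalize
def add_hello_task(app):
    app.tasks['hello_task'] = lambda: print('hello')

app = App()
app.finalize()
app.tasks['hello_task']()   # the tasks now carry meaning for this app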
For our user-defined task, the callback is the cons closure created inside _create_task_cls; running it calls app._task_from_fun(fun, **opts) to add the task.
def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
    _filt = filter

    def _create_task_cls(fun):
        if shared:
            def cons(app):
                return app._task_from_fun(fun, **opts)
            cons.__name__ = fun.__name__
            connect_on_app_finalize(cons)
        if not lazy or self.finalized:
            ret = self._task_from_fun(fun, **opts)  # here
So during initialization, when this task is added for each app, app._task_from_fun(fun, **options) is called.
Inside _task_from_fun, the following line adds the task to the Celery app, which establishes the association:
self._tasks[task.name] = task
self._tasks then becomes:
_tasks = {TaskRegistry: 10}
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x25da0ca0d88>
 'celery.chord' = {chord} <@task: celery.chord of myTest at 0x25da0ca0d88>
 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x25da0ca0d88>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x25da0ca0d88>
 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x25da0ca0d88>
 'celery.group' = {group} <@task: celery.group of myTest at 0x25da0ca0d88>
 'celery.map' = {xmap} <@task: celery.map of myTest at 0x25da0ca0d88>
 'myTest.add' = {add} <@task: myTest.add of myTest at 0x25da0ca0d88>
 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x25da0ca0d88>
 'celery.chain' = {chain} <@task: celery.chain of myTest at 0x25da0ca0d88>
 __len__ = {int} 10
The full code is:
def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
    if not self.finalized and not self.autofinalize:
        raise RuntimeError('Contract breach: app not finalized')

    name = name or self.gen_task_name(fun.__name__, fun.__module__)
    base = base or self.Task

    if name not in self._tasks:
        run = fun if bind else staticmethod(fun)
        task = type(fun.__name__, (base,), dict({
            'app': self,
            'name': name,
            'run': run,
            '_decorated': True,
            '__doc__': fun.__doc__,
            '__module__': fun.__module__,
            '__annotations__': fun.__annotations__,
            '__header__': staticmethod(head_from_fun(fun, bound=bind)),
            '__wrapped__': run}, **options))()

        self._tasks[task.name] = task
        task.bind(self)  # connects task to this app

        add_autoretry_behaviour(task, **options)
    else:
        task = self._tasks[name]
    return task
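The interesting trick here is the type() call: it builds a one-off subclass whose run() is the user function, then instantiates it. A minimal sketch of the same technique with a simplified base class:

class BaseTask:
    name = None

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)

def make_task(fun, name=None):
    # mirror _task_from_fun: default the name to module path + function name
    name = name or f'{fun.__module__}.{fun.__name__}'
    return type(fun.__name__, (BaseTask,), {
        'name': name,
        'run': staticmethod(fun),
    })()

def add(x, y):
    return x + y

task = make_task(add, name='myTest.add')
print(task.name, task(2, 17))   # myTest.add 19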
By default the task base class is celery.app.task:Task. After the instance is dynamically generated, task.bind(self) is called, which fills in various attributes from the app.
@classmethod
def bind(cls, app):
    was_bound, cls.__bound__ = cls.__bound__, True
    cls._app = app                    # set the class's _app attribute
    conf = app.conf                   # fetch the app configuration
    cls._exec_options = None          # clear option cache

    if cls.typing is None:
        cls.typing = app.strict_typing

    for attr_name, config_name in cls.from_config:  # fill in class defaults
        if getattr(cls, attr_name, None) is None:   # if the attribute is unset
            setattr(cls, attr_name, conf[config_name])  # use the app config default

    # decorate with annotations from config.
    if not was_bound:
        cls.annotate()

        from celery.utils.threads import LocalStack
        cls.request_stack = LocalStack()  # thread-local stack for request data

    # PeriodicTask uses this to add itself to the PeriodicTask schedule.
    cls.on_bound(app)

    return app
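A minimal sketch of the from_config fallback that bind() implements, with a stand-in config dict:

class App:
    conf = {'task_serializer': 'json', 'task_acks_late': False}

class MyTask:
    serializer = None
    acks_late = None
    from_config = (('serializer', 'task_serializer'),
                   ('acks_late', 'task_acks_late'))

app = App()
for attr_name, config_name in MyTask.from_config:
    if getattr(MyTask, attr_name, None) is None:   # attribute not set on the class
        setattr(MyTask, attr_name, app.conf[config_name])

print(MyTask.serializer, MyTask.acks_late)   # json False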
Execution returns to Celery; the code is now in celery/app/base.py.
The variables are:
pending = {deque: 1} deque([<@task: myTest.add of myTest at 0x7fd907623550>])
self = {Celery} <Celery myTest at 0x7fd907623550>
Tasks popped from pending are then processed. As mentioned earlier, some of the deferred work for tasks is carried out here.
The code is in celery/local.py:
def __maybe_evaluate__(self):
    return self._get_current_object()

def _get_current_object(self):
    try:
        return object.__getattribute__(self, '__thing')
self at this point is the task itself:
self = {add} <@task: myTest.add of myTest at 0x7fa09ee1e320>
The return value is the myTest.add task itself.
At this point all tasks have been obtained, and each task has its own instance, ready to be invoked.
Because task consumption relies on multiprocessing, we first need a rough look at how the pool is started.
Let us continue with the startup of the Celery worker.
During startup, the Celery worker launches a series of bootsteps; for the Worker blueprint these steps are: [<step: Hub>, <step: Pool>, <step: Consumer>].
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374
The code is in celery/bootsteps.py:
def start(self, parent):
    self.state = RUN
    if self.on_start:
        self.on_start()
    for i, step in enumerate(s for s in parent.steps if s is not None):
        self.started = i + 1
        step.start(parent)
The variables are:
parent.steps = {list: 3}
 0 = {Hub} <step: Hub>
 1 = {Pool} <step: Pool>
 2 = {Consumer} <step: Consumer>
 __len__ = {int} 3
The task-processing logic proper is started inside the Pool step.
In Pool(bootsteps.StartStopStep), the line w.process_task = w._process_task configures the callback for the pool: when the pool is notified that it has a chance to run, it knows which callback to use to fetch and execute a concrete task.
class Pool(bootsteps.StartStopStep):
    """Bootstep managing the worker pool.

    Describes how to initialize the worker pool, and starts and stops
    the pool during worker start-up/shutdown.

    Adds attributes:
        * autoscale
        * pool
        * max_concurrency
        * min_concurrency
    """

    def create(self, w):
        procs = w.min_concurrency
        w.process_task = w._process_task  # the callback is configured here
The method is shown below; as we can anticipate, req.execute_using_pool(self.pool) is how execution will later reach the process pool:
def _process_task(self, req):
    """Process task by sending it to the pool of workers."""
    req.execute_using_pool(self.pool)
The variables at this point are:
self = {Pool} <step: Pool>
semaphore = {NoneType} None
threaded = {bool} False
w = {Worker} celery
We end up with the following structure; this TaskRegistry will be used when tasks are executed:
self._tasks = {TaskRegistry: 10}
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x7fb652da5fd0>
 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x7fb652da5fd0>
 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x7fb652da5fd0>
 'celery.group' = {group} <@task: celery.group of myTest at 0x7fb652da5fd0>
 'celery.map' = {xmap} <@task: celery.map of myTest at 0x7fb652da5fd0>
 'celery.chain' = {chain} <@task: celery.chain of myTest at 0x7fb652da5fd0>
 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x7fb652da5fd0>
 'celery.chord' = {chord} <@task: celery.chord of myTest at 0x7fb652da5fd0>
 'myTest.add' = {add} <@task: myTest.add of myTest at 0x7fb652da5fd0>
 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x7fb652da5fd0>
 __len__ = {int} 10
Diagram:
                 +------------------------------+
                 | _on_app_finalizers = set()   |
                 +--------------+---------------+
                                |
+------------+                  |
| builtins.py| +--------------> +   connect_on_app_finalize
+------------+                  |
+-------------+                 |
|User Function| +-------------> +   connect_on_app_finalize
+-------------+                 |
                                v
+---------------------------------------------------------------------------------------------------+
| _on_app_finalizers                                                                                 |
|                                                                                                    |
|  <function add_chunk_task>                                                                         |
|  <function add_backend_cleanup_task>                                                               |
|  <function add_starmap_task>                                                                       |
|  <function add_group_task>                                                                         |
|  <function add_map_task>                                                                           |
|  <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons>     |
|  <function add_accumulate_task>                                                                    |
|  <function add_chain_task>                                                                         |
|  <function add_unlock_chord_task>                                                                  |
|  <function add_chord_task>                                                                         |
+----------------------------+----------------------------------------------------------------------+
                             |
                             |  finalize
                             v
+---------------------------+    +--------------------------------------------------------------------------------------------+
|          Celery           |    | TaskRegistry                                                                               |
|                           |    |                                                                                            |
| _process_task             |    |  NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>                          |
|      ^                    |    |  'celery.chunks' = {chunks} <@task: celery.chunks of myTest>                               |
|      |                    |    |  'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest>    |
| process_task              |    |  'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest>             |
|                           |    |  'celery.group' = {group} <@task: celery.group of myTest>                                  |
| _tasks +---------------------> |  'celery.map' = {xmap} <@task: celery.map of myTest>                                       |
|                           |    |  'celery.chain' = {chain} <@task: celery.chain of myTest>                                  |
+---------------------------+    |  'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest>                           |
                                 |  'celery.chord' = {chord} <@task: celery.chord of myTest>                                  |
                                 |  'myTest.add' = {add} <@task: myTest.add of myTest>                                        |
                                 |  'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest>                   |
                                 +--------------------------------------------------------------------------------------------+
Alternatively, we can rearrange the diagram and look at it from another angle.
                 +------------------------------+
                 | _on_app_finalizers = set()   |
                 +--------------+---------------+
                                |
                                |  connect_on_app_finalize    +------------+
                                + <--------------------------+| builtins.py|
                                |                             +------------+
                                |  connect_on_app_finalize    +-------------+
                                + <--------------------------+|User Function|
                                |                             +-------------+
                                v
+---------------------------------------------------------------------------------------------------+
| _on_app_finalizers                                                                                 |
|                                                                                                    |
|  <function add_chunk_task>                                                                         |
|  <function add_backend_cleanup_task>                                                               |
|  <function add_starmap_task>                                                                       |
|  <function add_group_task>                                                                         |
|  <function add_map_task>                                                                           |
|  <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons>     |
|  <function add_accumulate_task>                                                                    |
|  <function add_chain_task>                                                                         |
|  <function add_unlock_chord_task>                                                                  |
|  <function add_chord_task>                                                                         |
+----------------------------+----------------------------------------------------------------------+
                             |
                             |  finalize
                             v
                 +-----------+-----------+
                 |        Celery         |
                 |        _tasks         |
                 +-----------+-----------+
                             |
                             v
+--------------------------------------------------------------------------------------------+
| TaskRegistry                                                                               |
|                                                                                            |
|  NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>                          |
|  'celery.chunks' = {chunks} <@task: celery.chunks of myTest>                               |
|  'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest>    |
|  'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest>             |
|  'celery.group' = {group} <@task: celery.group of myTest>                                  |
|  'celery.map' = {xmap} <@task: celery.map of myTest>                                       |
|  'celery.chain' = {chain} <@task: celery.chain of myTest>                                  |
|  'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest>                           |
|  'celery.chord' = {chord} <@task: celery.chord of myTest>                                  |
|  'myTest.add' = {add} <@task: myTest.add of myTest>                                        |
|  'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest>                   |
+--------------------------------------------------------------------------------------------+
The Task definition lives in celery/app/task.py.
From its member variables we can clearly see the rough functional categories:
basic information, e.g. name, app;
error-handling information, e.g. MaxRetriesExceededError, throws;
business control, e.g. rate_limit, time_limit;
task control, e.g. max_retries, acks_late, ignore_result.
The definition is as follows:
@abstract.CallableTask.register
class Task:

    __trace__ = None
    __v2_compat__ = False  # set by old base in celery.task.base

    MaxRetriesExceededError = MaxRetriesExceededError
    OperationalError = OperationalError

    Strategy = 'celery.worker.strategy:default'
    Request = 'celery.worker.request:Request'

    _app = None
    name = None
    typing = None
    max_retries = 3
    default_retry_delay = 3 * 60
    rate_limit = None
    ignore_result = None
    trail = True
    send_events = True
    store_errors_even_if_ignored = None
    serializer = None
    time_limit = None
    soft_time_limit = None
    backend = None
    autoregister = True
    track_started = None
    acks_late = None
    acks_on_failure_or_timeout = None
    reject_on_worker_lost = None
    throws = ()
    expires = None
    priority = None
    resultrepr_maxsize = 1024
    request_stack = None
    _default_request = None
    abstract = True
    _exec_options = None
    __bound__ = False

    from_config = (
        ('serializer', 'task_serializer'),
        ('rate_limit', 'task_default_rate_limit'),
        ('priority', 'task_default_priority'),
        ('track_started', 'task_track_started'),
        ('acks_late', 'task_acks_late'),
        ('acks_on_failure_or_timeout', 'task_acks_on_failure_or_timeout'),
        ('reject_on_worker_lost', 'task_reject_on_worker_lost'),
        ('ignore_result', 'task_ignore_result'),
        ('store_errors_even_if_ignored', 'task_store_errors_even_if_ignored'),
    )

    _backend = None  # set by backend property.
Because tasks are invoked via the Consumer, we need to look at the task-related parts of the Consumer, i.e. how tasks are wired to the consumer so that the Consumer can actually invoke them.
When the Consumer starts, it also runs a series of steps.
parent.steps = {list: 8}
 0 = {Connection} <step: Connection>
 1 = {Events} <step: Events>
 2 = {Heart} <step: Heart>
 3 = {Mingle} <step: Mingle>
 4 = {Gossip} <step: Gossip>
 5 = {Tasks} <step: Tasks>
 6 = {Control} <step: Control>
 7 = {Evloop} <step: event loop>
 __len__ = {int} 8
The consumer starts the Tasks bootstep, which configures a callback for every task, for example:

'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>

The task callbacks are thus wired to the amqp.Consumer, and the message path is complete.
The code is in celery/worker/consumer/tasks.py:
class Tasks(bootsteps.StartStopStep):
    """Bootstep starting the task message consumer."""

    requires = (Mingle,)

    def __init__(self, c, **kwargs):
        c.task_consumer = c.qos = None
        super().__init__(c, **kwargs)

    def start(self, c):
        """Start task consumer."""
        c.update_strategies()  # configure the callback for every task

        # - RabbitMQ 3.3 completely redefines how basic_qos works..
        # This will detect if the new qos smenatics is in effect,
        # and if so make sure the 'apply_global' flag is set on qos updates.
        qos_global = not c.connection.qos_semantics_matches_spec

        # set initial prefetch count
        c.connection.default_channel.basic_qos(
            0, c.initial_prefetch_count, qos_global,
        )

        c.task_consumer = c.app.amqp.TaskConsumer(
            c.connection, on_decode_error=c.on_decode_error,
        )  # tasks are now wired to the amqp.Consumer

        def set_prefetch_count(prefetch_count):
            return c.task_consumer.qos(
                prefetch_count=prefetch_count,
                apply_global=qos_global,
            )
        c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)
Running tasks actually requires a scheduling strategy, which can be thought of as a form of load balancing. The strategies are:
SCHED_STRATEGY_FCFS = 1
SCHED_STRATEGY_FAIR = 4

SCHED_STRATEGIES = {
    None: SCHED_STRATEGY_FAIR,
    'default': SCHED_STRATEGY_FAIR,
    'fast': SCHED_STRATEGY_FCFS,
    'fcfs': SCHED_STRATEGY_FCFS,
    'fair': SCHED_STRATEGY_FAIR,
}
update_strategies configures the callback strategy and the callback method for every task, for example:

'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>

The call stack is:
update_strategies, consumer.py:523
start, tasks.py:26
start, bootsteps.py:116
start, consumer.py:311
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374
The code is in celery/worker/consumer/consumer.py:
def update_strategies(self):
    loader = self.app.loader                     # the app's loader
    for name, task in items(self.app.tasks):     # iterate over all tasks
        self.strategies[name] = task.start_strategy(self.app, self)  # key: task name; value: what start_strategy returns
        task.__trace__ = build_tracer(name, task, loader, self.hostname,
                                      app=self.app)  # the function that handles execution results
app.tasks looks like this; these are all the tasks currently registered with Celery:
self.app.tasks = {TaskRegistry: 10}
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x7fc5a36e8160>
 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x7fc5a36e8160>
 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x7fc5a36e8160>
 'celery.group' = {group} <@task: celery.group of myTest at 0x7fc5a36e8160>
 'celery.map' = {xmap} <@task: celery.map of myTest at 0x7fc5a36e8160>
 'celery.chain' = {chain} <@task: celery.chain of myTest at 0x7fc5a36e8160>
 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x7fc5a36e8160>
 'celery.chord' = {chord} <@task: celery.chord of myTest at 0x7fc5a36e8160>
 'myTest.add' = {add} <@task: myTest.add of myTest at 0x7fc5a36e8160>
 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x7fc5a36e8160>
 __len__ = {int} 10
We now look at the task.start_strategy function:
def start_strategy(self, app, consumer, **kwargs):
    return instantiate(self.Strategy, self, app, consumer, **kwargs)  # instantiate the strategy
The default value of self.Strategy is celery.worker.strategy:default:
def default(task, app, consumer,
            info=logger.info, error=logger.error, task_reserved=task_reserved,
            to_system_tz=timezone.to_system, bytes=bytes, buffer_t=buffer_t,
            proto1_to_proto2=proto1_to_proto2):
    """Default task execution strategy.

    Note:
        Strategies are here as an optimization, so sadly
        it's not very easy to override.
    """
    hostname = consumer.hostname                    # consumer-related info
    connection_errors = consumer.connection_errors  # error values
    _does_info = logger.isEnabledFor(logging.INFO)

    # task event related
    # (optimized to avoid calling request.send_event)
    eventer = consumer.event_dispatcher
    events = eventer and eventer.enabled
    send_event = eventer.send
    task_sends_events = events and task.send_events

    call_at = consumer.timer.call_at
    apply_eta_task = consumer.apply_eta_task
    rate_limits_enabled = not consumer.disable_rate_limits
    get_bucket = consumer.task_buckets.__getitem__
    handle = consumer.on_task_request
    limit_task = consumer._limit_task
    body_can_be_buffer = consumer.pool.body_can_be_buffer
    Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)  # build a request class

    revoked_tasks = consumer.controller.state.revoked

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        if body is None:
            body, headers, decoded, utc = (
                message.body, message.headers, False, True,
            )
            if not body_can_be_buffer:
                body = bytes(body) if isinstance(body, buffer_t) else body
        else:
            body, headers, decoded, utc = proto1_to_proto2(message, body)  # parse the received data

        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )  # instantiate the request
        if (req.expires or req.id in revoked_tasks) and req.revoked():
            return

        if task_sends_events:
            send_event(
                'task-received',
                uuid=req.id, name=req.name,
                args=req.argsrepr, kwargs=req.kwargsrepr,
                root_id=req.root_id, parent_id=req.parent_id,
                retries=req.request_dict.get('retries', 0),
                eta=req.eta and req.eta.isoformat(),
                expires=req.expires and req.expires.isoformat(),
            )  # send a task-received event if enabled

        if req.eta:  # ETA / countdown handling
            try:
                if req.utc:
                    eta = to_timestamp(to_system_tz(req.eta))
                else:
                    eta = to_timestamp(req.eta, timezone.local)
            except (OverflowError, ValueError) as exc:
                error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
                      req.eta, exc, req.info(safe=True), exc_info=True)
                req.reject(requeue=False)
            else:
                consumer.qos.increment_eventually()
                call_at(eta, apply_eta_task, (req,), priority=6)
        else:
            if rate_limits_enabled:  # rate limiting
                bucket = get_bucket(task.name)
                if bucket:
                    return limit_task(req, bucket, 1)
            task_reserved(req)
            if callbacks:
                [callback(req) for callback in callbacks]
            handle(req)  # handle the received request
    return task_message_handler
The handler used here is w.process_task, passed in when the consumer was initialized:
def _process_task(self, req):
    """Process task by sending it to the pool of workers."""
    req.execute_using_pool(self.pool)
After this, every task has its callback strategy, so when the pool is invoked it knows how to call the task: for each of our current tasks, once a task message is fetched from the broker, task_message_handler is called.
strategies = {dict: 10}
 'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>
 'celery.backend_cleanup' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878400>
 'celery.chord_unlock' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878598>
 'celery.group' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878840>
 'celery.map' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878ae8>
 'celery.chain' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878d90>
 'celery.starmap' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b0d0>
 'celery.chord' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b378>
 'myTest.add' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b620>
 'celery.accumulate' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b8c8>
 __len__ = {int} 10
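Conceptually, dispatch is then a plain dictionary lookup. A stripped-down sketch with an assumed message shape (the real on_task_received lives in consumer.py and deals with kombu messages):

def on_task_received(message, strategies):
    name = message['headers']['task']   # e.g. 'myTest.add'
    handler = strategies[name]          # the task_message_handler closure for that task
    handler(message)

strategies = {'myTest.add': lambda m: print('handling', m['headers']['task'])}
on_task_received({'headers': {'task': 'myTest.add'}}, strategies)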
Inside celery.worker.strategy:default, this line is worth a closer look:
Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)  # build a request class
In the strategy, the purpose of the following is to build a Request class from the task instance, tying together the broker message, the consumer, and the process pool.
Specifically, Request.execute_using_pool is where multiprocessing comes in, e.g. it is linked to the consumer's pool.
Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)
The task instance is:
myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]
The code that produces the Request class is:
def create_request_cls(base, task, pool, hostname, eventer,
                       ref=ref, revoked_tasks=revoked_tasks,
                       task_ready=task_ready, trace=trace_task_ret):
    default_time_limit = task.time_limit
    default_soft_time_limit = task.soft_time_limit
    apply_async = pool.apply_async
    acks_late = task.acks_late
    events = eventer and eventer.enabled

    class Request(base):

        def execute_using_pool(self, pool, **kwargs):
            task_id = self.task_id
            if (self.expires or task_id in revoked_tasks) and self.revoked():
                raise TaskRevokedError(task_id)

            time_limit, soft_time_limit = self.time_limits
            result = apply_async(
                trace,
                args=(self.type, task_id, self.request_dict, self.body,
                      self.content_type, self.content_encoding),
                accept_callback=self.on_accepted,
                timeout_callback=self.on_timeout,
                callback=self.on_success,
                error_callback=self.on_failure,
                soft_timeout=soft_time_limit or default_soft_time_limit,
                timeout=time_limit or default_time_limit,
                correlation_id=task_id,
            )
            # cannot create weakref to None
            # pylint: disable=attribute-defined-outside-init
            self._apply_result = maybe(ref, result)
            return result

        def on_success(self, failed__retval__runtime, **kwargs):
            failed, retval, runtime = failed__retval__runtime
            if failed:
                if isinstance(retval.exception, (
                        SystemExit, KeyboardInterrupt)):
                    raise retval.exception
                return self.on_failure(retval, return_ok=True)
            task_ready(self)

            if acks_late:
                self.acknowledge()

            if events:
                self.send_event(
                    'task-succeeded', result=retval, runtime=runtime,
                )

    return Request
The callback task_message_handler above contains req = Req(...); this brings us to how the pool is invoked, i.e. Request handling.
def task_message_handler(message, body, ack, reject, callbacks,
                         to_timestamp=to_timestamp):
    req = Req(
        message,
        on_ack=ack, on_reject=reject, app=app, hostname=hostname,
        eventer=eventer, task=task, connection_errors=connection_errors,
        body=body, headers=headers, decoded=decoded, utc=utc,
    )  # instantiate the request

    if req.eta:
        ...  # ETA-related handling
    else:
        task_reserved(req)
        if callbacks:
            [callback(req) for callback in callbacks]
        handle(req)  # handle the received request

return task_message_handler  # returned by the enclosing default()
Note: the handle in handle(req) is w.process_task, which was passed in when the consumer was initialized:
def _process_task(self, req):
    """Process task by sending it to the pool of workers."""
    req.execute_using_pool(self.pool)
So handle(req) in fact calls the Request's execute_using_pool method, which brings us to the process pool.
The code is:
class Request(base):

    def execute_using_pool(self, pool, **kwargs):
        task_id = self.task_id  # fetch the task id
        if (self.expires or task_id in revoked_tasks) and self.revoked():
            # check whether the task has expired or been revoked
            raise TaskRevokedError(task_id)

        time_limit, soft_time_limit = self.time_limits  # fetch the time limits
        result = apply_async(  # run the corresponding func in the pool and return the result
            trace,
            args=(self.type, task_id, self.request_dict, self.body,
                  self.content_type, self.content_encoding),
            accept_callback=self.on_accepted,
            timeout_callback=self.on_timeout,
            callback=self.on_success,
            error_callback=self.on_failure,
            soft_timeout=soft_time_limit or default_soft_time_limit,
            timeout=time_limit or default_time_limit,
            correlation_id=task_id,
        )
        # cannot create weakref to None
        # pylint: disable=attribute-defined-outside-init
        self._apply_result = maybe(ref, result)
        return result
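The shape of this call maps directly onto the standard library. A minimal sketch using multiprocessing.Pool in place of Celery's TaskPool (the trace stand-in is only illustrative):

from multiprocessing import Pool

def trace(task_name, task_id, args):
    # stand-in for the real trace function that runs the task body
    print(f'running {task_name}[{task_id}]')
    return sum(args)

if __name__ == '__main__':
    with Pool(2) as pool:
        result = pool.apply_async(
            trace, ('myTest.add', 'uuid-1', (2, 17)),
            callback=lambda r: print('on_success:', r),        # runs in the parent process
            error_callback=lambda e: print('on_failure:', e),
        )
        result.wait()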
Because there is a lot of information, it is split across three diagrams.
The strategy logic:
+-----------------------+              +---------------------------+
|        Celery         |              |         Consumer          |
|                       |              |                           |
|   consumer +-----------------------> |                           |      +---------------+
|                       |              |   task_consumer +------------->  | amqp.Consumer |
|   strategies          |              |                           |      +---------------+
|      +                |              +---------------------------+
|   _tasks              |
|      +                |
+-----------------------+
       |
       |  strategies
       v
+-----------------------------------------------------------------------------+
| strategies = {dict: 10}                                                     |
|  'celery.chunks' = function default.<locals>.task_message_handler           |
|  'celery.backend_cleanup' = function default.<locals>.task_message_handler  |
|  'celery.chord_unlock' = function default.<locals>.task_message_handler     |
|  'celery.group' = function default.<locals>.task_message_handler            |
|  'celery.map' = function default.<locals>.task_message_handler              |
|  'celery.chain' = function default.<locals>.task_message_handler            |
|  'celery.starmap' = function default.<locals>.task_message_handler          |
|  'celery.chord' = function default.<locals>.task_message_handler            |
|  'myTest.add' = function default.<locals>.task_message_handler              |
|  'celery.accumulate' = function default.<locals>.task_message_handler       |
+-----------------------------------------------------------------------------+
       |
       |  _tasks
       v
+--------------------------------------------------------------------------------------------+
| TaskRegistry                                                                               |
|  NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>                          |
|  'celery.chunks' = {chunks} <@task: celery.chunks of myTest>                               |
|  'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest>    |
|  'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest>             |
|  'celery.group' = {group} <@task: celery.group of myTest>                                  |
|  'celery.map' = {xmap} <@task: celery.map of myTest>                                       |
|  'celery.chain' = {chain} <@task: celery.chain of myTest>                                  |
|  'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest>                           |
|  'celery.chord' = {chord} <@task: celery.chord of myTest>                                  |
|  'myTest.add' = {add} <@task: myTest.add of myTest>                                        |
|  'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest>                   |
+--------------------------------------------------------------------------------------------+
The logic of the tasks registered in the Celery application:
                 +------------------------------+
                 | _on_app_finalizers = set()   |
                 +--------------+---------------+
                                |
+------------+                  |
| builtins.py| +--------------> +   connect_on_app_finalize
+------------+                  |
+-------------+                 |
|User Function| +-------------> +   connect_on_app_finalize
+-------------+                 |
                                v
+---------------------------------------------------------------------------------------------------+
| _on_app_finalizers                                                                                 |
|                                                                                                    |
|  <function add_chunk_task>                                                                         |
|  <function add_backend_cleanup_task>                                                               |
|  <function add_starmap_task>                                                                       |
|  <function add_group_task>                                                                         |
|  <function add_map_task>                                                                           |
|  <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons>     |
|  <function add_accumulate_task>                                                                    |
|  <function add_chain_task>                                                                         |
|  <function add_unlock_chord_task>                                                                  |
|  <function add_chord_task>                                                                         |
+----------------------------+----------------------------------------------------------------------+
                             |
                             |  finalize
                             v
+---------------------------+    +--------------------------------------------------------------------------------------------+
|          Celery           |    | TaskRegistry                                                                               |
|                           |    |                                                                                            |
| _process_task             |    |  NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>                          |
|      ^                    |    |  'celery.chunks' = {chunks} <@task: celery.chunks of myTest>                               |
|      |                    |    |  'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest>    |
| process_task              |    |  'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest>             |
|                           |    |  'celery.group' = {group} <@task: celery.group of myTest>                                  |
| _tasks +---------------------> |  'celery.map' = {xmap} <@task: celery.map of myTest>                                       |
|                           |    |  'celery.chain' = {chain} <@task: celery.chain of myTest>                                  |
| task_consumer +------+    |    |  'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest>                           |
+---------------------------+    |  'celery.chord' = {chord} <@task: celery.chord of myTest>                                  |
                       |         |  'myTest.add' = {add} <@task: myTest.add of myTest>                                        |
                       v         |  'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest>                   |
              +---------------+  +--------------------------------------------------------------------------------------------+
              | amqp.Consumer |
              +---------------+
After a message is fetched from the broker, the task-handling logic is:
     Consumer
        |
        |  message
        v
+-------------------+   strategy   +--------------------------------------+
| on_task_received  | +----------> | strategies                           |
+-------------------+              |                                      |
                                   | [myTest.add : task_message_handler]  |
                                   +------------------+-------------------+
                                                      |
        +---------------------------------------------+
        |  strategy
        v
+--------------------------+    Request [myTest.add]    +---------------------+
|   task_message_handler   | <------------------------+ | create_request_cls  |
+------------+-------------+                            +---------------------+
             |
             |  _process_task        req = {Request} myTest.add
             v
+---------------------+
|   WorkController    |
|                     |
|   pool +---------+  |
+---------------------+
                   |
                   |  apply_async
                   v
+----------------------+          +------------+
| {Request} myTest.add | +------> |  TaskPool  |
+----------------------+          +------------+
                                    myTest.add
This concludes the analysis of Celery startup. Next, we will walk through a complete example: the journey of a message from being sent to being consumed.