Celery,Tornado,Supervisor构建和谐的分布式系统

Celery 分布式的任务队列

与rabbitmq消息队列的区别与联系:

  • rabbitmq 调度的是消息,而Celery调度的是任务.
  • Celery调度任务时,须要传递参数信息,传输载体能够选择rabbitmq.
  • 利用rabbitmq的持久化和ack特性,Celery能够保证任务的可靠性.

优势:

  • 轻松构建分布式的Service Provider。
  • 高可扩展性,增长worker也就是增长了队列的consumer。
  • 可靠性,利用消息队列的durable和ack,能够尽量下降消息丢失的几率,当worker崩溃后,未处理的消息会从新进入消费队列。
  • 用户友好,利用flower提供的管理工具能够轻松的管理worker。
    flower
  • 使用tornado-celery,结合tornado异步非阻塞结构,能够提升吞吐量,轻松建立分布式服务框架。
  • 学习成本低,可快速入门

快速入门

定义一个celery实例main.py:html

1
2
3
4
from celery import Celery
app = Celery('route_check', include=['check_worker_path'],
broker='amqp://user:password@rabbitmq_host:port//')
app.config_from_object('celeryconfig')

include指的是须要celery扫描是否有任务定义的模块路径。例如add_task 就是扫描add_task.py中的任务python

celery的配置文件能够从文件、模块中读取,这里是从模块中读取,celeryconfig.py为:git

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from multiprocessing import cpu_count

from celery import platforms
from kombu import Exchange, Queue

CELERYD_POOL_RESTARTS = False
CELERY_RESULT_BACKEND = 'redis://:password@redis_host:port/db'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('common_check', Exchange('route_check'), routing_key='common_check'),
Queue('route_check', Exchange('route_check'), routing_key='route_check', delivery_mode=2),
Queue('route_check_ignore_result', Exchange('route_check'), routing_key='route_check_ignore_result',
delivery_mode=2)
)
CELERY_ROUTES = {
'route_check_task.check_worker.common_check': {'queue': 'common_check'},
'route_check_task.check_worker.check': {'queue': 'route_check'},
'route_check_task.check_worker.check_ignore_result': {'queue': 'route_check_ignore_result'}
}
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
# CELERY_MESSAGE_COMPRESSION = 'gzip'
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_DISABLE_RATE_LIMITS = True
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
CELERYD_CONCURRENCY = cpu_count() / 2
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_PUBLISH_RETRY = True
CELERY_TASK_PUBLISH_RETRY_POLICY = {
'max_retries': 3,
'interval_start': 10,
'interval_step': 5,
'interval_max': 20
}
platforms.C_FORCE_ROOT = True

这里面是一些celery的配置参数github

在上面include的add_task.py定义以下:web

1
2
3
4
5
6
7
#encoding:utf8

from main import app

@app.task
def add(x,y):
return x+y

启动celery
celery -A main worker -l info -Ofairredis

  • -A 后面是包含celery定义的模块,咱们在main.py中定义了app = Celery...
    测试celery:
  • -l 日志打印的级别,这里是info
  • -Ofair 这个参数可让Celery更好的调度任务
1
2
3
4
5
6
7
8
9
10
# encoding:utf8
__author__ = 'brianyang'

import add_task

result = add_task.add.apply_async((1,2))
print type(result)
print result.ready()
print result.get()
print result.ready()

输出是json

1
2
3
4
<class 'celery.result.AsyncResult'>
False
3
True

当调用result.get()时,若是尚未返回结果,将会阻塞直到结果返回。这里须要注意的是,若是须要返回worker执行的结果,必须在以前的config中配置CELERY_RESULT_BACKEND这个参数,通常推荐使用Redis来保存执行结果,若是不关心worker执行结果,设置CELERY_IGNORE_RESULT=True就能够了,关闭缓存结果能够提升程序的执行速度。
在上面的测试程序中,若是修改成:浏览器

1
2
3
4
5
6
7
8
# encoding:utf8
__author__ = 'brianyang'

import add_task

result = add_task.add.(1,2)
print type(result)
print result

输出结果为:缓存

1
2
<type 'int'>
3

至关于直接本地调用了add方法,并无走Celery的调度。
经过flower的dashbord能够方便的监控任务的执行状况:
task list
task detail
还能够对worker进行重启,关闭之类的操做
taks_op
使用Celery将一个集中式的系统拆分为分布式的系统大概步骤就是:
服务器

  • 根据功能将耗时的模块拆分出来,经过注解的形式让Celery管理
  • 为拆分的模块设置独立的消息队列
  • 调用者导入须要的模块或方法,使用apply_async进行异步的调用并根据需求关注结果。
  • 根据性能须要能够添加机器或增长worker数量,方便弹性管理。

须要注意的是:

  • 尽可能为不一样的task分配不一样的queue,避免多个功能的请求堆积在同一个queue中。
  • celery -A main worker -l info -Ofair -Q add_queue启动Celery时,能够经过参数Q加queue_name来指定该worker只接受指定queue中的tasks.这样能够使不一样的worker各司其职。
  • CELERY_ACKS_LATE可让你的Celery更加可靠,只有当worker执行完任务后,才会告诉MQ,消息被消费。
  • CELERY_DISABLE_RATE_LIMITS Celery能够对任务消费的速率进行限制,若是你没有这个需求,就关闭掉它吧,有益于会加速你的程序。

tornado-celery

tornado应该是python中最有名的异步非阻塞模型的web框架,它使用的是单进程轮询的方式处理用户请求,经过epoll来关注文件状态的改变,只扫描文件状态符发生变化的FD(文件描述符)。
因为tornado是单进程轮询模型,那么就不适合在接口请求后进行长时间的耗时操做,而是应该接收到请求后,将请求交给背后的worker去干,干完活儿后在经过修改FD告诉tornado我干完了,结果拿走吧。很明显,Celery与tornado很般配,而tornado-celery是celery官方推荐的结合二者的一个模块。
整合二者很容易,首先须要安装:

  • tornado-celery
  • tornado-redis
    tornado代码以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# encoding:utf8
__author__ = 'brianyang'

import tcelery
import tornado.gen
import tornado.web

from main import app
import add_task

tcelery.setup_nonblocking_producer(celery_app=app)


class CheckHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
@tornado.gen.coroutine
def get(self):
x = int(self.get_argument('x', '0'))
y = int(self.get_argument('y', '0'))
response = yield tornado.gen.Task(add_task.add.apply_async, args=[x, y])
self.write({'results': response.result})
self.finish


application = tornado.web.Application([
(r"/add", CheckHandler),
])

if __name__ == "__main__":
application.listen(8889)
tornado.ioloop.IOLoop.instance().start()

在浏览器输入:http://127.0.0.1:8889/add?x=1&y=2
结果为:

经过tornado+Celery能够显著的提升系统的吞吐量。

Benchmark

使用Jmeter进行压测,60个进程不间断地的访问服务器:
接口单独访问响应时间通常在200~400ms

  • uwsgi + Flask方案:
    uwsgi关键配置:
    1
    2
    processes       = 10
    threads = 3

Flask负责接受并处理请求,压测结果:
qps是46,吞吐量大概是2700/min
uwsgi+Flask

  • tornado+Celery方案:
    Celery配置:
    CELERYD_CONCURRENCY = 10也就是10个worker(进程),压测结果:
    qps是139,吞吐量大概是8300/min
    tornado+Celery
    从吞吐量和接口相应时间各方面来看,使用tornado+Celery都能带来更好的性能。

Supervisor

  • 什么是supervisor
    supervisor俗称Linux后台进程管理器
  • 适合场景
    – 须要长期运行程序,除了nohup,咱们有更好的supervisor
    – 程序意外挂掉,须要重启,让supervisor来帮忙
    – 远程管理程序,不想登录服务器,来来来,supervisor提供了高大上(屁~)的操做界面.
    以前启动Celery命令是celery -A main worker -l info -Ofair -Q common_check,当你有10台机器的时候,每次更新代码后,都须要登录服务器,而后更新代码,最后再杀掉Celery进程重启,恶不恶心,简直恶心死了。
    让supervisor来,首先须要安装:
    pip install supervisor
    配置文件示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
[unix_http_server]
file=/tmp/supervisor.sock ; path to your socket file
chmod=0777
username=admin
password=admin

[inet_http_server]
port=0.0.0.0:2345
username=admin
password=admin

[supervisord]
logfile=/var/log/supervisord.log ; supervisord log file
logfile_maxbytes=50MB ; maximum size of logfile before rotation
logfile_backups=10 ; number of backed up logfiles
loglevel=info ; info, debug, warn, trace
pidfile=/var/run/supervisord.pid ; pidfile location
nodaemon=false ; run supervisord as a daemon
minfds=1024 ; number of startup file descriptors
minprocs=200 ; number of process descriptors
user=root ; default user
childlogdir=/var/log/ ; where child log files will live

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use unix:// schem for a unix sockets.
username=admin
password=admin
[program:celery]
command=celery -A main worker -l info -Ofair

directory=/home/q/celeryTest
user=root
numprocs=1
stdout_logfile=/var/log/worker.log
stderr_logfile=/var/log/worker.log
autostart=true
autorestart=true
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 10

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; Set Celery priority higher than default (999)
; so, if rabbitmq is supervised, it will start first.
priority=1000

示例文件很长,不要怕,只须要复制下来,改改就能够
比较关键的几个地方是:

1
2
3
4
[inet_http_server]
port=0.0.0.0:2345
username=admin
password=admin

这个可让你经过访问http://yourhost:2345 ,验证输入admin/admin的方式远程管理supervisor,效果以下:
remote supervisor
[program:flower]这里就是你要托管给supervisor的程序的一些配置,其中autorestart=true能够在程序崩溃时自动重启进程,不信你用kill试试看。
剩下的部分就是一些日志位置的设置,当前工做目录设置等,so esay~

supervisor优势:

  • 管理进程简单,不再用nohup & kill了。
  • 不再用担忧程序挂掉了
  • web管理很方便

缺点:

  • web管理虽然方便,可是每一个页面只能管理本机的supervisor,若是我有一百台机器,那就须要打开100个管理页面,很麻烦.

怎么办~

supervisor-easy闪亮登场

经过rpc调用获取配置中的每个supervisor程序的状态并进行管理,能够分组,分机器进行批量/单个的管理。方便的不要不要的。来两张截图:

    • 分组管理:
      group
    • 分机器管理:
      server经过简单的配置,能够方便的进行管理。
相关文章
相关标签/搜索