tornado配合celery及rabbitmq实现web request异步非阻塞

Tornado和Celery介绍

1.Tornado

Tornado是一个用python编写的一个强大的、可扩展的异步HTTP服务器,同时也是一个web开发框架。tornado是一个非阻塞式web服务器,其速度至关快。得利于其非阻塞的方式和对 epoll的运用,tornado每秒能够处理数以千计的链接,这意味着对于实时web服务来讲,tornado是一个理想的web框架。它在处理严峻的网络流量时表现得足够强健,但却在建立和编写时有着足够的轻量级,并可以被用在大量的应用和工具中。
进一步了解和学习tornado可移步: tornado官方文档

2.Celery

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,它是一个专一于实时处理的任务队列, 同时也支持任务调度。Celery 中有两个比较关键的概念:
  • Worker: worker 是一个独立的进程,它持续监视队列中是否有须要处理的任务;
  • Broker: broker 也被称为中间人或者协调者,broker 负责协调客户端和 worker 的沟通。客户端向 队列添加消息,broker 负责把消息派发给 worker。

3.RabbitMQ

RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者没法快速消费,那么须要一个中间层。保存这个数据。html

例如一个日志系统,很容易使用RabbitMQ简化工做量,一个Consumer能够进行消息的正常处理,另外一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一exchange便可,剩下的消息分发工做由RabbitMQ完成。python

通常状况下,一个工具库或者一个框架都是独立的,有本身的feature或者功能点,可能依赖其余的库,但毫不依赖于其余服务。可是celery是一个特例,若是celery没有broker这个服务,那就彻底不能用了。celery 支持多种 broker, 但主要以 RabbitMQ 和 Redis 为主,其余都是试验性的,虽然也可使用, 可是没有专门的维护者。官方推荐使用rabbitmq做为生产环境下的broker,redis虽然也在官方指名的broker之列,可是实际使用上有可能还会出现如下莫名其妙的问题。mysql

Celery的配置和使用方法详见:官方文档git

从Tornado的异步讲起

tornado的同步阻塞

用tornado进行web开发的过程当中(实际上用任何语言或者框架开发都会遇到),开发者可能会发现有时候tornado的响应会变慢,追根溯源会发现缘由之一就是由于该请求被其余请求阻塞了。这就有问题了啊!!!tornado不是标榜本身是异步Http Web Server吗?不是号称本身解决了C10K问题了吗?这是欺骗消费者啊!!!
可是,深刻了解tornado以后才发现,人家说的异步非阻塞是有条件的,只有按照它说的来,才能实现真正的异步非阻塞。。。
咱们先来看一个小例子:github

#!/bin/env python

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import torndb
import time

from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)

db = torndb.Connection('127.0.0.1:3306', 'user_db', 'username', 'passwd')

class MysqlHandler(tornado.web.RequestHandler):
    def get(self, flag):
        self.write(db.query('select * from table where flag=%s', flag))

class NowHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("i want you, right now!")

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[
            (r"/mysql_query/(\d+)", MysqlHandler), 
            (r"/i_want_you_now", NowHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

当咱们先请求/mysql_query接口时再请求/i_want_you_now接口,会发现原来能够马上返回的第二个请求却被一直阻塞到第一个接口执行完以后才返回。为何?由于大部分web框架都是使用的同步阻塞模型来处理请求的,tornado的默认模型也不例外。可是tornado但是一个异步http服务器啊,不会这么弱吧?并且不上场景下都有一些至关耗时的操做,这些操做就会阻塞其余一些普通的请求,应该怎么解决这个问题?web

相信不少使用过tornado的人会想到@tornado.web.asynchronous这个装饰器,可是这就是tornado官方鸡贼的地方了!!!装饰器 web.asynchronous 只能用在verb函数以前(即get/post/delete等),而且须要搭配tornado异步客户端使用,如httpclient.AsyncHTTPClient,或者,你须要异步执行的那个函数(操做)必须也是异步的。。。(我是怨念满满的粗体!!!),并且加上这个装饰器后,开发者必须在异步回调函数里显式调用 RequestHandler.finish 才会结束此次 HTTP 请求。(由于tornado默认在函数处理返回时会自动关闭客户端的链接)redis

什么意思呢?就是说,tornado:老子只给你提供异步的入口,你要是真想异步操做,要不你就使用我提供的一些异步客户端来搞,否则你就本身实现一个异步的操做。sql

以操做MongoDB为例,若是你的函数中含有调用mongo的调用(使用pymongo库),那么这时候你加asynchronous这个装饰器就没有任何效果了,由于你的mongo调用自己是同步的,若是想作成异步非阻塞的效果,须要使用mongo出品的另外一个python driver -- motor,这个driver支持异步操做mongo,这时候你再加asynchronous装饰器并操做mongo就能够实现异步非阻塞的效果了。数据库

异步非阻塞的实现

因此,若是要使用tornado的异步调用,第一,使用tornado内置的异步客户端如httpclient.AsyncHTTPClient等;第二,可参考内置异步客户端,借助tornado.ioloop.IOLoop封装一个本身的异步客户端,但开发成本并不小。服务器

然而,天无绝人之路,仍是有办法能够用较低的成本实现tornado的异步非阻塞的,那就是借助celery项目。前面说了,它是一个分布式的实时处理消息队列调度系统,tornado接到请求后,能够把全部的复杂业务逻辑处理、数据库操做以及IO等各类耗时的同步任务交给celery,由这个任务队列异步处理完后,再返回给tornado。这样只要保证tornado和celery的交互是异步的,那么整个服务是彻底异步的。至于如何保证tornado和celery之间的交互是异步的,能够借助tornado-celery这个适配器来实现。

celery配合rabbitmq的工做流程以下:

这里咱们来使用这几个组件重写前面的同步阻塞的例子:

#!/bin/env python

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient

import time
import tcelery, tasks
from tornado.options import define, options
tcelery.setup_nonblocking_producer()
define("port", default=8000, help="run on the given port", type=int)

class AsyncMysqlHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self, flag):
        res = yield tornado.gen.Task(tasks.query_mysql.apply_async, args=[flag])
        self.write(res.result)
        self.finish()

class NowHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("i want you, right now!")

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[
            (r"/mysql_query/(\d+)", AsyncMysqlHandler), 
            (r"/i_want_you_now", NowHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

这里有个新的tornado.gen.coroutine装饰器, coroutine是3.0以后新增的装饰器.之前的办法是用回调函数的方式进行异步调用,若是使用回调函数的方式,则代码以下:

#!/bin/env python

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import time
import tcelery, tasks
from tornado.options import define, options
tcelery.setup_nonblocking_producer()
define("port", default=8000, help="run on the given port", type=int)

class AsyncMysqlHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self, flag):
        tasks.query_mysql.apply_async(args=[flag], callback=self.on_result)

    def on_result(self, response):
        self.write(response.result)
        self.finish()

class NowHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("i want you, right now!")

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[
            (r"/mysql_query/(\d+)", AsyncMysqlHandler), 
            (r"/i_want_you_now", NowHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

使用callback的话始终以为会是的代码结构变得比较混乱,试想若是有大量异步回调,每个都写一个回调函数的话,势必致使项目代码结构变得不那么清晰和优雅,毕竟回调这种反人类的写法仍是不少人不喜欢的,但也看我的喜爱,不喜欢callback风格的可使用yield来进行异步调用。

tasks.py集中放置开发者须要异步执行的函数。

import time
import torndb
from celery import Celery

db = torndb.Connection('127.0.0.1:3306', 'user_db', 'username', 'passwd')
app = Celery("tasks", broker="amqp://guest:guest@localhost:5672")
app.conf.CELERY_RESULT_BACKEND = "amqp://guest:guest@localhost:5672"

@app.task(name='task.query_users')
def query_mysql(flag):
    return db.query('select * from table where flag=%s', flag)

if __name__ == "__main__":
    app.start()

而后启动celery worker监放任务队列(消费者会从任务队列中取走一个个的task并执行):

celery -A tasks worker --loglevel=info

自此,依靠这种架构,能够实现tornado处理请求的彻底异步调用。

问题及优化

1.队列过长问题

使用上述方案的异步非阻塞可能会依赖于celery的任务队列长度,若队列中的任务过多,则可能致使长时间等待,下降效率。
解决方案:

  • 启动多个celery worker监放任务队列,使用多进程并发消费任务队列,celery命令能够经过-concurrency参数来指定用来执行任务而prefork的worker进程,若是全部的worker都在执行任务,那么新添加的任务必需要等待有一个正在执行的任务完成后才能被执行,默认的concurrency数量是机器上CPU的数量。另外,celery是支持好几个并发模式的,有prefork,threading,协程(gevent,eventlet),prefork在celery的介绍是,默认是用了multiprocess来实现的;能够经过-p参数指定其余的并发模型,如gevent(需本身配置好gevent环境)。
  • 创建多个任务queue,把大量的任务分发到不一样的queue中,减轻单个queue时可能出现的任务数量过载。

2.水平扩展优化

前面说了celery是一个分布式系统,也就是说,基于celery的项目可无痛实现分布式扩展,前面写的tornado和celery配合的demo,也能够实现独立部署,即tornado server和celery server其实能够分开部署,即分布在不一样的服务器上,celery server部署本身的tasks.py任务,并启动celery worker监听,而后在tornado server上添加如下代码:

from celery import Celery
app = Celery(broker = "amqp://",)

并使用Celery的send_task函数调用任务:

app.send_task('function_name', args=[param1, param2, param3...])

便可实现tornado和celery的彻底解耦。

后续:

另外,了解到tornado.concurrent.futures(py3自带这个库,py2需单独安装)这个module能够实现自定义函数的异步化,目前尚未深刻了解这个东西,有时间去研究一下这个东西,有心得再分享一下这个module相关的知识。

相关文章
相关标签/搜索