上一章,咱们讲到,用redis共享数据,以及用redis中的队列来实现一个简单的消息传递。其实在真实的过程当中,不该该用redis来传递,最好用专业的消息队列,咱们python中,用到最普遍的就是rabbitmq,虽然它是用erlang开发的,但真的很是好用,通过无数次验证。若是你们不会安装rabbitmq,请看我这篇文章,http://www.cnblogs.com/yueerwanwan0204/p/5319474.html 这篇文章讲解了怎么安装rabbitmq以及简单的使用它。html
咱们把上一章的图再稍微修改一下,前端
其实在真实的项目中,也这样,通常来讲,利用redis在不一样模块之间共享数据,利用rabbitmq来进行消息传递。咱们这个项目只作到从web到flask,再到rabbitmq,传递给tcpserver,再下放给具体的tcpclient客户端;其实还能够反向传递,即从tcp的client到tcp服务器,再到rabbitmq,到前端tcp或者前端http,可是这个前端tcp或者http要基于循环模式的,flask确定不行。咱们从下一章开始讲tornado,用tornado来接受,而且作一个websocket,就能够下放下去。python
好了,说了这么多,咱们来看一下代码,首先,tcpserver这块,咱们以前用redis的队列作消息队列,如今修改一下,修改的大概代码以下:react
import pika from pika.adapters import twisted_connection RABBITMQ_HOST = 'localhost' RABBITMQ_PORT = 5672 RABBITMQ_USERNAME = 'rabbitmq01' RABBITMQ_PASSWORD = 'rabbitmq01' class RabbitMQ(object): _connection = None _channel_receive_from_http = None @staticmethod @defer.inlineCallbacks def init_mq(ip_address, port): credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) parameters = pika.ConnectionParameters(credentials=credentials) cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters) RabbitMQ._connection = yield cc.connectTCP(ip_address, port) defer.returnValue(1) @staticmethod @defer.inlineCallbacks def set_channel_receive_from_back(user_factory): """ 设置rabbitmq消息接受队列的channel,而且作好循环任务 """ RabbitMQ._channel_receive_from_http = yield RabbitMQ._connection.channel() yield RabbitMQ._channel_receive_from_http.queue_declare(queue='front_tcp') queue_object, consumer_tag = yield RabbitMQ._channel_receive_from_http.basic_consume(queue='front_tcp', no_ack=True) l = task.LoopingCall(RabbitMQ.read_from_mq, queue_object, user_factory) l.start(0.5) defer.returnValue(1) @staticmethod @defer.inlineCallbacks def read_from_mq(queue_object, chat_factory): """ 读取接受到的消息队列消息,而且处理 """ ch, method, properties, body = yield queue_object.get() if body: log.msg('Accept data from http successful!') chat_factory.process_data_from_mq(body) defer.returnValue(1) defer.returnValue(0)
首先,你们要注意一下,因为twisted是异步的,因此不能采用原先阻塞的函数,链接或者接受或者发送消息,全部跟rabbitmq的链接,发送,接受,都要异步化,即都要返回defer对象。由于链接rabbitmq的本质,其实就是socket的网络行为,任何网络行为都有可能被阻塞,一旦阻塞,异步的效率会极其低下。(之后咱们写tornado也是这样,必定要返回future对象)。web
我看到网上还有不少博客,在接受rabbitmq的消息的时候,竟然开了另一个进程或者线程,有时候这么作,程序运行起来没问题,但涉及到异步的时候,仍是会影响效率。都已经用异步的代码了,就不该该大量使用多进程或者多线程。多进程或者多线程,会让cpu调度频繁切换,大量并发的时候,严重影响效率。redis
详细看上面的代码,简单的解释一下,json
init_mq就是初始化消息队列,先加入用户名,密码,返回一个相似与token的东西,而后用twisted客户端来链接rabbitmq,其实就是socket行为,返回一个connection。flask
set_channel_receive_from_back设置channel,其实就是定义一个管道,我从这个管道接受东西。接受并读取的过程其实就是写一个循环任务,这个循环任务每0.5秒执行一次,你也能够写小一点,0.1秒执行一次,具体的看你须要设置。api
read_from_mq就是真正的读取并处理的函数,我这边在read_from_mq中,加了一个参数,就是这个工厂对象,由于接受的时候,一个工厂,就产生一个接受函数。而后读取到消息之后,把消息传递到这个工厂对象的处理方法中,整个环节就完整了。服务器
RabbitMQ的3个方法全是静态方法,因此我没有生成RabbitMQ对象,直接使用这个类自己就能够了。因此在运行的时候,又加了以下代码。
cf = ChatFactory() task1 = task.LoopingCall(cf.check_users_online) task1.start(3, now=False) task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq) task_receive_data_from_mq.start(0.1, now=False) reactor.callLater(0.1, RabbitMQ.init_mq, RABBITMQ_HOST, RABBITMQ_PORT) reactor.callLater(0.5, RabbitMQ.set_channel_receive_from_back, cf) reactor.listenTCP(8124, cf) reactor.run()
看见我加的代码没有,一个init_mq,一个set_channel_receive_from_back。一个初始化消息队列,初始化好之后,再设置channel,而且开始接受消息。
整个tcpserver这块就算完成了,下面是整个tcpserver的代码
# coding:utf-8 from twisted.internet.protocol import Factory, Protocol from twisted.internet import reactor, task, defer, protocol import struct import json from twisted.python import log import sys import time import txredisapi as redis import pika from pika.adapters import twisted_connection log.startLogging(sys.stdout) REDIS_HOST = 'localhost' REDIS_PORT = 6380 REDIS_DB = 4 REDIS_PASSWORD = 'dahai123' RABBITMQ_HOST = 'localhost' RABBITMQ_PORT = 5672 RABBITMQ_USERNAME = 'rabbitmq01' RABBITMQ_PASSWORD = 'rabbitmq01' redis_store = redis.lazyConnectionPool(dbid=4, host='localhost', port=6380, password='dahai123') @defer.inlineCallbacks def check_token(phone_number, token): token_in_redis = yield redis_store.hget('user:%s' % phone_number, 'token') if token != token_in_redis: defer.returnValue(False) else: defer.returnValue(True) class RabbitMQ(object): _connection = None _channel_receive_from_http = None @staticmethod @defer.inlineCallbacks def init_mq(ip_address, port): credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) parameters = pika.ConnectionParameters(credentials=credentials) cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters) RabbitMQ._connection = yield cc.connectTCP(ip_address, port) defer.returnValue(1) @staticmethod @defer.inlineCallbacks def set_channel_receive_from_back(user_factory): """ 设置rabbitmq消息接受队列的channel,而且作好循环任务 """ RabbitMQ._channel_receive_from_http = yield RabbitMQ._connection.channel() yield RabbitMQ._channel_receive_from_http.queue_declare(queue='front_tcp') queue_object, consumer_tag = yield RabbitMQ._channel_receive_from_http.basic_consume(queue='front_tcp', no_ack=True) l = task.LoopingCall(RabbitMQ.read_from_mq, queue_object, user_factory) l.start(0.5) defer.returnValue(1) @staticmethod @defer.inlineCallbacks def read_from_mq(queue_object, chat_factory): """ 读取接受到的消息队列消息,而且处理 """ ch, method, properties, body = yield queue_object.get() if body: log.msg('Accept data from http successful!') chat_factory.process_data_from_mq(body) defer.returnValue(1) defer.returnValue(0) class Chat(Protocol): def __init__(self, factory): self.factory = factory self.phone_number = None self.state = "VERIFY" self.version = 0 self.last_heartbeat_time = 0 self.command_func_dict = { 1: self.handle_verify, 2: self.handle_single_chat, 3: self.handle_group_chat, 4: self.handle_broadcast_chat, 5: self.handle_heartbeat } self._data_buffer = bytes() def connectionMade(self): log.msg("New connection, the info is:", self.transport.getPeer()) def connectionLost(self, reason): log.msg("[%s]:断线" % self.phone_number.encode('utf-8')) if self.phone_number in self.factory.users: del self.factory.users[self.phone_number] def dataReceived(self, data): """ 接受到数据之后的操做 """ self._data_buffer += data while True: length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12]) if length > len(self._data_buffer): return content = self._data_buffer[12:length] if command_id not in [1, 2, 3, 4, 5]: return if self.state == "VERIFY" and command_id == 1: self.handle_verify(content) if self.state == "DATA": self.handle_data(command_id, content) self._data_buffer = self._data_buffer[length:] if len(self._data_buffer) < 12: return def handle_heartbeat(self, content): """ 处理心跳包 """ self.last_heartbeat_time = int(time.time()) @defer.inlineCallbacks def handle_verify(self, content): """ 验证函数 """ content = json.loads(content) phone_number = content.get('phone_number') token = content.get('token') result = yield check_token(phone_number, token) if not result: send_content = json.dumps({'code': 0}) self.send_content(send_content, 101, [phone_number]) length = 12 + len(send_content) version = self.version command_id = 101 header = [length, version, command_id] header_pack = struct.pack('!3I', *header) self.transport.write(header_pack + send_content) return if phone_number in self.factory.users: log.msg("电话号码<%s>存在老的链接." % phone_number.encode('utf-8')) self.factory.users[phone_number].connectionLost("") self.factory.users.pop(phone_number) log.msg("欢迎, %s!" % (phone_number.encode('utf-8'),)) self.phone_number = phone_number self.factory.users[phone_number] = self self.state = "DATA" send_content = json.dumps({'code': 1}) self.send_content(send_content, 101, [phone_number]) def handle_data(self, command_id, content): """ 根据command_id来分配函数 """ self.command_func_dict[command_id](content) def handle_single_chat(self, content): """ 单播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_to = content.get('chat_to') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_content(send_content, 102, [chat_to]) def handle_group_chat(self, content): """ 组播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_to = content.get('chat_to') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = chat_to self.send_content(send_content, 103, phone_numbers) def handle_broadcast_chat(self, content): """ 广播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = self.factory.users.keys() self.send_content(send_content, 104, phone_numbers) def send_content(self, send_content, command_id, phone_numbers): """ 发送函数 """ length = 12 + len(send_content) version = self.version command_id = command_id header = [length, version, command_id] header_pack = struct.pack('!3I', *header) for phone_number in phone_numbers: if phone_number in self.factory.users.keys(): self.factory.users[phone_number].transport.write(header_pack + send_content) else: log.msg("Phone_number:%s 不在线." % phone_number.encode('utf-8')) class ChatFactory(Factory): def __init__(self): self.users = {} def buildProtocol(self, addr): return Chat(self) def check_users_online(self): for key, value in self.users.items(): if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4: log.msg("[%s]没有检测到心跳包,主动切断" % key.encode('utf-8')) value.transport.abortConnection() @defer.inlineCallbacks def receive_from_mq(self): data = yield redis_store.rpop('front_tcp') if data: log.msg("接受到来自消息队列的消息:", data) self.process_data_from_mq(data) def process_data_from_mq(self, data): loads_data = json.loads(data) command_id = loads_data.get('command_id') phone_numbers = loads_data.get('chat_to') chat_from = loads_data.get('chat_from') chat_content = loads_data.get('chat_content') content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_content(content, command_id, phone_numbers) def send_content(self, send_content, command_id, phone_numbers): """ 发送函数 """ length = 12 + len(send_content) version = 1100 command_id = command_id header = [length, version, command_id] header_pack = struct.pack('!3I', *header) for phone_number in phone_numbers: if phone_number in self.users.keys(): self.users[phone_number].transport.write(header_pack + send_content) else: log.msg("Phone_number:%s 不在线." % phone_number.encode('utf-8')) cf = ChatFactory() task1 = task.LoopingCall(cf.check_users_online) task1.start(3, now=False) task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq) task_receive_data_from_mq.start(0.1, now=False) reactor.callLater(0.1, RabbitMQ.init_mq, RABBITMQ_HOST, RABBITMQ_PORT) reactor.callLater(0.5, RabbitMQ.set_channel_receive_from_back, cf) reactor.listenTCP(8124, cf) reactor.run()
下面是web方面的代码,web也是,以前用redis很简单的作,如今换到rabbitmq,因为这个例子很简单,因此我就在request过程当中初始化rabbitmq了,整个代码就很是简单了,就是一个发送函数而已。
# coding:utf-8 from flask import Flask, request, jsonify, g, render_template, redirect, url_for, session, current_app from app.model import User, db_session import json from . import web import pika RABBITMQ_HOST = 'localhost' RABBITMQ_PORT = 5672 RABBITMQ_USERNAME = 'rabbitmq01' RABBITMQ_PASSWORD = 'rabbitmq01' @web.teardown_request def handle_teardown_request(exception): db_session.remove() @web.route('/send-command', methods=['GET', 'POST']) def send_command(): if request.method == 'GET': users = User.query.all() return render_template('web/send-command.html', users=users) else: data = request.get_json() command_id = data.get('command_id') chat_from = '13764408552' chat_to = data.get('chat_to') chat_content = data.get('content') if not chat_to or not chat_content or not command_id: return jsonify({'code': 0, 'message': '信息不完整'}) send_data = json.dumps(dict(command_id=command_id, chat_from=chat_from, chat_to=chat_to, chat_content=chat_content)) # current_app.redis.lpush('front_tcp', send_data) credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials, port=RABBITMQ_PORT)) channel = connection.channel() channel.queue_declare(queue='front_tcp') channel.basic_publish(exchange='', routing_key='front_tcp', body=send_data) print "send json_data to front_tcp, the data is ", send_data connection.close() return jsonify({'code': 1, 'message': '发送成功'})
全部代码更换完成,看一下具体效果吧
web上先发送一个消息。
随便启动一个客户端,看看接受吧。
看见没有,整个过程就所有打通了。
总结:整个twisted就讲到这了,你们能够看到,twisted我也不是特别熟悉,因此我一共就用了5章把它讲完。从下一章开始,我开始讲tornado,利用tornado作tcpserver,tcpclient,websocket服务器,由于tornado的源码比较好读,因此我重点也会放在tornado上。最近我在看reactjs,届时我会用稍微好看一点的图形界面,来作websocket页面,tornado这个库真正作到small strong smart,我一直喜欢小而精的库。总之,我重点会放在tornado上,但愿你们到时候会喜欢。