在1.9版本以后,Django实现了对Channels的支持,他所使用的是WebSocket通讯,解决了实时通讯的问题,并且在使用WebSocket进行通讯的同时依旧可以支持HTTP通讯。web
在此结构中必须有硬性要求,具体以下:django
新的目录以下: |-- channels_example | |--channels_example | |-- __init__.py | |-- settings.py | |-- urls.py | |-- wsgi.py | |-- routing.py #必须 | |-- consumer.py #必须 | |-- asgi.py | |-- manage.py
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'channels', ]
CHANNEL_LAYERS = { "default": { "BACKEND": "asgiref.inmemory.ChannelLayer", "ROUTING": "channels_example.routing.channel_routing", }, }
须要注意的是 ROUTING 参数,他是用来指定WebSocket表单的位置,当有WebSocket请求访问时,就会根据这个路径找到相应表单,调用相应的函数进行处理。
channels_example.routing 就是咱们刚才建好的routing,py文件,里面的channel_routing咱们下面会进行填充。json
from channels.routing import route import consumers channel_routing = [ route('websocket.connect', consumers.ws_connect), route('websocket.disconnect', consumers.ws_disconnect), # route('websocket.receive', consumers.ws_message), route('websocket.receive', consumers.ws_message_uuid), ]
from django.http import HttpResponse from channels.handler import AsgiHandler #message.reply_channel 一个客户端通道的对象 #message.reply_channel.send(chunk) 用来惟一返回这个客户端 #一个管道大概会持续30s def ws_connect(message): auth = True if not auth: reply = json.dumps({'error': error}) message.reply_channel.send({'text': reply, 'close': True}) else: reply = "{}" message.reply_channel.send({'text': reply}) print(">>> %s connected" % str(message)) def ws_disconnect(message): print("<<< %s disconnected" % str(message)) # with message_queue.mutex: # message_queue.queue.clear() while not message_queue.empty(): try: message_queue.get(False) except Empty: continue message_queue.task_done() def ws_message_uuid(message): task = Task.create(message) if task: message_queue.put(task)
Tornado在websocket模块中提供了一个WebSocketHandler类。这个类提供了和已链接的客户端通讯的WebSocket事件和方法的钩子。当一个新的WebSocket链接打开时,open方法被调用,而on_message和on_close方法分别在链接接收到新的消息和客户端关闭时被调用。跨域
此外,WebSocketHandler类还提供了write_message方法用于向客户端发送消息,close方法用于关闭链接。浏览器
class EchoHandler(tornado.websocket.WebSocketHandler): def open(self): self.write_message('connected!') def on_message(self, message): self.write_message(message)
正如你在咱们的EchoHandler实现中所看到的,open方法只是使用WebSocketHandler基类提供的write_message方法向客户端发送字符串"connected!"。每次处理程序从客户端接收到一个新的消息时调用on_message方法,咱们的实现中将客户端提供的消息原样返回给客户端。这就是所有!让咱们经过一个完整的例子看看实现这个协议是如何简单的吧。安全
当一个WebSocket链接创建后被调用。服务器
当客户端发送消息message过来时被调用,注意此方法必须被重写。websocket
当WebSocket链接关闭后被调用。网络
向客户端发送消息messagea,message能够是字符串或字典(字典会被转为json字符串)。若binary为False,则message以utf8编码发送;二进制模式(binary=True)时,可发送任何字节码。session
关闭WebSocket链接。
判断源origin,对于符合条件(返回判断结果为True)的请求源origin容许其链接,不然返回403。能够重写此方法来解决WebSocket的跨域请求(如始终return True)。
#coding=utf-8 import uuid import os from works.actions import work import hashlib import json import Queue from threading import Thread import numpy as np import cv2 import base64 import jwt import tornado.gen from handlers.base_handler import BaseWebSocket from config import MEDIA_ROOT import time message_queue = Queue.PriorityQueue() def work_loop(): while True: task = message_queue.get() iuuid = task.uuid offset_top = task.offset_top image_data = task.image_data channel = task.channel zoom = task.zoom rType = task.rType responseType = task.responseType print(">>> len: %d | current offset: %d" % (message_queue.qsize(), offset_top)) filename = str(uuid.uuid1()) + '.jpg' filepath = os.path.join(MEDIA_ROOT, filename) with open(filepath, 'wb') as f: f.write(image_data.decode("base64")) if zoom != 1.0: im = cv2.imread(filepath) if im is None: continue osize = im.shape[1], im.shape[0] size = int(im.shape[1] * zoom), int(im.shape[0] * zoom) im = cv2.resize(im, size) cv2.imwrite(filepath, im) try: reply = work(filepath, use_crop=False, result=rType,responseType=responseType) except Exception as e: print("!!!!!! %s -> %s caused error" % (iuuid, filename)) print(e) cmd = u"cp %s %s" % (filepath, os.path.join(MEDIA_ROOT, 'rb_' + filename)) os.system(cmd.encode('utf-8')) continue if responseType == 'url': # rtn_url = 'http://101.236.17.104:3389/upload/' + 'rb_' + filename rtn_url = 'http://192.168.0.254:8000/upload/' + 'rb_' + filename reply = {'url': rtn_url, 'uuid': iuuid} reply['uuid'] = iuuid channel.write_message({'text': json.dumps(reply)}) print '%s end time:' % channel, time.time() class BrowserWebSocket(BaseWebSocket): '''浏览器websocket服务器''' def open(self): '''新的WebSocket链接打开时被调用''' # message = {} # remote_ip = self.request.remote_ip # message['query_string']=self.get_argument('query_string') # message['remote_ip']=remote_ip # auth, error = verify_auth_token(message) auth = True error = 'error' if not auth: reply = json.dumps({'error': error}) self.write_message({'text': reply, 'close': True}) else: reply = "{}" self.write_message({'text': reply}) print(">>> %s connected" % self.request.remote_ip) def on_message(self, message): '''链接收到新消息时被调用''' print '%s start time:'%self,time.time() task = Task.create(message,self) if task: message_queue.put(task) @tornado.gen.coroutine def on_messages(self, message): '''链接收到新消息时被调用''' task = Task.create(message,self) if task: message_queue.put(task) def on_close(self): '''客户端关闭时被调用''' print("<<< %s disconnected" % str(self.request.remote_ip)) # with message_queue.mutex: # message_queue.queue.clear() while not message_queue.empty(): try: message_queue.get(False) except Queue.Empty: continue message_queue.task_done() def check_origin(self, origin): '''容许WebSocket的跨域请求''' return True class Task(object): def __init__(self, uuid, offset_top, image_data, channel, zoom, rType, responseType, *args): self.uuid = uuid self.offset_top = int(float(offset_top)) self.image_data = image_data self.channel = channel self.zoom = zoom self.rType = rType self.responseType = responseType @classmethod def create(clz, message,sel): # data = message.get('text') data = message try: params = json.loads(data[:150]) image_data = data[150:] image_data = image_data.replace(" ", "+") params['image_data'] = image_data params['channel'] = sel # add Type if params.get('responseType') is None: params['responseType'] = 'url' # request type if params.get('rType') is None: params['rType'] = 'rl' task = Task(**params) except ValueError as e: task = None print(">>>message data error!") print(e) return task def __cmp__(self, other): return cmp(self.offset_top, other.offset_top) def verify_auth_token(message): '''token 验证''' token = message.get('query_string') secret_key = 'aoiakai' try: payload = jwt.decode(token, secret_key, algorithms=['HS256']) if payload.get('ip') != message.get('remote_ip'): return False, 'ip mismatch' except jwt.ExpiredSignatureError as e: print(e) return False, 'token expired' except Exception as e: print(e) return False, 'enter correct token' return True, '' work_thread = Thread(target=work_loop) work_thread.daemon = True work_thread.start()