Django+Vue实现WebSocket链接

Django+Vue实现WebSocket链接
近期有一需求:前端页面点击执行任务,实时显示后端执行状况,思考一波;发现WebSocket最适合作这件事。前端

效果
测试ping www.baidu.com效果vue

点击链接创建ws链接web

socket.gifredis

后端实现
所需软件包shell

后端主要借助DjangoChannels实现socket链接,官网文档连接django

这里想实现每一个链接进来加入组进行广播,因此还须要引入channels-redis。后端

pipwebsocket

channels2.2.0
channels-redis
2.4.0
引入session

settings.py多线程

INSTALLED_APPS = [
‘django.contrib.admin’,
‘django.contrib.auth’,
‘django.contrib.contenttypes’,
‘django.contrib.sessions’,
‘django.contrib.messages’,
‘django.contrib.staticfiles’,
‘rest_framework.authtoken’,
‘rest_framework’,

‘channels’,
]

Redis配置

REDIS_HOST = ENV_DICT.get(‘REDIS_HOST’, ‘127.0.0.1’)
REDIS_PORT = ENV_DICT.get(‘REDIS_PORT’, 6379)
CHANNEL_LAYERS = {
“default”: {
“BACKEND”: “channels_redis.core.RedisChannelLayer”,
“CONFIG”: {
“hosts”: [(REDIS_HOST, REDIS_PORT)],
},
},
}
代码

apps/consumers.py

新建一个消费处理

实现: 默认链接加入组,发送信息时的处理。

from channels.generic.websocket import WebsocketConsumer

class MyConsumer(WebsocketConsumer):
def connect(self):
“”"
每一个任务做为一个频道
默认进入对应任务执行频道
“”"
self.job_name = self.scope[‘url_route’][‘kwargs’][‘job_name’]
self.job_group_name = ‘job_%s’ % self.job_name
async_to_sync(self.channel_layer.group_add)(
self.job_group_name,
self.channel_name
)
self.accept()

def disconnect(self, close_code):
    async_to_sync(self.channel_layer.group_discard)(
        self.job_group_name,
        self.channel_name
    )

# job.message类型处理
def job_message(self, event):

    # 默认发送收到信息
    self.send(text_data=event["text"])

apps/routing.py

ws类型路由

实现:ws/job/<job_name>由MyConsumer去处理。

from . import consumers
from django.urls import path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.sessions import SessionMiddlewareStack

application = ProtocolTypeRouter({
‘websocket’: SessionMiddlewareStack(
URLRouter(
[
path(‘ws/job/str:job_name’, consumers.MyConsumer)
]
)
),
})
apps/views.py

在执行命令中获取webSocket消费通道,进行异步推送

使用异步推送async_to_sync是由于在链接的时候采用的异步链接,因此推送必须采用异步推送。
由于执行任务时间过长,启动触发运行时加入多线程,直接先返回ok,后端运行任务。
from subprocess import Popen,PIPE
import threading

def runPopen(job):
“”"
执行命令,返回popen
“”"
path = os.path
Path = path.abspath(path.join(BASE_DIR, path.pardir))
script_path = path.abspath(path.join(Path,‘run.sh’))
cmd = “sh %s %s” % (script_path, job)
return Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)

def runScript(job):
channel_layer = get_channel_layer()
group_name = “job_%s” % job

popen = runPopen(job)
while True:
    output = popen.stdout.readline()
    if output == '' and popen.poll() is not None:
        break

    if output:
        output_text = str(output.strip())
        async_to_sync(
            channel_layer.group_send
            )(
                group_name, 
                {"type": "job.message", "text": output_text}
            )
    else:
        err  = popen.stderr.readline()
        err_text = str(err.strip())
        async_to_sync(
            channel_layer.group_send
            )(
                group_name,
                {"type": "job.message", "text": err_text}
            )
        break

class StartJob(APIView):
def get(self, request, job=None):
run = threading.Thread(target=runScript, args=(job,))
run.start()
return HttpResponse(‘ok’)
apps/urls.py

get请求就能启动任务

urlpatterns = [

path(‘start_job/str:job’, StartJob.as_view())
]
前端实现
所需软件包

vue-native-websocket
代码实现

plugins/vueNativeWebsocket.js

import Vue from ‘vue’
import VueNativeSock from ‘…/utils/socket/Main.js’

export default function ({ store }) {
Vue.use(VueNativeSock, ‘http://localhost:8000/ws/job’, {connectManually: true,});
}
nuxt.config.js

配置文件引入, 这里我使用的是nuxt框架

plugins: [
{
src: ‘@/plugins/vueNativeWebsocket.js’,
***: false
},
],
封装socket

export default (connection_url, option) => {
// 事件
let event = [‘message’, ‘close’, ‘error’, ‘open’];

// 拷贝选项字典
let opts = Object.assign({}, option);

// 定义实例字典
let instance = {

  // socket实例
  socket: '',

  // 是否链接状态
  is_conncet: false,

  // 具体链接方法
  connect: function() {
    if(connection_url) {
      let scheme = window.location.protocol === 'https:' ? 'wss' : 'ws'
      connection_url = scheme + '://' + connection_url.split('://')[1];
      this.socket = new WebSocket(connection_url);
      this.initEvent();
    }else{
      console.log('wsurl為空');
    }
  },

  // 初始化事件
  initEvent: function() {
    for(let i = 0; i < event.length; i++){
      this.addListener(event[i]);
    }
  },

  // 判断事件
  addListener: function(event) {
    this.socket.addEventListener(event, (e) => {
      switch(event){
        case 'open':
          this.is_conncet = true;
          break;
        case 'close':
          this.is_conncet = false;
          break;
      }
      typeof opts[event] == 'function' && opts[event](e);
    });
  },

  // 发送方法,失败则回调
  send: function(data, closeCallback) {
    console.log('socket ---> ' + data)
    if(this.socket.readyState >= 2) {
      console.log('ws已经关闭');
      closeCallback && closeCallback();
    }else{
      this.socket.send(data);
    }
  }

};

// 调用链接方法
instance.connect();
return instance;

}
index.vue

具体代码

x2Str方法,由于后端返回的是bytes,格式b’xxx’,编写了方法对其进行转换。

<el-button type="primary" @click="runFunction" >执行</el-button>
            <el-button type="primary"  @click="connectWebSock" >显示</el-button>

<div class="socketView">
  <span v-for="i in socketMessage" :key="i">{{i}}</span>
</div>

} 至此,实现先后端websocket通信。