Django使用Channels实现WebSocket--下篇

但愿经过对这两篇文章的学习,可以对Channels有更加深刻的了解,使用起来驾轻就熟游刃有余html

经过上一篇《Django使用Channels实现WebSocket--上篇》的学习应该对Channels的各类概念有了清晰的认知,能够顺利的将Channels框架集成到本身的Django项目中实现WebSocket了,本篇文章将以一个Channels+Celery实现web端tailf功能的例子更加深刻的介绍Channels前端

先说下咱们要实现的目标:全部登陆的用户能够查看tailf日志页面,在页面上可以选择日志文件进行监听,多个页面终端同时监放任何日志都互不影响,页面同时提供终止监听的按钮可以终止前端的输出以及后台对日志文件的读取python

最终实现的结果见下图linux

接着咱们来看下具体的实现过程web

技术实现

全部代码均基于如下软件版本:数据库

  • python==3.6.3
  • django==2.2
  • channels==2.1.7
  • celery==4.3.0

celery4在windows下支持不完善,因此请在linux下运行测试django

日志数据定义

咱们只但愿用户可以查询固定的几个日志文件,就不是用数据库仅借助settings.py文件里写全局变量来实现数据存储json

在settings.py里添加一个叫TAILF的变量,类型为字典,key标识文件的编号,value标识文件的路径windows

TAILF = {
    1: '/ops/coffee/error.log',
    2: '/ops/coffee/access.log',
}

基础Web页面搭建

假设你已经建立好了一个叫tailf的app,并添加到了settings.py的INSTALLED_APPS中,app的目录结构大概以下后端

tailf
    - migrations
        - __init__.py
    - __init__.py
    - admin.py
    - apps.py
    - models.py
    - tests.py
    - views.py

依然先构建一个标准的Django页面,相关代码以下

url:

from django.urls import path
from django.contrib.auth.views import LoginView,LogoutView

from tailf.views import tailf

urlpatterns = [
    path('tailf', tailf, name='tailf-url'),

    path('login', LoginView.as_view(template_name='login.html'), name='login-url'),
    path('logout', LogoutView.as_view(template_name='login.html'), name='logout-url'),
]

由于咱们规定只有经过登陆的用户才能查看日志,因此引入Django自带的LoginView,logoutView帮助咱们快速构建Login,Logout功能

指定了登陆模板使用login.html,它就是一个标准的登陆页面,post传入username和password两个参数便可,不贴代码了

view:

from django.conf import settings
from django.shortcuts import render
from django.contrib.auth.decorators import login_required


# Create your views here.
@login_required(login_url='/login')
def tailf(request):
    logDict = settings.TAILF
    return render(request, 'tailf/index.html', {"logDict": logDict})

引入了login_required装饰器,来判断用户是否登陆,未登陆就给跳到/login登陆页面

logDict 去setting里取咱们定义好的TAILF字典赋值,并传递给前端

template:

{% extends "base.html" %}

{% block content %}
<div class="col-sm-8">
  <select class="form-control" id="file">
    <option value="">选择要监听的日志</option>
    {% for k,v in logDict.items %}
    <option value="{{ k }}">{{ v }}</option>
    {% endfor %}
  </select>
</div>
<div class="col-sm-2">
  <input class="btn btn-success btn-block" type="button" onclick="connect()" value="开始监听"/><br/>
</div>
<div class="col-sm-2">
  <input class="btn btn-warning btn-block" type="button" onclick="goclose()" value="终止监听"/><br/>
</div>
<div class="col-sm-12">
  <textarea class="form-control" id="chat-log" disabled rows="20"></textarea>
</div>
{% endblock %}

前端拿到TAILF后经过循环的方式填充到select选择框下,由于数据是字典格式,使用logDict.items的方式能够循环出字典的key和value

这样一个日志监听页面就完成了,但还没法实现日志的监听,继续往下

集成Channels实现WebSocket

日志监听功能主要的设计思路就是页面跟后端服务器创建websocket长链接,后端经过celery异步执行while循环不断的读取日志文件而后发送到websocket的channel里,实现页面上的实时显示

接着咱们来集成channels

  1. 先添加routing路由,直接修改webapp/routing.py
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

from django.urls import path, re_path
from chat.consumers import ChatConsumer
from tailf.consumers import TailfConsumer

application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(
        URLRouter([
            path('ws/chat/', ChatConsumer),
            re_path(r'^ws/tailf/(?P<id>\d+)/$', TailfConsumer),
        ])
    )
})

直接将路由信息写入到了URLRouter里,注意路由信息的外层多了一个list,区别于上一篇中介绍的写路由文件路径的方式

页面须要将监听的日志文件传递给后端,咱们使用routing正则P<id>\d+传文件ID给后端程序,后端程序拿到ID以后根据settings中指定的TAILF解析出日志路径

routing的写法跟Django中的url写法彻底一致,使用re_path匹配正则routing路由

  1. 添加consumer在tailf/consumers.py文件中
import json
from channels.generic.websocket import WebsocketConsumer
from tailf.tasks import tailf


class TailfConsumer(WebsocketConsumer):
    def connect(self):
        self.file_id = self.scope["url_route"]["kwargs"]["id"]

        self.result = tailf.delay(self.file_id, self.channel_name)

        print('connect:', self.channel_name, self.result.id)
        self.accept()

    def disconnect(self, close_code):
        # 停止执行中的Task
        self.result.revoke(terminate=True)
        print('disconnect:', self.file_id, self.channel_name)

    def send_message(self, event):
        self.send(text_data=json.dumps({
            "message": event["message"]
        }))

这里使用Channels的单通道模式,每个新链接都会启用一个新的channel,彼此互不影响,能够随意终止任何一个监听日志的请求

connect

咱们知道self.scope相似于Django中的request,记录了丰富的请求信息,经过self.scope["url_route"]["kwargs"]["id"]取出routing中正则匹配的日志ID

而后将idchannel_name传递给celery的任务函数tailf,tailf根据id取到日志文件的路径,而后循环文件,将新内容根据channel_name写入对应channel

disconnect

当websocket链接断开的时候咱们须要终止Celery的Task执行,以清除celery的资源占用

终止Celery任务使用到revoke指令,采用以下代码来实现

self.result.revoke(terminate=True)

注意self.result是一个result对象,而非id

参数terminate=True的意思是是否当即终止Task,为True时不管Task是否正在执行都当即终止,为False(默认)时须要等待Task运行结束以后才会终止,咱们使用了While循环不设置为True就永远不会终止了

终止Celery任务的另一种方法是:

from webapp.celery import app
app.control.revoke(result.id, terminate=True)

send_message

方便咱们经过Django的view或者Celery的task调用给channel发送消息,官方也比较推荐这种方式

使用Celery异步循环读取日志

上边已经集成了Channels实现了WebSocket,但connect函数中的celery任务tailf尚未实现,下边来实现它

关于Celery的详细内容能够看这篇文章:《Django配置Celery执行异步任务和定时任务》,本文就不介绍集成使用以及细节原理,只讲一下任务task

task实现代码以下:

from __future__ import absolute_import
from celery import shared_task

import time
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from django.conf import settings


@shared_task
def tailf(id, channel_name):
    channel_layer = get_channel_layer()
    filename = settings.TAILF[int(id)]

    try:
        with open(filename) as f:
            f.seek(0, 2)

            while True:
                line = f.readline()

                if line:
                    print(channel_name, line)
                    async_to_sync(channel_layer.send)(
                        channel_name,
                        {
                            "type": "send.message",
                            "message": "微信公众号【运维咖啡吧】原创 版权全部 " + str(line)
                        }
                    )
                else:
                    time.sleep(0.5)
    except Exception as e:
        print(e)

这里边主要涉及到Channels中另外一个很是重要的点:从Channels的外部发送消息给Channel

其实上篇文章中检查通道层是否可以正常工做的时候使用的方法就是从外部给Channel通道发消息的示例,本文的具体代码以下

async_to_sync(channel_layer.send)(
    channel_name,
    {
        "type": "send.message",
        "message": "微信公众号【运维咖啡吧】原创 版权全部 " + str(line)
    }
)

channel_name 对应于传递给这个任务的channel_name,发送消息给这个名字的channel

type 对应于咱们Channels的TailfConsumer类中的send_message方法,将方法中的_换成.便可

message 就是要发送给这个channel的具体信息

上边是发送给单Channel的状况,若是是须要发送到Group的话须要使用以下代码

async_to_sync(channel_layer.group_send)(
    group_name,
    {
        'type': 'chat.message',
        'message': '欢迎关注公众号【运维咖啡吧】'
    }
)

只须要将发送单channel的send改成group_sendchannel_name改成group_name便可

须要特别注意的是:使用了channel layer以后必定要经过async_to_sync来异步执行

页面添加WebSocket支持

后端功能都已经完成,咱们最后须要添加前端页面支持WebSocket

function connect() {
    if ( $('#file').val() ) {
      window.chatSocket = new WebSocket(
        'ws://' + window.location.host + '/ws/tailf/' + $('#file').val() + '/');

      chatSocket.onmessage = function(e) {
        var data = JSON.parse(e.data);
        var message = data['message'];
        document.querySelector('#chat-log').value += (message);
        // 跳转到页面底部
        $('#chat-log').scrollTop($('#chat-log')[0].scrollHeight);
      };

      chatSocket.onerror = function(e) {
        toastr.error('服务端链接异常!')
      };

      chatSocket.onclose = function(e) {
        toastr.error('websocket已关闭!')
      };
    } else {
      toastr.warning('请选择要监听的日志文件')
    }
  }

上一篇文章中有详细介绍过websocket的消息类型,这里很少介绍了

至此咱们一个日志监听页面完成了,包含了完整的监听功能,但还没法终止,接着看下面的内容

Web页面主动断开WebSocket

web页面上“终止监听”按钮的主要逻辑就是触发WebSocket的onclose方法,从而能够触发Channels后端consumer的disconnect方法,进而终止Celery的循环读取日志任务

前端页面经过.close()能够直接触发WebSocket关闭,固然你若是直接关掉页面的话也会触发WebSocket的onclose消息,因此不用担忧Celery任务没法结束的问题

function goclose() {
    console.log(window.chatSocket);

    window.chatSocket.close();
    window.chatSocket.onclose = function(e) {
      toastr.success('已终止日志监听!')
    };
  }

至此咱们包含完善功能的Tailf日志监听、终止页面就所有完成了

写在最后

两篇文章结束不知道你是否对Channels有了更深一步的了解,可以操刀上手将Channels用在本身的项目中,实现理想的功能。我的以为Channels的重点和难点在于对channel layer的理解和运用,真正的理解了并能熟练运用,相信你必定可以触类旁通完美实现更多需求。最后若是对本文的demo源码感兴趣能够关注微信公众号【运维咖啡吧】后台回复小二加我微信向我索取,必定有求必应


相关文章推荐阅读:

相关文章
相关标签/搜索