聊聊分布式下的WebSocket解决方案

前言java


最近本身搭建了个项目,项目自己很简单,可是里面有使用WebSocket进行消息提醒的功能,大致状况是这样的。web

发布消息者在系统中发送消息,实时的把消息推送给对应的一个部门下的全部人。json

这里面若是是单机应用的状况时,咱们能够经过部门的id和用户的id组成一个惟一的key,与应用服务器创建WebSocket长链接,而后就能够接收到发布消息者发送的消息了。后端

可是真正把项目应用于生产环境中时,咱们是不可能就部署一个单机应用的,而是要部署一个集群。浏览器

因此我经过Nginx+两台Tomcat搭建了一个简单的负载均衡集群,做为测试使用服务器

可是问题出现了,咱们的客户端浏览器只会与一台服务器创建WebSocket长链接,因此发布消息者在发送消息时,就无法保证全部目标部门的人都能接收到消息(由于这些人链接的可能不是一个服务器)。websocket

本篇文章就是针对于这么一个问题展开讨论,提出一种解决方案,固然解决方案不止一种,那咱们开始吧。session

WebSocket单体应用介绍架构


在介绍分布式集群以前,咱们先来看一下王子的WebSocket代码实现,先来看java后端代码以下:app

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@ServerEndpoint("/webSocket/{key}")
public class WebSocket {

private static int onlineCount = 0;
/**
 * 存储链接的客户端
 */
private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>();
private Session session;
/**
 * 发送的目标科室code
 */
private String key;

@OnOpen
public void onOpen(@PathParam("key") String key, Session session) throws IOException {
    this.key = key;
    this.session = session;
    if (!clients.containsKey(key)) {
        addOnlineCount();
    }
    clients.put(key, this);
    Log.info(key+"已链接消息服务!");
}

@OnClose
public void onClose() throws IOException {
    clients.remove(key);
    subOnlineCount();
}

@OnMessage
public void onMessage(String message) throws IOException {
    if(message.equals("ping")){
        return ;
    }
    JSONObject jsonTo = JSON.parseObject(message);
    String mes = (String) jsonTo.get("message");
    if (!jsonTo.get("to").equals("All")){
        sendMessageTo(mes, jsonTo.get("to").toString());
    }else{
        sendMessageAll(mes);
    }
}

@OnError
public void onError(Session session, Throwable error) {
    error.printStackTrace();
}

private void sendMessageTo(String message, String To) throws IOException {
    for (WebSocket item : clients.values()) {
        if (item.key.contains(To) )
            item.session.getAsyncRemote().sendText(message);
    }
}

private void sendMessageAll(String message) throws IOException {
    for (WebSocket item : clients.values()) {
        item.session.getAsyncRemote().sendText(message);
    }
}

public static synchronized int getOnlineCount() {
    return onlineCount;
}

public static synchronized void addOnlineCount() {
    WebSocket.onlineCount++;
}

public static synchronized void subOnlineCount() {
    WebSocket.onlineCount--;
}

public static synchronized Map<String, WebSocket> getClients() {
    return clients;
}

}

示例代码中并无使用Spring,用的是原生的java web编写的,简单和你们介绍一下里面的方法。

onOpen:在客户端与WebSocket服务链接时触发方法执行

onClose:在客户端与WebSocket链接断开的时候触发执行

onMessage:在接收到客户端发送的消息时触发执行

onError:在发生错误时触发执行

能够看到,在onMessage方法中,咱们直接根据客户端发送的消息,进行消息的转发功能,这样在单体消息服务中是没有问题的。

再来看一下js代码

var host = document.location.host;

// 得到当前登陆科室
var deptCodes='${sessionScope.$UserContext.departmentID}';
deptCodes=deptCodes.replace(/[[|]|s]+/g, "");
var key = '${sessionScope.$UserContext.userID}'+deptCodes;
var lockReconnect = false;  //避免ws重复链接
var ws = null;          // 判断当前浏览器是否支持WebSocket
var wsUrl = 'ws://' + host + '/webSocket/'+ key;
createWebSocket(wsUrl);   //链接ws

function createWebSocket(url) {
    try{
        if('WebSocket' in window){
            ws = new WebSocket(url);
        }else if('MozWebSocket' in window){  
            ws = new MozWebSocket(url);
        }else{
              layer.alert("您的浏览器不支持websocket协议,建议使用新版谷歌、火狐等浏览器,请勿使用IE10如下浏览器,360浏览器请使用极速模式,不要使用兼容模式!"); 
        }
        initEventHandle();
    }catch(e){
        reconnect(url);
        console.log(e);
    }     
}

function initEventHandle() {
    ws.onclose = function () {
        reconnect(wsUrl);
        console.log("llws链接关闭!"+new Date().toUTCString());
    };
    ws.onerror = function () {
        reconnect(wsUrl);
        console.log("llws链接错误!");
    };
    ws.onopen = function () {
        heartCheck.reset().start();      //心跳检测重置
        console.log("llws链接成功!"+new Date().toUTCString());
    };
    ws.onmessage = function (event) {    //若是获取到消息,心跳检测重置
        heartCheck.reset().start();      //拿到任何消息都说明当前链接是正常的//接收到消息实际业务处理

        ...

};
}
// 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket链接,防止链接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function() {
    ws.close();
}  

function reconnect(url) {
    if(lockReconnect) return;
    lockReconnect = true;
    setTimeout(function () {     //没链接上会一直重连,设置延迟避免请求过多
        createWebSocket(url);
        lockReconnect = false;
    }, 2000);
}

//心跳检测
var heartCheck = {
    timeout: 300000,        //5分钟发一次心跳
    timeoutObj: null,
    serverTimeoutObj: null,
    reset: function(){
        clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        return this;
    },
    start: function(){
        var self = this;
        this.timeoutObj = setTimeout(function(){
            //这里发送一个心跳,后端收到后,返回一个心跳消息,
            //onmessage拿到返回的心跳就说明链接正常
            ws.send("ping");
            console.log("ping!")
            self.serverTimeoutObj = setTimeout(function(){//若是超过必定时间还没重置,说明后端主动断开了
                ws.close();     //若是onclose会执行reconnect,咱们执行ws.close()就好了.若是直接执行reconnect 会触发onclose致使重连两次
            }, self.timeout)
        }, this.timeout)
    }

  }

js部分使用的是原生H5编写的,若是为了更好的兼容浏览器,也可使用SockJS,有兴趣小伙伴们能够自行百度。

接下来咱们就手动的优化代码,实现WebSocket对分布式架构的支持。

解决方案的思考


如今咱们已经了解单体应用下的代码结构,也清楚了WebSocket在分布式环境下面临的问题,那么是时候思考一下如何可以解决这个问题了。

咱们先来看一看发生这个问题的根本缘由是什么。

简单思考一下就能明白,单体应用下只有一台服务器,全部的客户端链接的都是这一台消息服务器,因此当发布消息者发送消息时,全部的客户端其实已经所有与这台服务器创建了链接,直接群发消息就能够了。

换成分布式系统后,假如咱们有两台消息服务器,那么客户端经过Nginx负载均衡后,就会有一部分链接到其中一台服务器,另外一部分链接到另外一台服务器,因此发布消息者发送消息时,只会发送到其中的一台服务器上,而这台消息服务器就能够执行群发操做,但问题是,另外一台服务器并不知道这件事,也就没法发送消息了。

如今咱们知道了根本缘由是生产消息时,只有一台消息服务器可以感知到,因此咱们只要让另外一台消息服务器也能感知到就能够了,这样感知到以后,它就能够群发消息给链接到它上边的客户端了。

那么什么方法能够实现这种功能呢,王子很快想到了引入消息中间件,并使用它的发布订阅模式来通知全部消息服务器就能够了。

引入RabbitMQ解决分布式下的WebSocket问题


在消息中间件的选择上,王子选择了RabbitMQ,缘由是它的搭建比较简单,功能也很强大,并且咱们只是用到它群发消息的功能。

RabbitMQ有一个广播模式(fanout),咱们使用的就是这种模式。

首先咱们写一个RabbitMQ的链接类:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQUtil {

private static Connection connection;

/**
 * 与rabbitmq创建链接
 * @return
 */
public static Connection getConnection() {
    if (connection != null&&connection.isOpen()) {
        return connection;
    }

    ConnectionFactory factory = new ConnectionFactory();
    factory.setVirtualHost("/");
    factory.setHost("192.168.220.110"); // 用的是虚拟IP地址
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");

    try {
        connection = factory.newConnection();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }

    return connection;
}

}

这个类没什么说的,就是获取MQ链接的一个工厂类。

而后按照咱们的思路,就是每次服务器启动的时候,都会建立一个MQ的消费者监听MQ的消息,王子这里测试使用的是Servlet的监听器,以下:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

public class InitListener implements ServletContextListener {

@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
    WebSocket.init();
}

@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {

}

}

记得要在Web.xml中配置监听器信息

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
     version="4.0">
<listener>
    <listener-class>InitListener</listener-class>
</listener>

</web-app>

WebSocket中增长init方法,做为MQ消费者部分

public static void init() {

try {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        //交换机声明(参数为:交换机名称;交换机类型)
        channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT);
        //获取一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        //队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)
        channel.queueBind(queueName,"fanoutLogs","");


        //这里重写了DefaultConsumer的handleDelivery方法,由于发送的时候对消息进行了getByte(),在这里要从新组装成String
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String message = new String(body,"UTF-8");
                System.out.println(message);

            //这里可使用WebSocket经过消息内容发送消息给对应的客户端

}
        };

        //声明队列中被消费掉的消息(参数为:队列名称;消息是否自动确认;consumer主体)
        channel.basicConsume(queueName,true,consumer);
        //这里不能关闭链接,调用了消费方法后,消费者会一直链接着rabbitMQ等待消费
    } catch (IOException e) {
        e.printStackTrace();
    }
}

同时在接收到消息时,不是直接经过WebSocket发送消息给对应客户端,而是发送消息给MQ,这样若是消息服务器有多个,就都会从MQ中得到消息,以后经过获取的消息内容再使用WebSocket推送给对应的客户端就能够了。

WebSocket的onMessage方法增长内容以下:

try {

//尝试获取一个链接
        Connection connection = RabbitMQUtil.getConnection();
        //尝试建立一个channel
        Channel channel = connection.createChannel();
        //声明交换机(参数为:交换机名称; 交换机类型,广播模式)
        channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT);
        //消息发布(参数为:交换机名称; routingKey,忽略。在广播模式中,生产者声明交换机的名称和类型便可)
        channel.basicPublish("fanoutLogs","", null,msg.getBytes("UTF-8"));
        System.out.println("发布消息");
        channel.close();
    } catch (IOException |TimeoutException e) {
        e.printStackTrace();
    }

增长后删除掉原来的Websocket推送部分代码。

这样一整套的解决方案就完成了。

总结


到这里,咱们就解决了分布式下WebSocket的推送消息问题。

咱们主要是引入了RabbitMQ,经过RabbitMQ的发布订阅模式,让每一个消息服务器启动的时候都去订阅消息,而不管哪台消息服务器在发送消息的时候都会发送给MQ,这样每台消息服务器就都会感知到发送消息的事件,从而再经过Websocket发送给客户端。

大致流程就是这样,那么小伙伴们有没有想过,若是RabbitMQ挂掉了几分钟,以后重启了,消费者是否能够从新链接到RabbitMQ?是否还能正常接收消息呢?

生产环境下,这个问题是必须考虑的。

这里已经测试过,消费者是支持自动重连的,因此咱们能够放心的使用这套架构来解决此问题。

本文到这里就结束了,欢迎各位小伙伴点赞文章留言讨论,一块儿学习,一块儿进步。

相关文章
相关标签/搜索