spring websocket集群问题的简单记录

前言

最近公司里遇到一个问题,在集群中一些websocket的消息丢失了。
产生问题的原理很简单,发送消息的服务和接收者链接的服务不是同一个服务。java

解决方案

用中间件(mq, redis etc.)来在服务之间进行通讯。web

不直接发送websocket消息,而是将消息放在mq或者redis的list中。
并在redis中维护链接信息,服务根据链接信息来判断本身是否须要处理消息,或者将消息发给接收者链接的服务。redis

代码示例

咱们的项目中使用的是Spring WebSocket,而且使用了STOMP协议,能够去官网查看文档。spring

代码示例只作维护链接信息的代码示例,其余部分就不放上来了。apache

维护链接信息的代码示例

想要在维护STOMP协议的链接信息,能够查看文档的这一部分Listening To ApplicationContext Events and Intercepting Messageswebsocket

这里的链接信息只要是可以标识出不一样的服务就OK。app

一下是监听了订阅事件的Listener的部分代码:socket

package cn.fjhdtp.websocket.interceptor;

import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

public class LoginInfoInterceptor extends HttpSessionHandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
            WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
            //握手前,往attributes中增长所需信息


        Object loginBean = ...;//获取登陆的用户信息(或其余信息)
        attributes.put(WebSocketConstant.WEBSOKET_LOGINBEAN,loginBean);

        return super.beforeHandshake(request, response, wsHandler, attributes);
    }
}
package cn.fjhdtp.listener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;

import java.util.Map;

@Component
public class SessionSubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> {

    @Autowired
    @Qualifier("serversideMessageTaskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    @Autowired
    private IMessageHandler messageHandler;

    @Override
    public void onApplicationEvent(SessionSubscribeEvent event) {
            //获取订阅的destination
          String destination = (String) event.getMessage().getHeaders().get("simpDestination");
            //获取登陆信息
            Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN);
            //TODO 向redis中增长链接信息
    }
}
package cn.fjhdtp.message.listener;

import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;


import java.util.Map;

@Component
public class SessionDisconnectEventListener implements ApplicationListener<SessionDisconnectEvent> {

    @Override
    public void onApplicationEvent(SessionDisconnectEvent event) {
        // stomp链接断开,清除链接信息
        //从attributes中获取登陆信息(或其余信息)
        Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN);

        //从redis中移除链接信息
    }
}

固然,有些状况下可能不会正常的触发断开链接的事件(在was下就不会有这个事件),所以还会须要HeartBeat。ide

相关文章
相关标签/搜索