最近公司里遇到一个问题,在集群中一些websocket的消息丢失了。
产生问题的原理很简单,发送消息的服务和接收者链接的服务不是同一个服务。java
用中间件(mq, redis etc.)来在服务之间进行通讯。web
不直接发送websocket消息,而是将消息放在mq或者redis的list中。
并在redis中维护链接信息,服务根据链接信息来判断本身是否须要处理消息,或者将消息发给接收者链接的服务。redis
咱们的项目中使用的是Spring WebSocket,而且使用了STOMP协议,能够去官网查看文档。spring
代码示例只作维护链接信息的代码示例,其余部分就不放上来了。apache
想要在维护STOMP协议的链接信息,能够查看文档的这一部分Listening To ApplicationContext Events and Intercepting Messageswebsocket
这里的链接信息只要是可以标识出不一样的服务就OK。app
一下是监听了订阅事件的Listener的部分代码:socket
package cn.fjhdtp.websocket.interceptor; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor; public class LoginInfoInterceptor extends HttpSessionHandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { //握手前,往attributes中增长所需信息 Object loginBean = ...;//获取登陆的用户信息(或其余信息) attributes.put(WebSocketConstant.WEBSOKET_LOGINBEAN,loginBean); return super.beforeHandshake(request, response, wsHandler, attributes); } }
package cn.fjhdtp.listener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationListener; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.web.socket.messaging.SessionSubscribeEvent; import java.util.Map; @Component public class SessionSubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> { @Autowired @Qualifier("serversideMessageTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Autowired private IMessageHandler messageHandler; @Override public void onApplicationEvent(SessionSubscribeEvent event) { //获取订阅的destination String destination = (String) event.getMessage().getHeaders().get("simpDestination"); //获取登陆信息 Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN); //TODO 向redis中增长链接信息 } }
package cn.fjhdtp.message.listener; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.web.socket.messaging.SessionDisconnectEvent; import java.util.Map; @Component public class SessionDisconnectEventListener implements ApplicationListener<SessionDisconnectEvent> { @Override public void onApplicationEvent(SessionDisconnectEvent event) { // stomp链接断开,清除链接信息 //从attributes中获取登陆信息(或其余信息) Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN); //从redis中移除链接信息 } }
固然,有些状况下可能不会正常的触发断开链接的事件(在was下就不会有这个事件),所以还会须要HeartBeat。ide