书接上文,咱们开始对咱们的小小聊天室进行集群化改造。html
上文地址:前端
[WebSocket入门]手把手搭建WebSocket多人在线聊天室(SpringBoot+WebSocket)java
本文内容摘要:git
本文源码:(妈妈不再用担忧我没法复现文章代码啦)github
https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/集群版web
若是您以为这个教程对您有用,请关注个人技术公众号:Rude3Knife,不定时更新技术点滴。redis
分布式就是为了解决单点故障问题,想象一下,若是一个服务器承载了1000个大佬同时聊天,服务器忽然挂了,1000个大佬瞬间所有掉线,大概明天你就被大佬们吊起来打了。算法
当聊天室改成集群后,就算服务器A挂了,服务器B上聊天的大佬们还能够愉快的聊天,而且在前端还能经过代码,让链接A的大佬们快速重连至存活的服务器B,继续和你们愉快的聊天,岂不美哉!spring
总结一下:实现了分布式WebSocket后,咱们能够将流量负载均衡到不一样的服务器上并提供一种通讯机制让各个服务器能进行消息同步(否则用户A连上服务器A,用户B脸上服务器B,它们发消息的时候对方都无法收到)。json
当咱们要实现分布式的时候,咱们则须要在各个机器上共享这些信息,因此咱们须要一个Publish/Subscribe的中间件。咱们如今使用Redis做为咱们的解决方案。
假设咱们的聊天室集群有服务器A和B,用户Alice链接在A上,Bob链接在B上、
Alice向聊天室的服务器A发送消息,A服务器必需要将收到的消息转发到Redis,才能保证聊天室集群的全部服务器(也就是A和B)可以拿到消息。不然,只有Alice在的服务器A可以读到消息,用户Bob在的服务器B并不能收到消息,A和B也就没法聊天了。
说完了发送消息,那么如何保证Alice发的消息,其余全部人都能收到呢,前面咱们知道了Alice发送的消息已经被传到了Redis的频道,那么全部服务器都必须订阅这个Redis频道,而后把这个频道的消息转发到本身的用户那里,这样本身服务器所管辖的用户就能收到消息。
上期咱们搭建了个websocket聊天室demo,而且使用了STOMP协议,可是我并无介绍到底什么是STOMP协议,同窗们会有疑惑,这里对于STOMP有很好地总结:
当直接使用WebSocket时(或SockJS)就很相似于使用TCP套接字来编写Web应用。由于没有高层级的线路协议(wire protocol),所以就须要咱们定义应用之间所发送消息的语义,还须要确保链接的两端都能遵循这些语义。
就像HTTP在TCP套接字之上添加了请求-响应模型层同样,STOMP在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。
与HTTP请求和响应相似,STOMP帧由命令、一个或多个头信息以及负载所组成。例如,以下就是发送数据的一个STOMP帧:
>>> SEND transaction:tx-0 destination:/app/marco content-length:20 {"message":"Marco!"}
好了,介绍完了概念,让咱们开始动手改造!
若是你不熟悉Redis的sub/pub(订阅/发布)功能,请看这里进行简单了解它的用法,很简单:
https://redisbook.readthedocs.io/en/latest/feature/pubsub.html
在咱们上篇文章的Demo基础上,咱们进行集群改造。上一篇文章的源码见下方:
https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/单机版
<!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
固然首先要确保你安装了Redis,windows下安装redis比较麻烦,你能够搜索redis-for-windows下载安装。
# redis 链接配置 spring.redis.database=0 spring.redis.host=127.0.0.1 spring.redis.password= spring.redis.port=6379 spring.redis.ssl=false # 空闲链接最大数 spring.redis.jedis.pool.max-idle=10 # 获取链接最大等待时间(s) spring.redis.jedis.pool.max-wait=60000
# Redis定义 redis.channel.msgToAll = websocket.msgToAll
package cn.monitor4all.springbootwebsocketdemo.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.stereotype.Component; import java.net.Inet4Address; import java.net.InetAddress; /** * Redis订阅频道属性类 * @author yangzhendong01 */ @Component public class RedisListenerBean { private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerBean.class); @Value("${server.port}") private String serverPort; @Value("${redis.channel.msgToAll}") private String msgToAll; /** * redis消息监听器容器 * 能够添加多个监听不一样话题的redis监听器,只须要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 经过反射技术调用消息订阅处理器的相关方法进行一些业务处理 * @param connectionFactory * @param listenerAdapter * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 监听msgToAll container.addMessageListener(listenerAdapter, new PatternTopic(msgToAll)); LOGGER.info("Subscribed Redis channel: " + msgToAll); return container; } }
能够看到,咱们在代码里监听了redis频道msgToAll,这个是在application.properties定义的,固然若是你懒得定义,这里能够写死。
咱们单机聊天室的发送消息Controller是这样的:
@MessageMapping("/chat.sendMessage") @SendTo("/topic/public") public ChatMessage sendMessage(@Payload ChatMessage chatMessage) { return chatMessage;
咱们前端发给咱们消息后,直接给/topic/public转发这个消息,让其余用户收到。
在集群中,咱们须要把消息转发给Redis,而且不转发给前端,而是让服务端监听Redis消息,在进行消息发送。
将Controller改成:
@Value("${redis.channel.msgToAll}") private String msgToAll; @Autowired private RedisTemplate<String, String> redisTemplate; @MessageMapping("/chat.sendMessage") public void sendMessage(@Payload ChatMessage chatMessage) { try { redisTemplate.convertAndSend(msgToAll, JsonUtil.parseObjToJson(chatMessage)); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } }
你会发现咱们在代码中使用了JsonUtil将实体类ChatMessage转为了Json发送给了Redis,这个Json工具类须要使用到FaskJson依赖:
<!-- json --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58</version> </dependency>
package cn.monitor4all.springbootwebsocketdemo.util; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * JSON 转换 */ public final class JsonUtil { private static final Logger LOGGER = LoggerFactory.getLogger(JsonUtil.class); /** * 把Java对象转换成json字符串 * * @param object 待转化为JSON字符串的Java对象 * @return json 串 or null */ public static String parseObjToJson(Object object) { String string = null; try { string = JSONObject.toJSONString(object); } catch (Exception e) { LOGGER.error(e.getMessage()); } return string; } /** * 将Json字符串信息转换成对应的Java对象 * * @param json json字符串对象 * @param c 对应的类型 */ public static <T> T parseJsonToObj(String json, Class<T> c) { try { JSONObject jsonObject = JSON.parseObject(json); return JSON.toJavaObject(jsonObject, c); } catch (Exception e) { LOGGER.error(e.getMessage()); } return null; } }
这样,咱们接收到用户发送消息的请求时,就将消息转发给了redis的频道websocket.msgToAll
单机的聊天室,咱们接收消息是经过Controller直接把消息转发到全部人的频道上,这样就能在全部人的聊天框显示。
在集群中,咱们须要服务器把消息从Redis中拿出来,而且推送到本身管的用户那边,咱们在Service层实现消息的推送。
咱们在service实现发送,须要使用上述第二种方法。
新建类service/ChatService:
package cn.monitor4all.springbootwebsocketdemo.service; import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.simp.SimpMessageSendingOperations; import org.springframework.stereotype.Service; @Service public class ChatService { private static final Logger LOGGER = LoggerFactory.getLogger(ChatService.class); @Autowired private SimpMessageSendingOperations simpMessageSendingOperations; public void sendMsg(@Payload ChatMessage chatMessage) { LOGGER.info("Send msg by simpMessageSendingOperations:" + chatMessage.toString()); simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage); } }
咱们在哪里调用这个service呢,咱们须要在监听到消息后调用,因此咱们就要有下面的Redis监听消息处理专用类
新建类redis/RedisListenerHandle:
package cn.monitor4all.springbootwebsocketdemo.redis; import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage; import cn.monitor4all.springbootwebsocketdemo.service.ChatService; import cn.monitor4all.springbootwebsocketdemo.util.JsonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.stereotype.Component; /** * Redis订阅频道处理类 * @author yangzhendong01 */ @Component public class RedisListenerHandle extends MessageListenerAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerHandle.class); @Value("${redis.channel.msgToAll}") private String msgToAll; @Value("${server.port}") private String serverPort; @Autowired private RedisTemplate<String, String> redisTemplate; @Autowired private ChatService chatService; /** * 收到监听消息 * @param message * @param bytes */ @Override public void onMessage(Message message, byte[] bytes) { byte[] body = message.getBody(); byte[] channel = message.getChannel(); String rawMsg; String topic; try { rawMsg = redisTemplate.getStringSerializer().deserialize(body); topic = redisTemplate.getStringSerializer().deserialize(channel); LOGGER.info("Received raw message from topic:" + topic + ", raw message content:" + rawMsg); } catch (Exception e) { LOGGER.error(e.getMessage(), e); return; } if (msgToAll.equals(topic)) { LOGGER.info("Send message to all users:" + rawMsg); ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class); // 发送消息给全部在线Cid chatService.sendMsg(chatMessage); } else { LOGGER.warn("No further operation with this topic!"); } } }
这样,咱们的改造就基本完成了!咱们看一下效果
咱们将服务器运行在8080上,而后打开localhost:8080,起名Alice进入聊天室
随后,咱们在application.properties中将端口server.port=8081
再次运行程序(别忘了开启IDEA的“容许启动多个并行服务”设置,否则会覆盖掉你的8080服务,以下图),在8081启动一个聊天室,起名Bob进入聊天室。
以下两图,咱们已经能够在不一样端口的两个聊天室,互相聊天了!(注意看url)
在互相发送消息是,咱们还可使用命令行监听下Redis的频道websocket.msgToAll,能够看到双方传送的消息。以下图:
咱们还能够打开Chrome的F12控制台,查看前端的控制台发送消息的log,以下图:
大功告成了吗?
功能实现了,可是并不完美!你会发现,Bob的加入并无提醒Bob进入了聊天室(在单机版是有的),这是由于咱们在“加入聊天室”的代码尚未修改,在加入时,只有Bob的服务器B里的其余用户知道Bob加入了聊天室。咱们还能再进一步!
咱们须要弥补上面的不足,将用户上线下线的广播发送到全部服务器上。
此外,我还但愿之后可以查询集群中全部的在线用户,咱们在redis中添加一个set,来保存用户名,这样就能够随时获得在线用户的数量和名称。
# Redis定义 redis.channel.userStatus = websocket.userStatus redis.set.onlineUsers = websocket.onlineUsers
咱们增长两个定义
第一个是新增redis频道websocket.userStatus用来广播用户上下线消息
第二个是redis的set,用来保存在线用户信息
container.addMessageListener(listenerAdapter, new PatternTopic(userStatus));
public void alertUserStatus(@Payload ChatMessage chatMessage) { LOGGER.info("Alert user online by simpMessageSendingOperations:" + chatMessage.toString()); simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage); }
在service中咱们向本服务器的用户广播消息,用户上线或者下线的消息都经过这里传达。
@MessageMapping("/chat.addUser") public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) { LOGGER.info("User added in Chatroom:" + chatMessage.getSender()); try { headerAccessor.getSessionAttributes().put("username", chatMessage.getSender()); redisTemplate.opsForSet().add(onlineUsers, chatMessage.getSender()); redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage)); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } }
咱们修改了addUser方法,在这里往redis中广播用户上线的消息,并把用户名username写入redis的set中(websocket.onlineUsers)
@EventListener public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) { StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage()); String username = (String) headerAccessor.getSessionAttributes().get("username"); if(username != null) { LOGGER.info("User Disconnected : " + username); ChatMessage chatMessage = new ChatMessage(); chatMessage.setType(ChatMessage.MessageType.LEAVE); chatMessage.setSender(username); try { redisTemplate.opsForSet().remove(onlineUsers, username); redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage)); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } } }
在用户关闭网页时,websocket会调用该方法,咱们在这里须要把用户从redis的在线用户set里删除,而且向集群发送广播,说明该用户退出聊天室。
else if (userStatus.equals(topic)) { ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class); if (chatMessage != null) { chatService.alertUserStatus(chatMessage); }
在监听类中咱们接受了来自userStatus频道的消息,并调用service
此外,咱们还能够在Reids中查询到用户信息:
有了这两篇文章的基础, 咱们固然还能实现如下的功能:
感兴趣的同窗能够本身试试看。
深刻浅出Websocket(二)分布式Websocket集群
https://juejin.im/post/6844903584929153032
Spring消息之STOMP:
http://www.javashuo.com/article/p-mugrohor-nt.html
咱们在本文中把单机版的聊天室改成了分布式聊天室,大大提升了聊天室可用性。
本文工程源代码:
单机版:
https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/单机版
集群版:
https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/集群版
若是您以为这个教程对您有用,请关注个人技术公众号:Rude3Knife,不定时更新技术点滴。
我目前是一名后端开发工程师。主要关注后端开发,数据安全,爬虫,边缘计算等方向。
微信:yangzd1102(请注明来意)
Github:@qqxx6661
我的博客:
若是文章对你有帮助,不妨收藏起来并转发给您的朋友们~