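Mr. Tan's t-io community edition ships without clustering. (He won't let us call him "Old Tan" because he says he is still young... I'm not sure which part of him is "young", just kidding.) The single-node version is good enough most of the time, but people still keep asking about a cluster setup. For newcomers it had better not be too complicated, so here we use Redis publish/subscribe to make multiple nodes cooperate.
To keep things simple I won't refactor the code or its structure; I'll just modify Mr. Tan's websocket-showcase directly.
The idea is fairly easy to grasp and looks roughly like the diagram below.
Clients reach the server nodes through a proxy server. Every message a server node receives is published to a designated Redis channel, and every server node subscribes to that channel, so a client receives the message even when it is connected to a different server node than the sender.
For the simplest possible requirement, the nodes only need three things: to know who is online, to know who has left, and to get a sent message to its recipient. That is enough for a minimal cluster, so that is what I built.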
First, a preview of where the code changed (I'm a man of few words, so straight to the code):
The pojo package holds the wrapper beans for the user and the message body:
import java.io.Serializable;
import java.util.Set;

/**
 * User model.
 *
 * @author huanglin
 */
public class User implements Serializable {

    private String username;
    private Set<String> group;
    /**
     * Node the user is connected to; kept for later use, not used yet.
     */
    private String node;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public Set<String> getGroup() {
        return group;
    }

    public void setGroup(Set<String> group) {
        this.group = group;
    }

    public String getNode() {
        return node;
    }

    public void setNode(String node) {
        this.node = node;
    }
}
import java.io.Serializable;

/**
 * Message bean.
 *
 * @author : huanglin
 * @version : 1.0
 * @since : 2018/5/10 9:36 AM
 */
public class Msg implements Serializable {

    private int action;
    private String msg;
    private String from;
    private String to;
    private String status;

    public int getAction() {
        return action;
    }

    public void setAction(int action) {
        this.action = action;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}
The processor package abstracts three events: after the handshake (onAfterHandshaked), before the connection closes (onBeforeClose), and when a text message arrives (onText). Multi-node mode needs this abstraction. DefaultServerProcessor simply puts Mr. Tan's original code back unchanged; ServerProcessorOnPubSub handles the same three events through Redis publish/subscribe so that several nodes (a cluster of sorts) can exchange messages.
The abstract interface ServerProcessor:
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;

/**
 * Abstraction of the main operations.
 *
 * @author huanglin
 */
public interface ServerProcessor {

    /**
     * Notification before a connection is closed.
     */
    void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception;

    /**
     * Notification after a successful handshake.
     */
    void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception;

    /**
     * Notification when a text message is received.
     *
     * @return the content to send back to the client, usually null
     */
    Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception;
}
The default implementation DefaultServerProcessor:
import net.hlin.wss.server.Const;
import net.hlin.wss.server.ShowcaseServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.common.WsSessionContext;

import java.util.Objects;

/**
 * The logic of the original showcase, left unchanged.
 *
 * @author huanglin
 */
public class DefaultServerProcessor implements ServerProcessor {

    private static Logger log = LoggerFactory.getLogger(DefaultServerProcessor.class);

    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
        if (log.isInfoEnabled()) {
            log.info("onBeforeClose\r\n{}", channelContext);
        }
        WsSessionContext wsSessionContext = (WsSessionContext) channelContext.getAttribute();
        if (wsSessionContext.isHandshaked()) {
            int count = Aio.getAllChannelContexts(channelContext.getGroupContext()).getObj().size();
            // "<client> left, <count> users online now"
            String msg = channelContext.getClientNode().toString() + " 离开了,如今共有【" + count + "】人在线";
            // With tio-websocket, every Packet sent from server to client is a WsResponse
            WsResponse wsResponse = WsResponse.fromText(msg, ShowcaseServerConfig.CHARSET);
            // Broadcast to the group
            Aio.sendToGroup(channelContext.getGroupContext(), Const.GROUP_ID, wsResponse);
        }
    }

    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        // Bind to the group; broadcasts go to it later
        Aio.bindGroup(channelContext, Const.GROUP_ID);
        int count = Aio.getAllChannelContexts(channelContext.getGroupContext()).getObj().size();
        // "<client> joined, <count> users online now"
        String msg = channelContext.getClientNode().toString() + " 进来了,如今共有【" + count + "】人在线";
        // With tio-websocket, every Packet sent from server to client is a WsResponse
        WsResponse wsResponse = WsResponse.fromText(msg, ShowcaseServerConfig.CHARSET);
        // Broadcast to the group
        Aio.sendToGroup(channelContext.getGroupContext(), Const.GROUP_ID, wsResponse);
    }

    @Override
    public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
        WsSessionContext wsSessionContext = (WsSessionContext) channelContext.getAttribute();
        // The websocket handshake packet
        HttpRequest httpRequest = wsSessionContext.getHandshakeRequestPacket();
        if (log.isDebugEnabled()) {
            log.debug("握手包:{}", httpRequest);
        }
        log.info("收到ws消息:{}", text);
        // The literal below is the heartbeat text sent by the client; it is ignored
        if (Objects.equals("心跳内容", text)) {
            return null;
        }
        // "<client> says: <text>"
        String msg = channelContext.getClientNode().toString() + " 说:" + text;
        // With tio-websocket, every Packet sent from server to client is a WsResponse
        WsResponse wsResponse = WsResponse.fromText(msg, ShowcaseServerConfig.CHARSET);
        // Broadcast to the group
        Aio.sendToGroup(channelContext.getGroupContext(), Const.GROUP_ID, wsResponse);
        // The return value is what gets sent back to the client; usually null
        return null;
    }
}
The Redis publish/subscribe implementation:
import cn.hutool.core.io.resource.ResourceUtil;
import com.alibaba.fastjson.JSON;
import net.hlin.wss.server.Const;
import net.hlin.wss.server.pojo.Msg;
import net.hlin.wss.server.pojo.User;
import net.hlin.wss.server.util.MsgUtil;
import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;

import java.io.IOException;

public class ServerProcessorOnPubSub implements ServerProcessor {

    private static Logger log = LoggerFactory.getLogger(ServerProcessorOnPubSub.class);

    private RedissonClient client;
    private RTopic<Msg> topic;

    public ServerProcessorOnPubSub() {
        try {
            Config config = Config.fromJSON(ResourceUtil.getStream("classpath:redisson.json"));
            client = Redisson.create(config);
            topic = client.getTopic(Const.WS_MSG_TOPIC_CHANNEL);
            subcribeMsg();
        } catch (IOException e) {
            // Fail fast: without a Redis client every later call would hit a null field
            throw new IllegalStateException("Cannot load redisson.json", e);
        }
    }

    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        String username = channelContext.getUserid();
        // TODO e.g. look up the groups the current user belongs to
        // Set<String> groups = userService.getUserGroups(username);
        // for each group: Aio.bindGroup(channelContext, group);
        // Overwrite regardless of any earlier login; a real application would handle this properly
        User user = new User();
        // user.setGroup(groups);
        user.setUsername(username);
        user.setNode(channelContext.getServerNode().toString());
        RBucket<User> userRBucket = client.getBucket(Const.WS_USER_PREFIX + username);
        userRBucket.set(user);
        log.info("用户{}加入", username);
    }

    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
        String username = channelContext.getUserid();
        client.getBucket(Const.WS_USER_PREFIX + username).delete();
        log.info("用户{}离开", username);
    }

    @Override
    public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
        // Nothing is delivered directly; every client message goes through the topic
        Msg msg = JSON.parseObject(text, Msg.class);
        topic.publish(msg);
        return null;
    }

    private void subcribeMsg() {
        topic.addListener(new MessageListener<Msg>() {
            @Override
            public void onMessage(String channel, Msg msg) {
                int action = msg.getAction();
                Msg respMsg = new Msg();
                // A response message is simply passed on to the local client
                if (action % 11 == 0 && MsgUtil.existsUser(msg.getTo())) {
                    // Repackage before sending
                    respMsg.setMsg(msg.getMsg());
                    respMsg.setAction(msg.getAction());
                    respMsg.setStatus(msg.getStatus());
                    MsgUtil.sendToUser(msg.getTo(), respMsg);
                } else {
                    respMsg.setTo(msg.getFrom());
                    respMsg.setStatus("200");
                    if (action == Const.Action.P2P_MSG_REQ.val()) {
                        respMsg.setAction(Const.Action.P2P_MSG_RESP.val());
                        // Only the node that holds the recipient delivers and acknowledges
                        if (MsgUtil.existsUser(msg.getTo())) {
                            MsgUtil.sendToUser(msg.getTo(), msg);
                            topic.publish(respMsg);
                        }
                    } else if (action == Const.Action.GROUP_MSG_REQ.val()) {
                        MsgUtil.sendToGroup(msg.getTo(), msg);
                        respMsg.setAction(Const.Action.GROUP_MSG_RESP.val());
                        topic.publish(respMsg);
                    }
                }
            }
        });
    }
}
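The branching inside the listener is easier to follow when traced on two nodes. The sketch below mirrors that branching in memory, as far as I read it: `Bus`, `ClusterNode` and `Envelope` are hypothetical stand-ins for the Redisson topic, a server node and `Msg`, and actual group delivery is left out. It traces a point-to-point message from alice (on node A) to bob (on node B), then a group message.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

class AckFlowSketch {
    public static void main(String[] args) {
        Bus bus = new Bus();
        ClusterNode a = new ClusterNode(bus);
        ClusterNode b = new ClusterNode(bus);
        a.users.add("alice");
        b.users.add("bob");

        bus.publish(new Envelope(Envelope.P2P_REQ, "alice", "bob", "hi"));
        bus.publish(new Envelope(Envelope.GROUP_REQ, "alice", "room", "hello"));

        System.out.println("pushed by node B: " + b.outbox);
        System.out.println("pushed by node A: " + a.outbox);
    }
}

/** Simplified message: action code, sender, recipient, text. */
class Envelope {
    static final int P2P_REQ = 3, P2P_RESP = 33, GROUP_REQ = 4, GROUP_RESP = 44;
    final int action;
    final String from, to, text;

    Envelope(int action, String from, String to, String text) {
        this.action = action;
        this.from = from;
        this.to = to;
        this.text = text;
    }
}

/** In-memory stand-in for the Redis topic: every listener sees every message. */
class Bus {
    private final List<Consumer<Envelope>> listeners = new ArrayList<>();

    void subscribe(Consumer<Envelope> l) {
        listeners.add(l);
    }

    void publish(Envelope e) {
        for (Consumer<Envelope> l : listeners) {
            l.accept(e);
        }
    }
}

/** One node; onMessage follows the same branches as the listener in subcribeMsg(). */
class ClusterNode {
    final Set<String> users = new HashSet<>();     // locally connected users
    final List<String> outbox = new ArrayList<>(); // what this node pushed to its clients
    private final Bus bus;

    ClusterNode(Bus bus) {
        this.bus = bus;
        bus.subscribe(this::onMessage);
    }

    void onMessage(Envelope e) {
        if (e.action % 11 == 0 && users.contains(e.to)) {
            // A response whose recipient is local: hand it to the client
            outbox.add("ack " + e.action + " to " + e.to);
        } else if (e.action == Envelope.P2P_REQ) {
            // Only the node holding the recipient delivers and acknowledges
            if (users.contains(e.to)) {
                outbox.add("msg to " + e.to + ": " + e.text);
                bus.publish(new Envelope(Envelope.P2P_RESP, null, e.from, null));
            }
        } else if (e.action == Envelope.GROUP_REQ) {
            // Group delivery omitted here; note that every node acknowledges
            bus.publish(new Envelope(Envelope.GROUP_RESP, null, e.from, null));
        }
    }
}
```

One thing the trace suggests: because every node publishes its own response to a group request, the sender appears to receive one acknowledgement per node rather than one per message. That may be acceptable for a demo, but it is worth knowing before building on it.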
MsgUtil in the util package wraps part of Aio's functionality to keep the calling code short:
import cn.hutool.json.JSONUtil;
import net.hlin.wss.server.ShowcaseServerConfig;
import net.hlin.wss.server.pojo.Msg;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.utils.lock.SetWithLock;
import org.tio.websocket.common.WsResponse;

import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Chat utility class.
 *
 * @author : huanglin
 * @version : 1.0
 * @since : 2018/5/8 11:23 AM
 */
public class MsgUtil {

    /**
     * Whether the user has at least one connection on this node.
     */
    public static boolean existsUser(String userId) {
        SetWithLock<ChannelContext> set = Aio.getChannelContextsByUserid(ShowcaseServerConfig.groupContext, userId);
        return set != null && set.size() > 0;
    }

    /**
     * Send to a specific user (all of that user's connections on this node).
     */
    public static void sendToUser(String userId, Msg message) {
        SetWithLock<ChannelContext> toChannleContexts = Aio.getChannelContextsByUserid(ShowcaseServerConfig.groupContext, userId);
        if (toChannleContexts == null || toChannleContexts.size() < 1) {
            return;
        }
        ReentrantReadWriteLock.ReadLock readLock = toChannleContexts.getLock().readLock();
        readLock.lock();
        try {
            Set<ChannelContext> channels = toChannleContexts.getObj();
            for (ChannelContext channelContext : channels) {
                send(channelContext, message);
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Send to a group (all protocol endpoints in it).
     */
    public static void sendToGroup(String group, Msg msg) {
        if (msg == null) {
            return;
        }
        SetWithLock<ChannelContext> withLockChannels = Aio.getChannelContextsByGroup(ShowcaseServerConfig.groupContext, group);
        if (withLockChannels == null) {
            return;
        }
        ReentrantReadWriteLock.ReadLock readLock = withLockChannels.getLock().readLock();
        readLock.lock();
        try {
            Set<ChannelContext> channels = withLockChannels.getObj();
            if (channels != null && channels.size() > 0) {
                for (ChannelContext channelContext : channels) {
                    send(channelContext, msg);
                }
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Send to a specific channel; the message is serialized to JSON first.
     */
    public static void send(ChannelContext channelContext, Msg msg) {
        if (channelContext == null) {
            return;
        }
        WsResponse response = WsResponse.fromText(JSONUtil.toJsonStr(msg), ShowcaseServerConfig.CHARSET);
        Aio.sendToId(channelContext.getGroupContext(), channelContext.getId(), response);
    }
}
The remaining classes, the ones marked with red arrows in the preview, were modified from the originals.
The constants class Const:
/**
 * @author tanyaowu
 * @modify huanglin
 */
public class Const {

    /**
     * Group id used for group chat.
     */
    public static final String GROUP_ID = "showcase-websocket";

    public static final String WS_MSG_TOPIC_CHANNEL = "WS_MSG_TOPIC_CHANNEL";

    public static final String WS_USER_PREFIX = "WS_USER_PREFIX:";

    /**
     * Actions exchanged between client and server.
     */
    public enum Action {
        /**
         * Point-to-point message request.
         */
        P2P_MSG_REQ(3),
        /**
         * Point-to-point message response.
         */
        P2P_MSG_RESP(33),
        /**
         * Group message request.
         */
        GROUP_MSG_REQ(4),
        /**
         * Group message response.
         */
        GROUP_MSG_RESP(44);

        private int action;

        Action(int action) {
            this.action = action;
        }

        public int val() {
            return this.action;
        }
    }
}
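The action codes appear to follow a convention in which a response code is its request code times 11 (3 and 33, 4 and 44), which is what lets the subscriber detect responses with `action % 11 == 0`. A small sketch of that arithmetic follows; the class and method names are hypothetical, and the stricter check is my own suggestion rather than anything in the source.

```java
class ActionCodeSketch {
    static final int P2P_MSG_REQ = 3;
    static final int P2P_MSG_RESP = 33;
    static final int GROUP_MSG_REQ = 4;
    static final int GROUP_MSG_RESP = 44;

    /** Response code for a request code under the "times 11" convention. */
    static int responseFor(int requestAction) {
        return requestAction * 11;
    }

    /** The check as written in the subscriber: any multiple of 11 counts as a response. */
    static boolean looksLikeResponse(int action) {
        return action % 11 == 0;
    }

    /** Hypothetical stricter variant that rejects 0, the default value of an int field. */
    static boolean looksLikeResponseStrict(int action) {
        return action > 0 && action % 11 == 0;
    }

    public static void main(String[] args) {
        System.out.println(responseFor(P2P_MSG_REQ) == P2P_MSG_RESP);
        System.out.println(responseFor(GROUP_MSG_REQ) == GROUP_MSG_RESP);
        // A message whose JSON omits "action" deserializes to 0, and 0 % 11 == 0
        System.out.println(looksLikeResponse(0));
        System.out.println(looksLikeResponseStrict(0));
    }
}
```

Two consequences of the convention: a client message without an `action` field is treated as a response by the plain modulo check, and request codes themselves must never be multiples of 11.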
ShowcaseIpStatListener is unchanged, so it is not listed here.
ShowcaseServerAioListener:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.intf.Packet;
import org.tio.websocket.server.WsServerAioListener;

/**
 * Implement this class according to your own needs.
 *
 * @author tanyaowu
 * @modify huanglin
 */
public class ShowcaseServerAioListener extends WsServerAioListener {

    private static Logger log = LoggerFactory.getLogger(ShowcaseServerAioListener.class);

    public static final ShowcaseServerAioListener me = new ShowcaseServerAioListener();

    private ShowcaseServerAioListener() {
    }

    @Override
    public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
        super.onAfterConnected(channelContext, isConnected, isReconnect);
        if (log.isInfoEnabled()) {
            log.info("onAfterConnected\r\n{}", channelContext);
        }
    }

    @Override
    public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
        super.onAfterSent(channelContext, packet, isSentSuccess);
        if (log.isInfoEnabled()) {
            log.info("onAfterSent\r\n{}\r\n{}", packet.logstr(), channelContext);
        }
    }

    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
        super.onBeforeClose(channelContext, throwable, remark, isRemove);
        // Changed: delegate to whichever processor is configured
        ShowcaseServerConfig.processor.onBeforeClose(channelContext, throwable, remark, isRemove);
    }

    @Override
    public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {
        super.onAfterDecoded(channelContext, packet, packetSize);
        if (log.isInfoEnabled()) {
            log.info("onAfterDecoded\r\n{}\r\n{}", packet.logstr(), channelContext);
        }
    }

    @Override
    public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
        super.onAfterReceivedBytes(channelContext, receivedBytes);
        if (log.isInfoEnabled()) {
            log.info("onAfterReceivedBytes\r\n{}", channelContext);
        }
    }

    @Override
    public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
        super.onAfterHandled(channelContext, packet, cost);
        if (log.isInfoEnabled()) {
            log.info("onAfterHandled\r\n{}\r\n{}", packet.logstr(), channelContext);
        }
    }
}
ShowcaseServerConfig:
The added part:
/**
 * Use DefaultServerProcessor for a single node; use ServerProcessorOnPubSub to run as a cluster.
 */
public static ServerProcessor processor;

/**
 * Holds the instance so that MsgUtil can call it directly.
 */
public static ServerGroupContext groupContext;
The full code:
import net.hlin.wss.server.processor.ServerProcessor;
import org.tio.server.ServerGroupContext;
import org.tio.utils.time.Time;

/**
 * @author tanyaowu
 * @modify Huang lin
 */
public abstract class ShowcaseServerConfig {

    /**
     * Protocol name (any name will do; it only helps developers tell things apart).
     */
    public static final String PROTOCOL_NAME = "showcase";

    public static final String CHARSET = "utf-8";

    /**
     * IP to listen on; null means listen on all addresses.
     */
    public static final String SERVER_IP = null;

    /**
     * Port to listen on.
     */
    public static final int SERVER_PORT = 9326;

    /**
     * Heartbeat timeout in milliseconds.
     */
    public static final int HEARTBEAT_TIMEOUT = 1000 * 60;

    /**
     * Use DefaultServerProcessor for a single node; use ServerProcessorOnPubSub to run as a cluster.
     */
    public static ServerProcessor processor;

    /**
     * Holds the instance so that MsgUtil can call it directly.
     */
    public static ServerGroupContext groupContext;

    /**
     * Time windows for IP statistics.
     *
     * @author tanyaowu
     */
    public interface IpStatDuration {
        Long DURATION_1 = Time.MINUTE_1 * 5;
        Long[] IP_STAT_DURATIONS = new Long[] { DURATION_1 };
    }
}
ShowcaseWebsocketStarter:
import java.io.IOException;

import net.hlin.wss.server.processor.ServerProcessorOnPubSub;
import org.apache.commons.lang3.StringUtils;
import org.tio.core.ssl.SslConfig;
import org.tio.server.ServerGroupContext;
import org.tio.websocket.server.WsServerStarter;

/**
 * @author tanyaowu
 * 2017-06-28 17:34:04
 */
public class ShowcaseWebsocketStarter {

    private WsServerStarter wsServerStarter;
    private ServerGroupContext serverGroupContext;

    /**
     * @author tanyaowu
     */
    public ShowcaseWebsocketStarter(int port, ShowcaseWsMsgHandler wsMsgHandler) throws Exception {
        wsServerStarter = new WsServerStarter(port, wsMsgHandler);

        serverGroupContext = wsServerStarter.getServerGroupContext();
        serverGroupContext.setName(ShowcaseServerConfig.PROTOCOL_NAME);
        serverGroupContext.setServerAioListener(ShowcaseServerAioListener.me);

        // Set the time windows for IP statistics
        serverGroupContext.ipStats.addDurations(ShowcaseServerConfig.IpStatDuration.IP_STAT_DURATIONS);
        // Set the IP monitor
        serverGroupContext.setIpStatListener(ShowcaseIpStatListener.me);
        // Set the heartbeat timeout
        serverGroupContext.setHeartbeatTimeout(ShowcaseServerConfig.HEARTBEAT_TIMEOUT);
        // To serve over wss, uncomment the next line; you need a certificate first
        //initSsl(serverGroupContext);
    }

    private static void initSsl(ServerGroupContext serverGroupContext) throws Exception {
        String keyStoreFile = "classpath:config/ssl/keystore.jks";
        String trustStoreFile = "classpath:config/ssl/keystore.jks";
        String keyStorePwd = "214323428310224";
        if (StringUtils.isNotBlank(keyStoreFile) && StringUtils.isNotBlank(trustStoreFile)) {
            SslConfig sslConfig = SslConfig.forServer(keyStoreFile, trustStoreFile, keyStorePwd);
            serverGroupContext.setSslConfig(sslConfig);
        }
    }

    /**
     * @author tanyaowu
     * @throws IOException
     */
    public static void start() throws Exception {
        ShowcaseWebsocketStarter appStarter = new ShowcaseWebsocketStarter(ShowcaseServerConfig.SERVER_PORT, ShowcaseWsMsgHandler.me);
        // Changed: pick the processor and expose the group context to MsgUtil before starting
        ShowcaseServerConfig.processor = new ServerProcessorOnPubSub();
        ShowcaseServerConfig.groupContext = appStarter.getServerGroupContext();
        appStarter.wsServerStarter.start();
    }

    /**
     * @return the serverGroupContext
     */
    public ServerGroupContext getServerGroupContext() {
        return serverGroupContext;
    }

    public WsServerStarter getWsServerStarter() {
        return wsServerStarter;
    }

    public static void main(String[] args) throws Exception {
        start();
    }
}
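The starter hardcodes both the port (SERVER_PORT = 9326) and the processor, so starting a second node on the same machine means editing the source between launches. One possible alternative, shown only as a sketch, is to read both from JVM system properties. The property names `ws.port` and `ws.cluster` and the `NodeSettings` class are assumptions made up for this example; they do not exist in the showcase.

```java
class NodeSettings {
    final int port;
    final boolean clustered;

    NodeSettings(int port, boolean clustered) {
        this.port = port;
        this.clustered = clustered;
    }

    /** Reads the hypothetical -Dws.port and -Dws.cluster properties, falling back to the showcase defaults. */
    static NodeSettings fromSystemProperties() {
        int port = Integer.getInteger("ws.port", 9326);
        boolean clustered = Boolean.parseBoolean(System.getProperty("ws.cluster", "true"));
        return new NodeSettings(port, clustered);
    }

    /** Name of the processor class the starter would instantiate for these settings. */
    String processorName() {
        return clustered ? "ServerProcessorOnPubSub" : "DefaultServerProcessor";
    }

    public static void main(String[] args) {
        NodeSettings s = fromSystemProperties();
        System.out.println("port=" + s.port + " processor=" + s.processorName());
    }
}
```

With something like this in place, two nodes could be launched from the same build with `-Dws.port=9326` and `-Dws.port=9327`, and `start()` would pass `s.port` to the constructor and choose the processor from `s.clustered`.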
ShowcaseWsMsgHandler:
import cn.hutool.core.util.StrUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.server.handler.IWsMsgHandler;

/**
 * @author tanyaowu
 * 2017-06-28 17:32:38
 */
public class ShowcaseWsMsgHandler implements IWsMsgHandler {

    private static Logger log = LoggerFactory.getLogger(ShowcaseWsMsgHandler.class);

    public static ShowcaseWsMsgHandler me = new ShowcaseWsMsgHandler();

    private ShowcaseWsMsgHandler() {
    }

    /**
     * Called during the handshake; cookies, request parameters and so on can be read here.
     */
    @Override
    public HttpResponse handshake(HttpRequest request, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        String username = request.getParam("username");
        // No validity checks here; handle them according to your own business rules
        if (StrUtil.isNotEmpty(username)) {
            Aio.bindUser(channelContext, username);
            return httpResponse;
        }
        // No username: reject the handshake
        return null;
    }

    /**
     * @author tanyaowu
     */
    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        ShowcaseServerConfig.processor.onAfterHandshaked(httpRequest, httpResponse, channelContext);
    }

    /**
     * Called when a binary message (binaryType = arraybuffer) arrives.
     */
    @Override
    public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        return null;
    }

    /**
     * Called when the client sends a close flag.
     */
    @Override
    public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        Aio.remove(channelContext, "receive close flag");
        return null;
    }

    /**
     * Called when a text message (binaryType = blob) arrives.
     */
    @Override
    public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
        return ShowcaseServerConfig.processor.onText(wsRequest, text, channelContext);
    }
}
Finally, here is the result of a run (two server nodes started on two different ports):
Source code: https://gitee.com/hlinwork/tio-websocket-showcase