如下是个人场景描述前端
- 资源:4台服务器。其中只有一台服务器具有ssl认证域名,一台redis+mysql服务器,两台应用服务器(集群)
- 应用发布限制条件:因为场景须要,应用场所须要ssl认证的域名才能发布。所以ssl认证的域名服务器用来当api网关,负责https请求与wss(安全认证的ws)链接。俗称https卸载,用户请求https域名服务器(eg:https://oiscircle.com/xxx),但真实访问到的是http+ip地址的形式。只要网关配置高,能handle多个应用
- 需求:用户登陆应用,须要与服务器创建wss链接,不一样角色之间能够单发消息,也能够群发消息
- 集群中的应用服务类型:每一个集群实例都负责http无状态请求服务与ws长链接服务
在个人实现里,每一个应用服务器都负责http and ws请求,其实也能够将ws请求创建的聊天模型单独成立为一个模块。从分布式的角度来看,这两种实现类型差很少,但从实现方便性来讲,一个应用服务http+ws请求的方式更为方便。下文会有解释java
- Eureka 服务发现与注册
- Redis Session共享
- Redis 消息订阅
- Spring Boot
- Zuul 网关
- Spring Cloud Gateway 网关
- Spring WebSocket 处理长链接
- Ribbon 负载均衡
- Netty 多协议NIO网络通讯框架
- Consistent Hash 一致性哈希算法
相信能走到这一步的人都了解过我上面列举的技术栈了,若是尚未,能够先去网上找找入门教程了解一下。下面的内容都与上述技术相关,题主默认你们都了解过了...
这里是描述一致性Hash算法最易懂的文章传送门mysql
下面我将描述session特性,以及根据这些特性列举出n个解决分布式架构中处理ws请求的集群方案
WebSocketSession与HttpSession
在Spring所集成的WebSocket里面,每一个ws链接都有一个对应的session:WebSocketSession
,在Spring WebSocket中,咱们创建ws链接以后能够经过相似这样的方式进行与客户端的通讯:webprotected void handleTextMessage(WebSocketSession session, TextMessage message) { System.out.println("服务器接收到的消息: "+ message ); //send message to client session.sendMessage(new TextMessage("message")); }那么问题来了:ws的session没法序列化到redis,所以在集群中,咱们没法将全部
WebSocketSession
都缓存到redis进行session共享。每台服务器都有各自的session。于此相反的是HttpSession
,redis能够支持httpsession共享,可是目前没有websocket session共享的方案,所以走redis websocket session共享这条路是行不通的。
有的人可能会想:我可不能够将sessin关键信息缓存到redis,集群中的服务器从redis拿取session关键信息而后从新构建websocket session...我只想说这种方法若是有人能试出来,请告诉我一声...redis
以上即是websocket session与http session共享的区别,总的来讲就是http session共享已经有解决方案了,并且很简单,只要引入相关依赖:spring-session-data-redis
和spring-boot-starter-redis
,你们能够从网上找个demo玩一下就知道怎么作了。而websocket session共享的方案因为websocket底层实现的方式,咱们没法作到真正的websocket session共享。算法
刚开始的时候,我尝试着用netty实现了websocket服务端的搭建。在netty里面,并无websocket session这样的概念,与其相似的是channel
,每个客户端链接都表明一个channel。前端的ws请求经过netty监听的端口,走websocket协议进行ws握手链接以后,经过一些列的handler(责链模式)进行消息处理。与websocket session相似地,服务端在链接创建后有一个channel,咱们能够经过channel进行与客户端的通讯spring
/** * TODO 根据服务器传进来的id,分配到不一样的group */ private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //retain增长引用计数,防止接下来的调用引用失效 System.out.println("服务器接收到来自 " + ctx.channel().id() + " 的消息: " + msg.text()); //将消息发送给group里面的全部channel,也就是发送消息给客户端 GROUP.writeAndFlush(msg.retain()); }
那么,服务端用netty仍是用spring websocket?如下我将从几个方面列举这两种实现方式的优缺点sql
玩过netty的人都知道netty是的线程模型是nio模型,并发量很是高,spring5以前的网络线程模型是servlet实现的,而servlet不是nio模型,因此在spring5以后,spring的底层网络实现采用了netty。若是咱们单独使用netty来开发websocket服务端,速度快是绝对的,可是可能会遇到下列问题:
1.与系统的其余应用集成不方便,在rpc调用的时候,没法享受springcloud里feign服务调用的便利性
2.业务逻辑可能要重复实现
3.使用netty可能须要重复造轮子
4.怎么链接上服务注册中心,也是一件麻烦的事情
5.restful服务与ws服务须要分开实现,若是在netty上实现restful服务,有多麻烦可想而知,用spring一站式restful开发相信不少人都习惯了。api
spring websocket已经被springboot很好地集成了,因此在springboot上开发ws服务很是方便,作法很是简单
第一步:添加依赖缓存
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
第二步:添加配置类
@Configuration public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(myHandler(), "/") .setAllowedOrigins("*"); } @Bean public WebSocketHandler myHandler() { return new MessageHandler(); } }
第三步:实现消息监听类
@Component @SuppressWarnings("unchecked") public class MessageHandler extends TextWebSocketHandler { private List<WebSocketSession> clients = new ArrayList<>(); @Override public void afterConnectionEstablished(WebSocketSession session) { clients.add(session); System.out.println("uri :" + session.getUri()); System.out.println("链接创建: " + session.getId()); System.out.println("current seesion: " + clients.size()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { clients.remove(session); System.out.println("断开链接: " + session.getId()); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) { String payload = message.getPayload(); Map<String, String> map = JSONObject.parseObject(payload, HashMap.class); System.out.println("接受到的数据" + map); clients.forEach(s -> { try { System.out.println("发送消息给: " + session.getId()); s.sendMessage(new TextMessage("服务器返回收到的信息," + payload)); } catch (Exception e) { e.printStackTrace(); } }); } }
从这个demo中,使用spring websocket实现ws服务的便利性你们可想而知了。为了能更好地向spring cloud你们族看齐,我最终采用了spring websocket实现ws服务。
所以个人应用服务架构是这样子的:一个应用既负责restful服务,也负责ws服务。没有将ws服务模块拆分是由于拆分出去要使用feign来进行服务调用。第一本人比较懒惰,第二拆分与不拆分相差在多了一层服务间的io调用,因此就没有这么作了。
要实现websocket集群,咱们必不可免地得从zuul转型到spring cloud gateway。缘由以下:
zuul1.0版本不支持websocket转发,zuul 2.0开始支持websocket,zuul2.0几个月前开源了,可是2.0版本没有被spring boot集成,并且文档不健全。所以转型是必须的,同时转型也很容易实现。
在gateway中,为了实现ssl认证和动态路由负载均衡,yml文件中如下的某些配置是必须的,在这里提早避免你们采坑 server: port: 443 ssl: enabled: true key-store: classpath:xxx.jks key-store-password: xxxx key-store-type: JKS key-alias: alias spring: application: name: api-gateway cloud: gateway: httpclient: ssl: handshake-timeout-millis: 10000 close-notify-flush-timeout-millis: 3000 close-notify-read-timeout-millis: 0 useInsecureTrustManager: true discovery: locator: enabled: true lower-case-service-id: true routes: - id: dc uri: lb://dc predicates: - Path=/dc/** - id: wecheck uri: lb://wecheck predicates: - Path=/wecheck/**
若是要愉快地玩https卸载,咱们还须要配置一个filter,不然请求网关时会出现错误not an SSL/TLS record
@Component public class HttpsToHttpFilter implements GlobalFilter, Ordered { private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI originalUri = exchange.getRequest().getURI(); ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest.Builder mutate = request.mutate(); String forwardedUri = request.getURI().toString(); if (forwardedUri != null && forwardedUri.startsWith("https")) { try { URI mutatedUri = new URI("http", originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(), originalUri.getPath(), originalUri.getQuery(), originalUri.getFragment()); mutate.uri(mutatedUri); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } ServerHttpRequest build = mutate.build(); ServerWebExchange webExchange = exchange.mutate().request(build).build(); return chain.filter(webExchange); } @Override public int getOrder() { return HTTPS_TO_HTTP_FILTER_ORDER; }}
这样子咱们就可使用gateway来卸载https请求了,到目前为止,咱们的基本框架已经搭建完毕,网关既能够转发https请求,也能够转发wss请求。接下来就是用户多对多之间session互通的通信解决方案了。接下来,我将根据方案的优雅性,从最不优雅的方案开始讲起。
这是最简单的websocket集群通信解决方案。场景以下:
教师A想要群发消息给他的学生们
session广播实现很简单,可是有一个致命缺陷:计算力浪费现象,当服务器没有消息接收者session的时候,至关于浪费了一次循环遍历的计算力,该方案在并发需求不高的状况下能够优先考虑,实现很容易。
spring cloud中获取服务集群中每台服务器信息的方法以下
@Resource private EurekaClient eurekaClient; Application app = eurekaClient.getApplication("service-name"); //instanceInfo包括了一台服务器ip,port等消息 InstanceInfo instanceInfo = app.getInstances().get(0); System.out.println("ip address: " + instanceInfo.getIPAddr());
服务器须要维护关系映射表,将用户的id与session作映射,session创建时在映射表中添加映射关系,session断开后要删除映射表内关联关系
(本文的要点)
这种方法是本人认为最优雅的实现方案,理解这种方案须要必定的时间,若是你耐心看下去,相信你必定会有所收获。再强调一次,不了解一致性哈希算法的同窗请先看这里,现先假设哈希环是顺时针查找的。
首先,想要将一致性哈希算法的思想应用到咱们的websocket集群,咱们须要解决如下新问题:
在集群中,总会出现服务UP/DOWN的问题。
针对节点DOWN的问题分析以下:
一个服务器DOWN的时候,其拥有的websocket session会自动关闭链接,而且前端会收到通知。此时会影响到哈希环的映射错误。咱们只须要当监听到服务器DOWN的时候,删除哈希环上面对应的实际结点和虚结点,避免让网关转发到状态是DOWN的服务器上。
实现方法:在eureka治理中心监听集群服务DOWN事件,并及时更新哈希环。
针对节点UP的问题分析以下:
现假设集群中有服务CacheB
上线了,该服务器的ip地址恰好被映射到key1和cacheA
之间。那么key1对应的用户每次要发消息时都跑去CacheB
发送消息,结果明显是发送不了消息,由于CacheB
没有key1对应的session。
此时咱们有两种解决方案。
方案A简单,动做大:
eureka监听到节点UP事件以后,根据现有集群信息,更新哈希环。而且断开全部session链接,让客户端从新链接,此时客户端会链接到更新后的哈希环节点,以此避免消息没法送达的状况。
方案B复杂,动做小:
咱们先看看没有虚拟节点的状况,假设CacheC
和CacheA
之间上线了服务器CacheB
。全部映射在CacheC
到CacheB
的用户发消息时都会去CacheB
里面找session发消息。也就是说CacheB
一但上线,便会影响到CacheC
到CacheB
之间的用户发送消息。因此咱们只须要将CacheA
断开CacheC
到CacheB
的用户所对应的session,让客户端重连。
接下来是有虚拟节点的状况,假设浅色的节点是虚拟节点。咱们用长括号来表明某段区域映射的结果属于某个
Cache
。首先是C节点未上线的状况。图你们应该都懂吧,全部B的虚拟节点都会指向真实的B节点,因此全部B节点逆时针那一部分都会映射到B(由于咱们规定哈希环顺时针查找)。
接下来是C节点上线的状况,能够看到某些区域被C占领了。
由以上状况咱们能够知道:节点上线,会有许多对应虚拟节点也同时上线,所以咱们须要将多段范围key对应的session断开链接(上图红色的部分)。具体算法有点复杂,实现的方式因人而异,你们能够尝试一下本身实现算法。
哈希环应该放在哪里?
- gateway本地建立并维护哈希环。当ws请求进来的时候,本地获取哈希环并获取映射服务器信息,转发ws请求。这种方法看上去不错,但其实是不太可取的,回想一下上面服务器DOWN的时候只能经过eureka监听,那么eureka监听到DOWN事件以后,须要经过io来通知gateway删除对应节点吗?显然太麻烦了,将eureka的职责分散到gateway,不建议这么作。
- eureka建立,并放到redis共享读写。这个方案可行,当eureka监听到服务DOWN的时候,修改哈希环并推送到redis上。为了请求响应时间尽可能地短,咱们不可让gateway每次转发ws请求的时候都去redis取一次哈希环。哈希环修改的几率的确很低,gateway只须要应用redis的消息订阅模式,订阅哈希环修改事件即可以解决此问题。
至此咱们的spring websocket集群已经搭建的差很少了,最重要的地方仍是一致性哈希算法。如今有最后一个技术瓶颈,网关如何根据ws请求转发到指定的集群服务器上?答案在负载均衡。spring cloud gateway或zuul都默认集成了ribbon做为负载均衡,咱们只须要根据创建ws请求时客户端发来的user id,重写ribbon负载均衡算法,根据user id进行hash,并在哈希环上寻找ip,并将ws请求转发到该ip便完事了。流程以下图所示:
接下来用户沟通的时候,只须要根据id进行hash,在哈希环上获取对应ip,即可以知道与该用户创建ws链接时的session存在哪台服务器上了!
题主在实际操做的时候发现了ribbon两个不完善的地方......
AbstractLoadBalancerRule
重写负载均衡策略以后,多个不一样应用的请求变得混乱。假如eureka上有两个service A和B,重写负载均衡策略以后,请求A或B的服务,最终只会映射到其中一个服务上。很是奇怪!可能spring cloud gateway官网须要给出一个正确的重写负载均衡策略的demo。default
!难道这样子咱们就没有办法了吗?其实还有一个可行而且暂时可替代的办法!
以下图所示,客户端发送一个普通的http请求(包含id参数)给网关,网关根据id进行hash,在哈希环中寻找ip地址,将ip地址返回给客户端,客户端再根据该ip地址进行ws请求。
因为ribbon未完善key的处理,咱们暂时没法在ribbon上实现一致性哈希算法。只能间接地经过客户端发起两次请求(一次http,一次ws)的方式来实现一致性哈希。但愿不久以后ribbon能更新这个缺陷!让咱们的websocket集群实现得更优雅一点。
以上即是我这几天探索的结果。期间遇到了许多问题,并逐一解决难题,列出两个websocket集群解决方案。第一个是session广播,第二个是一致性哈希。这两种方案针对不一样场景各有优缺点,本文并未用到ActiveMQ,Karfa等消息队列实现消息推送,只是想经过本身的想法,不依靠消息队列来简单地实现多用户之间的长链接通信。但愿能为你们提供一条不一样于寻常的思路。