WebSocket在现代浏览器中的应用已经算是比较广泛了,在某些业务场景下,要求必须可以在服务器端推送消息至客户端。在没有WebSocket的年代,咱们使用过dwr,在那个时候dwr真实一个很是棒的方案。可是在WebSocket兴起以后,咱们更愿意使用标准实现来解决问题、html
首先交代一下,本篇文章不讲解WebSocket的配置,主要讲的是针对在微服务架构集群模式下解决方案的选择。前端
微服务架构你们应该都不陌生了,在微服务架构下,服务是分布式的,并且为了保证业务的可用性,每一个服务都是以集群的形式存在。在集群模式下,要保证集群的每个节点的访问获得相同的结果就须要作到数据一致性,如缓存、session等。web
微服务集群缓存一般使用分布式缓存redis解决,session一致性也一般会经过redis解决,可是如今更流行的是无状态的Http,即无session化,最多见的解决方案就是OAuth。redis
WebSocket有所不一样,它是与服务端创建一个长链接,在集群模式下,显然不可能把前端与服务集群中的每个节点创建链接,一个可行的思路是像解决http session的共享同样,经过redis来实现websocket的session共享,可是websocket session的数量是远多于http session的数量的(由于每打开一个页面都会创建一个websocket链接),因此随着用户量的增加,共享的数据量太大,很容易形成瓶颈。spring
另外一个思路是,websocket总归会与集群中某个节点创建链接,那么,只要找到链接所在的节点,就能够向服务端推送消息了,那么要解决的问题就是如何找到一个websocket链接所在的节点。要找到链接在哪一个节点上,咱们须要一个惟一的标识符用于寻找链接,然而在基于stomp的发布-订阅模式下,一个消息的推送多是面向若干个链接的,可能分布在集群中的每个节点上,这样去寻找链接的代价也很高。既然这样,咱们不妨换种思路,每个websocket消息,咱们在集群的每一个节点上都进行推送,订阅了该消息的链接,无论有一个仍是一万个,最终确定都能收到这个消息。基于这个思路,咱们作了一些技术选型:浏览器
RabbitMQ缓存
Spring Cloud Streambash
首先说RabbitMQ,高级消息队列,能够实现消息广播(固然kafka同样能够作到,这里只介绍一种),另外一项技术是Spring Cloud Stream,stream是一个用于构建高度可扩展事件驱动型微服务的框架,而且它能够跟RabbitMQ、Kafka以及其余多种消息服务集成,使用了stream,要把rabbitmq换成kafka只不过是改改配置的事情。接下来重点介绍使用方法:服务器
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
复制代码
binder是stream中的重要概念,是用于配置用于stream发布和订阅事件的消息中间件。先看一段配置:websocket
spring:
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
username: username
password: password
virtual-host: /
复制代码
配置中的 defaultRabbit 是binder的名称,一会会在其余配置中引用,type指定了消息中间件的类型,environment是对消息中间件的配置,这里的配置结构和spring.rabbitmq命名空间下的配置项如出一辙的,能够参照着进行配置(这样配置的做用是能够把stream的rabbitmq配置和项目中其余地方使用的rabbitmq区分开,若是这里不配置environment,binder会沿用spring.rabbitmq命名空间下的配置),好比你的项目中的rabbitmq的配置是这样的:
spring:
rabbitmq:
host: localhost
username: username
password: password
virtual-host: /
复制代码
那上门的binder的environment配置彻底能够去掉。
微服务要接收挥着发布事件消息,根据spring cloud stream的名字,顾名思义,须要使用流,因此须要在配置中声明两个事件流,一个输入流,一个输出流:
spring:
cloud:
stream:
bindings:
websocketMessageIn:
destination: websocketMessage
binder: defaultRabbit
websocketMessageOut:
destination: websocketMessage
binder: defaultRabbit
复制代码
这里咱们看到,事件流引用了binder,表示这两个流使用rabbitmq这个中间件(看到这里想必你们已经明白了,在一个项目中彻底能够同时使用rabbit和kafka做为事件流的消息中间件)。
websocketMessageIn,websocketMessageOut是事件流的名字(能够本身随便起),destination指定了两个事件流的destination是同一个,这决定了写入和读取是指向同一个地方(不必定是同一个消息队列)。
事件流使用接口进行定义:
/** * websocket消息事件流接口 * Created by 吴昊 on 18-11-8. * * @author 吴昊 * @since 1.4.3 */
interface WebSocketMessageStream {
companion object {
const val INPUT: String = "webSocketMessageIn"
const val OUTPUT: String = "webSocketMessageOut"
}
/** * 输入 */
@Input(INPUT)
fun input(): SubscribableChannel
/** * 输出 */
@Output(OUTPUT)
fun output(): MessageChannel
}
复制代码
声明事件流接口,这里面定义了两个常量,分别对应配置中的两个流名称,经过调用input()方法获取输入流,经过调用output()获取输出流。
该接口的实现由spring cloud stream完成,不须要本身实现。
声明一个bean:
@Component
@EnableBinding(WebSocketMessageStream::class)
class WebSocketMessageService {
……
复制代码
这里的@EnableBinding 注解指明了事件流接口类,只有添加了这个注解(要能被Spring识别到,能够加在入口类上,也能够加在@Configuration注解的类上),该接口才会被实现,而且加入到Spring的容器中(能够注入)。
上面WebSocketMessageService的内容以下:
@Autowired
private lateinit var stream: WebSocketMessageStream
@Autowired
private lateinit var template: SimpMessagingTemplate
@StreamListener(WebSocketMessageStream.INPUT)
fun messageReceived(message: WebSocketMessage) {
template.convertAndSend(message.destination, message.body)
}
fun send(destination: String, body: Any) {
stream.output().send(
MutableMessage(WebSocketMessage(destination, body))
)
}
复制代码
@StreamListener 注解指明了要监听的事件流,方法接收的参数即事件的消息内容(使用jackson反序列化),这里的messageReceived方法直接将接收到的消息直接用websocket发送给前端
一样,发送也很简单,将消息直接发送到输入流中,上面的send方法便是将本来应该用SimpMessagingTemplate发送给websocket的消息发送到spring cloud stream的事件流中。这样作之后,项目中全部须要向前端推送webSocket消息的操做都应该调用send方法来进行。
讲到这里你们可能还有点糊涂,也有一些疑问,为何这样每一个微服务节点就能收到事件消息了?或者单个节点接收事件消息和多个节点接收的配置是怎么控制的。各位不要着急,待我慢慢道来,接下来就要结合rabbit的知识来说解 了:
首先看一下rabbit的消息队列:
从图中看到,存在多个以webSocketMessage开头的队列,这是每个微服务节点建立了一个消息队列,再来看exchange:
这里的exchange名称和上面消息队列的名称前缀均是webSocketMessage, 这个都是由前面的binding配置中的destination指定的,和destination名称保持一致
当应用向输入流中写入事件时,使用destination做为key(即webSocketMessage),将消息写入名为webSocketMessage的exchange,因为exchange绑定的消息队列前缀均为webSocketMessage且routing key都是#,因此exchange会将消息路由到每个webSocketMessage开头的消息队列上(这里涉及到rabbitmq的知识点,如过不懂请自行查阅资料),这样每个微服务都能接收到相同的消息。
咱们再来看前面提出的问题,这样的配置能够把消息推送到每个微服务节点,那么若是须要一个消息只被一个节点接收,该怎么配置呢?很简单,一个配置项就能够搞定:
spring:
cloud:
stream:
bindings:
websocketMessageIn:
group: test
destination: websocketMessage
binder: defaultRabbit
复制代码
能够看到,相比前面的配置,仅仅多了一个group的配置,这样配置以后,rabbitmq会生成一个名为websocketMessage.test的消息队列(前面讲到的每一个微服务创建的消息队列是自动删除的,即微服务断开链接后消息队列就被删除,而这个消息队列是持久化的,也就是即便全部的微服务节点所有断开链接也不会被删除),全部的微服务节点监听这一个队列,当队列中有消息时,只会被一个节点消费。
要讲的内容到此结束,spring cloud stream的配置远不止这些,可是这些配置已足够完成我所须要作的事情,其余的配置请参考spring cloud stream官方文档: