阅读对象:本文适合SpringBoot 初学者及对SpringBoot感兴趣的童鞋阅读。javascript
背景介绍:在企业级 WEB 应用开发中,为了更好的用户体验&提高响应速度,每每会将一些耗时费力的请求 (Excel导入or导出,复杂计算, etc.) 进行***异步化***处理。 由此带来的一个重要的问题是***如何通知用户任务状态***,常见的方法大体分为2类4种:html
HTTP Polling
client pullHTTP Long-Polling
client pullServer-Sent Events (SSE)
server pushWebSocket
server push
是一种很是简单的实现方式。就是client经过***定时任务***不断得重复请求服务器,从而获取新消息,而server按时间顺序提供自上次请求之后发生的单个或多个消息。前端
短轮询的优势很是明显,就是实现简单。当两个方向上的数据都很是少,而且请求间隔不是很是密集时,这种方法就会很是有效。例如,新闻评论信息能够每半分钟更新一次,这对用户来讲是能够的。vue
它得缺点也是很是明显,一旦咱们对数据实时性要求很是高时,为了保证消息的及时送达,请求间隔必须缩短,在这种状况下,会加重服务器资源的浪费,下降服务的可用性。另外一个缺点就是在消息的数量较少时,将会有大量的 request
作无用功,进而也致使服务器资源的浪费。java
长轮询的官方定义是:git
The server attempts to "hold open" (notimmediately reply to) each HTTP request, responding only when there are events to deliver. In this way, there is always a pending request to which the server can reply for the purpose of delivering events as they occur, thereby minimizing the latency in message delivery.github
若是与Polling
的方式相比,会发现Long-Polling
的优势是经过hold open HTTP request 从而减小了无用的请求。web
大体步骤为:spring
请求超时
。获取到新消息
或请求超时
,进行消息处理并发起下一次请求。Long-Polling
的缺点之一也是服务器资源的浪费,由于它和Polling
的同样都属于***被动获取***,都须要不断的向服务器请求。在并发很高的状况下,对服务器性能是个严峻的考验。npm
Note:由于以上2两种方式的实现都比较简单,因此咱们这里就不作代码演示了。接下来咱们重点介绍一下
Server-Sent Events
及WebSocket
。
下面咱们将经过一个***下载文件***的案例进行演示SSE
和WebSocket
的消息推送,在这以前,咱们先简单说一下咱们项目的结构,整个项目基于SpringBoot 构建。
首先咱们定义一个供前端访问的APIDownloadController
@RestController
public class DownloadController {
private static final Logger log = getLogger(DownloadController.class);
@Autowired
private MockDownloadComponent downloadComponent;
@GetMapping("/api/download/{type}")
public String download(@PathVariable String type, HttpServletRequest request) { // (A)
HttpSession session = request.getSession();
String sessionid = session.getId();
log.info("sessionid=[{}]", sessionid);
downloadComponent.mockDownload(type, sessionid); // (B)
return "success"; // (C)
}
}
复制代码
type
参数用于区分使用哪一种推送方式,这里为sse
,ws
,stomp
这三种类型。MockDownloadComponent
用于异步模拟下载文件的过程。success
,用于代表下载开始。在DownloadController
中咱们调用MockDownloadComponent
的mockDownload()
的方法进行模拟真正的下载逻辑。
@Component
public class MockDownloadComponent {
private static final Logger log = LoggerFactory.getLogger(DownloadController.class);
@Async // (A)
public void mockDownload(String type, String sessionid) {
for (int i = 0; i < 100; i++) {
try {
TimeUnit.MILLISECONDS.sleep(100); // (B)
int percent = i + 1;
String content = String.format("{\"username\":\"%s\",\"percent\":%d}", sessionid, percent); // (C)
log.info("username={}'s file has been finished [{}]% ", sessionid, percent);
switch (type) { // (D)
case "sse":
SseNotificationController.usesSsePush(sessionid, content);
break;
case "ws":
WebSocketNotificationHandler.usesWSPush(sessionid, content);
break;
case "stomp":
this.usesStompPush(sessionid, content);
break;
default:
throw new UnsupportedOperationException("");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
复制代码
@Async
让使其异步化
。{"username":"abc","percent":1}
。type
选择消息推送方式。SSE
是W3C定义的一组API规范,这使服务器可以经过HTTP将数据推送到Web页面,它具备以下特色:
key-value
格式&UTF-8
编码的文本数据流,咱们能够在消息payload
中可使用JSON
或者XML
或自定义数据格式。浏览器支持状况:
Note:IE 浏览器可经过第三方JS库进行支持SSE
从Spring 4.2开始支持SSE规范,咱们只须要在Controller
中返回SseEmitter
对象便可。
Note:Spring 5 中提供了Spring Webflux 能够更加方便的使用SSE,可是为更贴近咱们的实际项目,因此文本仅演示使用Spring MVC SSE。
咱们在服务器端定义一个SseNotificationController
用于和客户端处理和保存SSE
链接. 其endpoint
为/api/sse-notification
。
@RestController
public class SseNotificationController {
public static final Map<String, SseEmitter> SSE_HOLDER = new ConcurrentHashMap<>(); // (A)
@GetMapping("/api/sse-notification")
public SseEmitter files(HttpServletRequest request) {
long millis = TimeUnit.SECONDS.toMillis(60);
SseEmitter sseEmitter = new SseEmitter(millis); // (B)
HttpSession session = request.getSession();
String sessionid = session.getId();
SSE_HOLDER.put(sessionid, sseEmitter);
return sseEmitter;
}
/** * 经过sessionId获取对应的客户端进行推送消息 */
public static void usesSsePush(String sessionid, String content) { // (C)
SseEmitter emitter = SseNotificationController.SSE_HOLDER.get(sessionid);
if (Objects.nonNull(emitter)) {
try {
emitter.send(content);
} catch (IOException | IllegalStateException e) {
log.warn("sse send error", e);
SseNotificationController.SSE_HOLDER.remove(sessionid);
}
}
}
}
复制代码
SSE_HOLDER
保存了全部客户端的SseEmitter
,用于后续通知对应客户端。SseEmitter
对象, 它是SpringMVC提供用于操做SSE的类。usesSsePush()
提供根据sessionId向对应客户端发送消息。发送只须要调用SseEmitter
的send()
方法便可。至此服务端已经完成,咱们使用Vue编写客户端Download.html
进行测试。核心代码以下:
usesSSENotification: function () {
var tt = this;
var url = "/api/sse-notification";
var sseClient = new EventSource(url); // (A)
sseClient.onopen = function () {...}; // (B)
sseClient.onmessage = function (msg) { // (C)
var jsonStr = msg.data;
console.log('message', jsonStr);
var obj = JSON.parse(jsonStr);
var percent = obj.percent;
tt.sseMsg += 'SSE 通知您:已下载完成' + percent + "%\r\n";
if (percent === 100) {
sseClient.close(); // (D)
}
};
sseClient.onerror = function () {
console.log("EventSource failed.");
};
}
复制代码
/api/sse-notification
。效果演示:
WebSocket
相似于标准的TCP链接,它是IETF(RFC 6455)定义的经过TCP进行实时全双工通讯一种通讯方式,这意味这它的功能更强大,经常使用于如股票报价器,聊天应用。
相比于SSE,它不只能够双向通讯,并且甚至还能处理音频/视频等二进制内容。
Note:使用
WebSocket
,在高并发状况下,服务器将拥有许多长链接。这对网络代理层组件及WebSocket
服务器都是一个不小的性能挑战,咱们须要考虑其负载均衡方案。同时链接安全等问题也不容忽视。
Spring 4提供了一个新的Spring-WebSocket
模块,用于适应各类WebSocket引擎,它与Java WebSocket API标准(JSR-356)兼容,而且提供了额外的加强功能。
Note: 对于应用程序来讲,直接使用WebSocket API会大大增长开发难度,因此Spring为咱们提供了 STOMP over WebSocket 更高级别的API使用WebSocket。在本文中将会分别演示经过low level API及higher level API进行演示。
若是想在SpringBoot中使用WebSocket,首先须要引入spring-boot-starter-websocket
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
复制代码
而后就能够配置相关信息,咱们先经过low level API进行演示。
首先须要自定义一个WebSocketNotificationHandler
用于处理WebSocket 的链接及消息处理。咱们只须要实现WebSocketHandler
或子类TextWebSocketHandler
BinaryWebSocketHandler
。
public class WebSocketNotificationHandler extends TextWebSocketHandler {
private static final Logger log = getLogger(WebSocketNotificationHandler.class);
public static final Map<String, WebSocketSession> WS_HOLDER= new ConcurrentHashMap<>(); // (A)
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { // (B)
String httpSessionId = (String) session.getAttributes().get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
WS_HOLDER.put(httpSessionId, session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
log.info("handleTextMessage={}", message.getPayload());
}
public static void usesWSPush(String sessionid, String content) { // (C)
WebSocketSession wssession = WebSocketNotificationHandler.WS_HOLDER.get(sessionid);
if (Objects.nonNull(wssession)) {
TextMessage textMessage = new TextMessage(content);
try {
wssession.sendMessage(textMessage);
} catch (IOException | IllegalStateException e) {
WebSocketNotificationHandler.SESSIONS.remove(sessionid);
}
}
}
}
复制代码
WS_HOLDER
用于保存客户端的WebSocket Session
afterConnectionEstablished()
方法,当链接创建以后,按sessionId
将WebSocket Session
保存至WS_HOLDER
,用于后续向client推送消息。sessionId
获取对应WebSocket Session
,并调用WebSocket Session
的sendMessage(textMessage)
方法向client发送消息。使用@EnableWebSocket
开启WebSocket,并实现WebSocketConfigurer
进行配置。
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
WebSocketNotificationHandler notificationHandler = new WebSocketNotificationHandler();
registry.addHandler(notificationHandler, "/ws-notification") // (A)
.addInterceptors(new HttpSessionHandshakeInterceptor()) // (B)
.withSockJS(); // (C)
}
}
复制代码
WebSocketNotificationHandler
注册至WebSocketHandlerRegistry
.HttpSessionHandshakeInterceptor
是一个内置的拦截器,用于传递HTTP会话属性到WebSocket会话。固然你也能够经过HandshakeInterceptor
接口实现本身的拦截器。server端至此就基本大功告成,接下来咱们来完善一下client端Download.html
,其核心方法以下:
usesWSNotification: function () {
var tt = this;
var url = "http://localhost:8080/ws-notification";
var sock = new SockJS(url); // (A)
sock.onopen = function () {
console.log('open');
sock.send('test');
};
sock.onmessage = function (msg) { // (B)
var jsonStr = msg.data;
console.log('message', jsonStr);
var obj = JSON.parse(jsonStr);
var percent = obj.percent;
tt.wsMsg += 'WS 通知您:已下载完成' + percent + "%\r\n";
if (percent === 100) {
sock.close();
}
};
sock.onclose = function () {
console.log('ws close');
};
}
复制代码
callback
,咱们能够在该方法中处理咱们的消息。效果演示:
WebSocket虽然定义了两种类型的消息,文本和二进制,可是针对消息的内容没有定义,为了更方便的处理消息,咱们但愿Client和Server都须要就某种协议达成一致,以帮助处理消息。那么,有没有已经造好的轮子呢?答案确定是有的。这就是STOMP。
STOMP是一种简单的面向文本的消息传递协议,它实际上是消息队列的一种协议, 和AMQP,JMS是平级的。 只不过因为它的简单性恰巧能够用于定义WS的消息体格式。虽然STOMP是面向文本的协议,但消息的内容也能够是二进制数据。同时STOMP 可已使用任何可靠的双向流网络协议,如TCP和WebSocket,目前不少服务端消息队列都已经支持了STOMP, 好比RabbitMQ, ActiveMQ等。
它结构是一种基于帧的协议,一帧由一个命令,一组可选的Header和一个可选的Body组成。
COMMAND
header1:value1
header2:value2
Body^@
复制代码
客户端可使用SEND
或SUBSCRIBE
命令发送或订阅消息。 经过destination
标记述消息应由谁来接收处理,造成了相似于MQ的发布订阅机制。
STOMP的优点也很是明显,即:
最重要的是,Spring STOMP
为咱们提供了可以像Spring MVC同样的编程模型,减小了咱们的学习成本。
下面将咱们的DEMO稍做调整,使用Spring STOMP
来实现消息推送,在本例中咱们使用SimpleBroker
模式,咱们的应用将会内置一个STOMP Broker
,将全部信息保存至内存中。
具体代码以下:
@Configuration
@EnableWebSocketMessageBroker // (A)
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-stomp-notification")
.addInterceptors(httpSessionHandshakeInterceptor()) // (B)
.setHandshakeHandler(httpSessionHandshakeHandler()) // (C)
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app") // (D)
.enableSimpleBroker("/topic", "/queue"); // (E)
}
@Bean
public HttpSessionHandshakeInterceptor httpSessionHandshakeInterceptor() {
return new HttpSessionHandshakeInterceptor();
}
@Bean
public HttpSessionHandshakeHandler httpSessionHandshakeHandler() {
return new HttpSessionHandshakeHandler();
}
}
复制代码
@EnableWebSocketMessageBroker
注解开启支持STOMPHttpSessionHandshakeHandler
,其主要做用是按sessionId标记识别链接。destination
已/app
开头时,将会把该消息路由到server端的对应的消息处理方法中。***(在本例中无实际意义)***HttpSessionHandshakeHandler
代码以下:
public class HttpSessionHandshakeHandler extends DefaultHandshakeHandler {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
String sessionId = (String) attributes.get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
return new HttpSessionPrincipal(sessionId);
}
}
复制代码
当咱们须要向client发送消息时,只须要注入SimpMessagingTemplate
对象便可,是否是感受很是熟悉?! 没错,这种Template
模式和咱们平常使用的RestTemplate
JDBCTemplate
是同样的。 咱们只须要调用SimpMessagingTemplate
的convertAndSendToUser()
方法便可向对应用户发送消息了。
private void usesStompPush(String sessionid, String content) {
String destination = "/queue/download-notification";
messagingTemplate.convertAndSendToUser(sessionid, destination, content);
}
复制代码
在浏览器端,client可使用stomp.js和sockjs-client进行以下链接:
usesStompNotification: function () {
var tt = this;
var url = "http://localhost:8080/ws-stomp-notification";
// 公共topic
// var notificationTopic = "/topic/download-notification";
// 点对点广播
var notificationTopic = "/user/queue/download-notification"; // (A)
var socket = new SockJS(url);
var stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
console.log("STOMP connection successful");
stompClient.subscribe(notificationTopic, function (msg) { // (B)
var jsonStr = msg.body;
var obj = JSON.parse(jsonStr);
var percent = obj.percent;
tt.stompMsg += 'STOMP 通知您:已下载完成' + percent + "%\r\n";
if (percent === 100) {
stompClient.disconnect()
}
});
}, function (error) {
console.log("STOMP protocol error " + error)
})
}
复制代码
/user/
为前缀,Spring STOMP会把以/user/
为前缀的消息交给UserDestinationMessageHandler
进行处理并发给特定的用户,固然这个/user/
是能够经过WebSocketBrokerConfig
进行个性化配置的,为了简单起见,咱们这里就使用默认配置,因此咱们的topic url就是/user/queue/download-notification
。stompClient
消息处理callback进行消息处理。效果演示:
在文中为你们简单讲解了几种经常使用的消息推送方案,并经过一个下载案例重点演示了SSE
及WebSocket
这两种server push模式的消息推送。固然还有不少细节并无在文中说明,建议你们下载源码对照参考。
相比较这几种模式,小编认为若是咱们的需求仅仅是向客户端推送消息,那么使用SSE
的性价比更高一些,Long-Polling
次之。使用WebSocket
有一种杀鸡用牛刀的感受,而且给咱们系统也带来了更多的复杂性,得不偿失,因此不太推荐。而Polling
虽然实现方式最简单且兼容性最强,可是其效率太低,因此不建议使用。固然若是您有其余看法,欢迎留言讨论交流。
文中示例源码:github.com/leven-space…
若是您以为这篇文章有用,请留下您的小💗💗,我是一枚Java小学生,欢迎你们吐槽留言。