spring websocket中 STOMP

26.4 基于WebSocket消息架构STOMP

WebSocket协议定义了两种消息类型,文本或字节,可是没定义它们的内容.它有意让客户端和服务端经过通用的子协议(例如,更高水平的协议)来定义消息语法.可是WebSocket协议的使用是可选的,客户端和服务端须要赞成某些种类的协议来翻译这些消息.html

26.4.1 STOMP概述

STOMP是一个简单的面向文本的消息协议,原来是为Ruby,Python,Perl等脚本语言建立的,用来链接企业消息代理器.设计它的目的是解决一部分通用的消息格式问题.STOMP能够任何可靠的双向流协议中使用,好比TCP或WebSocket.尽管STOMP是一个面向文本的协议,可是消息的内容能够是文本或字节.html5

STOMP 是一个基于Http框架的协议.STOMP框架的结构以下:java

COMMAND
header1:value1
header2:value2

Body^@

客户端可使用"SEND"或"SUBSCRIBE"命名来发送或订阅消息,须要一个目的地,用来描述这个消息是什么和谁来接收它.它会激活一个简单的发布订阅机制,能够用来经过代理向其余链接的客户端来发送消息或者向服务端发送消息来要求执行一些工做.react

当使用spring的STOMP支持时,spring webSocket应用扮演了客户端的STOMP代理器的角色.消息被路由到@Controller里的消息处理方法,或一个简单的,内置的用来跟踪订阅或广播消息的代理器来转发给订阅者.你也能够配置一个专用的STOMP代理器(如RabbitMQ,ActiveMQ等)进行消息广播.在这种状况下,spring会保持代理的TCP链接,传递消息给它,也经过它把消息传递给链接的webSocket客户端.因此spring web应用能够依赖统一的基于http的安全,通用验证;相同的编程模式在消息处理上也起做用.git

这里有一个例子,客户端订阅接收服务端按期发布的股票报价,例如定时经过SimpMessagingTemplate将消息发送到代理器;github

SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*

^@

下面例子:一个客户端发送交易请求,服务端经过@NessageMapping的方法处理,处理完以后,将交易信息和细节广播给客户端.web

SEND
destination:/queue/trade
content-type:application/json
content-length:44

{"action":"BUY","ticker":"MMM","shares",44}^@

目的地意味着在STOMP规则中有意遗留印记.它能够是任何符合STOMP服务器支持的目的地的语法和语义的字符串.可是,一般来讲,路径字符串中"/topic/.."是发布订阅(一对多),"/queue/"意味着点到点的信息交换(一对一).spring

STOMP服务器能够用Message命令向全部订阅者发布消息.这里是一个服务器向一个订阅客户端发送股票交易的例子:数据库

MESSAGE
message-id:nxahklf6-1
subscription:sub-1

destination:/topic/price.stock.MMM

{"ticker":"MMM","price":129.45}^@

你须要知道服务器不会发送未被订阅的消息.服务器端全部的消息都对应一个定义的客户端订阅,而且服务器消息的"subscription-id"头必须与客户端订阅的Id头一致的.编程

上面的简介是为了提供对STOMP协议最简单的理解.你能够去阅读该协议的说明书了解更多.

使用STOMP做为WebSocket子协议的好处:

  • 无需建立一个自定义消息格式
  • 能够在浏览器中使用已存在的stomp.js
  • 能够根据端点来回路消息
  • 可使用成熟的消息代理器,如RabbitMQ,ActiveMQ等进行广播

最重要的一点是使用STOMP能够像spring-mvc提供的基于HTTP的编程模式来启用spring提供的应用级别的编程模式.

26.4.2 在WebSocket上启用STOMP

spring框架经过spring-message,spring-websocket模块提供了对基于WebSocket的STOMP协议的支持.下面有个经过在"/portfolio"暴露STOMP webSocket/SockJS端点例子,这里终点以"app"开头的消息会回路到消息处理方法(如应用业务);另外的以"/topic","/queue"开头的消息会回路到消息代理器(如,广播到其余在线客户端)

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.enableSimpleBroker("/topic", "/queue");
    }

}

在xml中的配置

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio">
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:simple-broker prefix="/topic, /queue"/>
    </websocket:message-broker>

</beans>

这个"/app"前缀是任选的,它只是用来分区那些应该被回路到消息处理方法中由应用处理的消息,和那么由代理器广播到订阅客户端的消息.

而"/topic","/queue"前缀依赖于使用中的代理器.在简单的,内置代理下这个前缀没有特定的含义,它没法区分这个终点如何使用.(发布-订阅目标有不少订阅者或通常只针对单个接受者的点对点消息).若是使用备用代理器,大多数代理器使用"/topic"做为发布/订阅语义终点的前缀,并把"/queue"做为点作点语义终点的前缀.

在浏览器里,客户端能够经过使用Stomp.js和sockjs-client链接

var socket=new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient =Stomp.over(socket);
stompClient.connect({},function(frame){
});

又或者经过WebSocket(不经过SockJS)链接

var socket=new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient=Stomp.over(socket);

stompClient.connect({},function(frame){
});

记住stompClient不须要指定login或passcode头.即便加了,客户端也会忽略或者会覆盖掉它.能够经过查看 26.4.8节,成熟代理器的连接,和26.4.10 安全,来获取更多安全信息

26.4.3 Flow of Messages

当STOMP端点被配置,spring应用会扮演连接客户端的STOMP代理器的角色.本节提供了一个大图来展现消息是如何在应用内部流动的.

spring-messaging模块听过异步消息处理的基础.它包含了大量源于spring Integration项目里的抽象,并把它们做为消息应用里的基础模块.

  • Message 一个有头和内容的消息

  • MessageHandler 消息处理的协议

  • MessageChannel 消息发送的协议,能够减小发送者和接受者之间的耦合.

  • SubScribableChannel (订阅频道)继承了消息协议,将消息发送到已注册的消息处理的订阅者们.

  • ExecutorSubscribableChannel 订阅协议的具体实现,能够经过线程池实现异步分发消息.

@EnableWebSocketMessageBroker java配置和websocket:message-broker的xml配置共同构成一个复杂消息流.下面的图表展现的是使用简单,内存代理器的状况.

输入图片说明

上面的图片包含了三个消息频道:

  • ClientInboundChannel用于接收从webSocket客户端发出的消息.
  • clientOutboundChanel 用于向WebSocket客户端发送消息
  • brokerChannel 应用内部消息的代理器

这三个频道一样在专用代理器里使用,除了"代理器替身"替代了简单代理器.

输入图片说明

"clientInboundChannel"里的消息流向注解方法给应用处理(例如股票交易执行请求)或直接发送给代理器(例如客户端订阅股票信息报价).这个STOMP目的地用于简单的前缀回路.例如,"/app"前缀的消息回路到注解方法,而"/topic","/queue"前缀的消息用于回路到代理器.

当一个消息处理注解的方法有返回值时,它的返回值会做为spring消息的内容发送到"brokerChannel".这个代理器会把这个消息广播到客户端.只要借助消息模板,你能够在应用的任何地方将消息发送到任何一个目的地.例如,一个http的POST处理方法能够广播一个消息到全部的关联客户端,或者一个服务组件能够定时广播股票价格.

下面是一个简单说明消息的例子:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic");
    }

}

@Controller
public class GreetingController {

    @MessageMapping("/greeting") {
    public String handle(String greeting) {
        return "[" + getTimestamp() + ": " + greeting;
    }

}

下面是关于上例中消息流的解释:

  • WebSocket 客户端连接WebSocket端点"/portfolio"

  • 关于"/topic/greeting"订阅经过"clientInboundChannel"传播到代理器

  • 发送到"/app/greeting"的拥抱经过"clientInboundChannel"进入并跳转到GreetingController.控制器添加了当前时间,它的返回值经过"brokerChannel"做为消息发送到"/topic/greeting"端口.(终点通常根据协议选择,可是能够经过@SendTo来重写)

  • 代理器随后把消息广播给订阅者,他们经过"clientOutboundChannel"实现传输.

下节提供注解方法更多的细节,包括参数的种类和返回值的支持.

26.4.4 注解消息处理

@MessageMapping注解能够在@Controller类的方法上使用.它既能够来用表示消息目的地的映射方法,也能够组合成类级别的@MessageMapping,用来表示控制器里的全部映射方法的共用映射.

默认的目的地映射被认为是Ant风格,斜线分割,路径格式.例如"/foo*","/foo/**".这里还包括模板变量,如"/foo/{id}",它能够被@DestinationVariable注解来引用.

也可使用逗号做为分隔符.

@MessageMapping标志的方法支持一下方法参数:

  • Message 方法参数,用来获取要处理的完整消息

  • @Payload 用来获取消息内容的注解参数,由org.springframework.messaging.converter.MessageConverter转化.这个注解不是必须的,由于默认它是存在的.带有验证注解的负载方法的参数注解受限于JSR-303验证.

  • @Header 必要时可使用org.springframework.messaging.converter.Converter类来访问特定的头的值

  • @Headers 该注解的方法参数必须指定为java.util.Map用来访问消息里全部的头

  • MessageHeaders 用来获取消息里全部头的Map的方法参数

  • MessageHeaderAccessor,SimpMessageHeaderAccessor,StompHeaderAccessor都是通过类型访问方法来访问头信息.

  • @DestinationVariable 用来访问消息目的地中模板参数的注解参数.他的值能够更具须要转为成申明的方法参数类型.

  • java.security.Principal 在webSocket进行http握手时,用来反射用户日志的方法参数

@MessageMapping 方法能够被org.springframework.messaging.converter.MessageConverter转化器转为有一个新的消息主体,默认,它们会看成客户端消息用相同的目的地,但使用"/topic"前缀来发送到"brokerChannel"频道.用@SendTo能够指定其余目的地.他还能够设置一个类级别的共用目的地.

返回的消息能够异步提供,经过ListenableFuture或CompletableFutrue/ComplentionStage等返回类型签名,相似springmvc类似的方法.

@SubscribeMapping注解能够将订阅请求映射到@Controller方法里.它支持方法级别,可是能够被组合为类别的@MessageMapping注解,可让相同控制器里处理方法共享..

默认@SubscribeMapping方法的返回值会直接返回到链接的客户端,不须要经过代理器中转.这对请求回复的消息交互很是有用,例如,应用UI初始化时,能够方便的获取应用数据.另外@SubscribeMapping方法也能够被@SendTo方法注解,结果消息被被发送到"brokerChannel"用特定的目标目的地.

有时一个控制器在运行时须要用AOP代理装饰.例如,你在本身的控制器上选择添加@Transactional注解.在这种状况下,对于这些控制器,咱们建议使用类级别的代理.这是控制器的典型默认选择.可是若是一个控制器实现了一个spring上下文没有回调的接口,你须要明确的配置基于类的代理.例如,tx:annotation-driven/,须要转化为<tx:annotation-driven proxy-target-class="true"/>

26.4.5 Sending messages 发送消息

你是否想从应用的任何地方都想已链接的客户端发送消息.每一个应用组件均可以把消息发送到"brokerChannel"中.最简单的实现方式是进行SimpMessagingTemplate注入,并用他发送消息.通常它很容易按类型注入,例如:

@Controller
public class GreetingController{
@Autowired
private SimpMessagingTemplate template;

@RequestMapping(path="/greetings",method=POST)
public void greet(String greeting){
   String text="["+getTimestamp()+"]"+greeting;
   template.convertAndSend("/tpic/greetings",text)
}

}

可是这个等同"brokerMessagingTemplate".

26.4.6 Simple Broker 简单代理器

这个简单的内置的代理器处理客户端的订阅请求,把它们储存到内存,并根据目的地匹配来广播消息到在线客户端.这个代理器支持路径风格目的地,包括ANT风格的目的地匹配.

ant风格路径:http://blog.csdn.net/wangshfa/article/details/26471641

26.4.7 成熟代理器

简单代理器容易上手,但只支持部分STOMP命令,它基于一个简单的消息发送池,且不支持集群.相应的,应用能够经过使用成熟的消息代理器来升级.

检查STOMP的文档,选择适合的消息代理器,安装他们,并在STOMP支持下运行他们.接着用spring配置中的STOMP代理器替身来替代简单代理器. 下面的例子是如何配置一个成熟的代理器

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic", "/queue");
        registry.setApplicationDestinationPrefixes("/app");
    }

}

xml配置以下:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio" />
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>

</beans>

上述例子中配置的"STOMP broker relay"是一个spring的消息处理器,他将消息转发给外部的消息代理器.他会创建到代理器的TCp连接,转发全部的消息给他,并把全部从代理器收到的消息经过webSocket session转发给客户端.基本上他扮演了一个中介器的做用,向两边发送消息.

spring使用org.projecteactor:reactor-net和io.netty:netty-all来管理TCP链接,须要时能够将他们做为依赖添加.

spring4.3支持的STOMP代理器兼容2.0x的反应器,因此它不支持与spring-cloud-stream-reactive聚合,它须要3.0X的反应器.

spring5依赖于Reactor3和Reactor Netty,他们有独立的版本,支持STOMP的代理器,还对活性编程模式提供了大量支持.

另外,服务组件仍是能够向代理器替身发送信息,以进行广播的

实际上,代理器替身使得消息广播更加健壮和可扩展.

26.4.8 Connections To Full-Featured Broker链接到成熟代理器

STOMP代理器中介器保持这地代理器的一个系统级别的TCP连接.这个链接用来接收服务端产生的消息,而不是接收其余消息.你能够设置该连接的凭证,如STOMP框架的登录和密码头.这个能够在XML命令空间或java配置里经过设置SystemLogin/systemPasscode属性来设置,默认值是guest/guest.

STOMP代理器中介器也为每一个链接的webSocket客户端建立了一个单独的TCP链接.你能够为全部的客户端链接配置STOMP凭证.这个能够在XML命令空间或java配置里经过设置SystemLogin/systemPasscode属性来设置,默认值是guest/guest.

STOMP代理器中介器通常表明客户端给每一个跳转到的代理器的链接框架设置login和passcode头.因此WebSocket客户端不须要设置这么头,他们会被忽略.下面的部分,webSocket客户端能够依赖HTTP安全来保护WebSocket端点和建立客户端身份.

STOMP代理器中继会经过"系统"的TCP链接向消息代理器发送和接受心跳消息.你能够设置发送和接受心跳的频率(默认10秒每次).若是指向代理器的连接消失了,代理器中介会每5分钟一次,持续尝试重连,直到成功.

spring的bean能够实现为ApplicationListener<BrokerAvailabilityEvent>的接口,用来接收指向代理器的系统链接中断或重连的通知.例如,当这里没有可用的系统链接时,一个股票价格服务能够中止尝试发送消息.

STOMP代理器中介能够经过virtualHost来配置.这个属性的值能够被设置到每一个链接框架的host头里,这会颇有用,例如在一个云环境里,每个TCP链接的实际地址会根据云基础STOMP服务提供host的不一样而差别.

26.4.9 在@MessageMapping 目的地里使用句号分隔符

尽管斜线分割的路径格式被web开发者熟知,但在消息开发里,"."是主流,例如,在主题的名字,队列,交换者等.应用也能够经过配置自定义的AntPathMatcher在@MessageMapping映射中使用句号来代替斜线.

java配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

  // ...

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.enableStompBrokerRelay("/queue/", "/topic/");
    registry.setApplicationDestinationPrefixes("/app");
    registry.setPathMatcher(new AntPathMatcher("."));
  }

}

在xml配置

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:websocket="http://www.springframework.org/schema/websocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/websocket
    http://www.springframework.org/schema/websocket/spring-websocket.xsd">

  <websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
    <websocket:stomp-endpoint path="/stomp" />
    <websocket:simple-broker prefix="/topic, /queue"/>
  </websocket:message-broker>

  <bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
    <constructor-arg index="0" value="." />
  </bean>

</beans>

下面是控制器中使用"."分隔符的简单例子

@Controller
@MessageMapping("foo")
public class FooController {

  @MessageMapping("bar.{baz}")
  public void handleBaz(@DestinationVariable String baz) {
  }

}

若是这个应用的前缀是"/app",那么这个方法能够被映射为"/app/foo.bar.{baz}";

26.4.10 Authentication 认证

每个WebSocket的消息的STOMP session开始于一个http请求,而后它能够被升级为WebSockets,或者被回退为一系列的SockJS http传输请求.

web应用早已用认证和权限来保护HTTp请求.通常一个用户会通过spring安全的一些机制,如登录也,http基础认证或其余来认真.已认证用户的认证的上下文被保存到Http session里,并与基于相同cookie的session相关联.

所以对于一个WebSocket握手,或者SockJS的http传输请求,通常它们已经经过HttpServletRequest#getUserPrincipal()认证为一个认证用户.spring 会自动给这些用户建立WebSocket或SockJS session,随后的STOMP消息传输经过一个用户头来进行传输.

上面对于一个典型的web应用来言没有什么特别的,它们已经为安全这么作了.用户经过http请求进行认证,并经过基于cookie的http session持有这个上下文.这样SockJS或WebSocket就会为用户建立Session,并添加一个用户头的戳,这样在应用中它们就可与进行消息传输了.

记住 STOMP协议在链接框架中有一个"login"或"passcode"头.这些开始就是为,如今也是为基于TCP的STOMP设计的.可是,spring通常会忽略STOMP协议头的认证信息,它认为这个用户已经被http传输请求认证过了,并指望WebSocket或SockJs session包含认证用户.

spring 安全提供了webSocket 子协议认证:经过一个 ChannelInterceptor基于消息中的用户头来认证消息.另外,spring session提供了WebSocket integration确保使用webSocket session时,http session不会过时.

26.4.11 基于令牌的认证

Spring Security OAuth 支持基于令牌的安全包括JSON Web Token.这个能够做为web应用的安全机制,包括基于WebSocket的STOMP交互,如上下文所述同样,经过一个基于cookie的session来保持认证.

同时基于Cookie的session并非最好的选择,例如在应用里,他们不但愿持有来自服务器端的session,在手机应用中,它们更倾向于用用户头进行安全认真.

在 webSocket protocol RFC 6455中说起,在webSocket握手期间,服务器对客户端的身份验证,不须要指定特别的方式.实际上,虽然浏览器客户端能够是用标准的认证头或cookie,但没法使用自定义头.例如SockJS js客户端没法提供一个方式来发送Http头,查看 sockjs-client issue 196. 可是它容许发送查询参数,这能够用来发送token.这个也有缺点,如,这个令牌带着URL可能无心中被服务器日志记录下来.

以上的缺点只针对基于浏览器的客户端,而不是基于java的客户端.它支持在webSocket或SockJS请求中发送头消息.

不倾向于使用cookie,但在http协议水平上没有更好的方法.倾向于使用安全头,这里有两个步骤:

    1. 在链接期间使用STOMP 客户端发送认证头
    1. 使用ChannelInterceptor处理认证头

下面的例子中服务侧配置会注册一个自定义认证拦截器.记住这个拦截器须要认证并在链接信息中设置用户头.spring会概率并存这些认证用户,在随后的相同session的STOMP消息会用到他们.

@Configuration
@EnableWebSocketMessageBroker
public class MyConfig extends AbstractWebSocketMessageBrokerConfigurer {

  @Override
  public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.setInterceptors(new ChannelInterceptorAdapter() {

        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {

            StompHeaderAccessor accessor =
                MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

            if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                Principal user = ... ; // access authentication header(s)
                accessor.setUser(user);
            }

            return message;
        }
    });
  }
}

还要记住在消息中使用spring Security的安全机制,首先你要保证你的认证的ChannelInterceptor配置优先于spring安全的.最好的作法就是在AbstatctWebSocketMessageBrokerConfigurer的子类中宣布自定义拦截的标记为@Order(Ordered.HIGHEST_PRECEDENCE + 99)

26.4.12 用户目的地

一个应用能够发送消息给特定的用户,spring的STOMP经过在目的地添加前缀"/user/"来支持它.例如,一个客户端可能订阅目的地"/user/queue/position-updates".这个目的地能够被UserDestinationMessageHandler处理,被会加上惟一的用户session,例如"/queue/position-updates-user123".它提供了通用的订阅名称目的地的便利,同时保证与其余订阅相同的目的地的用户没有冲突,因此每一个用户都会收到惟一的股票位置更新.

在发送端消息能够被发送到目的地例如:"/user/{username}/queue/position-updates",这个目的地会被UserDestinationMessageHandler转译为一个或多个目的地,对应用户的每一个session.这容许应用中的全部组件发送消息给一个特定的用户,只要知道这个用户名字和原始的目的地.它还支持像消息模板同样的注解.

例如,一个消息处理方法能够在@SendToUser注解的协助处理下将消息发送给用户.(它还支持类级别的共享公共目的地)

@Controller
public class PortfolioController {

    @MessageMapping("/trade")
    @SendToUser("/queue/position-updates")
    public TradeResult executeTrade(Trade trade, Principal principal) {
        // ...
        return tradeResult;
    }
}

若是用户有多个session,那么默认的全部session订阅对于特定的目的地都是可标志的.可是有时候,它必须只能指向那个发送了要处理消息的session.这个能够经过设置broadCast属性为false来实现.

@Controller
public class MyController {

    @MessageMapping("/action")
    public void handleAction() throws Exception{
        // raise MyBusinessException here
    }

    @MessageExceptionHandler
    @SendToUser(destinations="/queue/errors", broadcast=false)
    public ApplicationError handleException(MyBusinessException exception) {
        // ...
        return appError;
    }
}

虽然用户的目的地通常意味着是认证用户,但这并不严格.一个与认证用户无关的session也能够订阅用户目的地.在这种状况下,@SendToUser会默认为broadcast=false;即只面向那么发送了消息的session.

还能够经过注入SimpMessagingTemplate的bean发送用户目的地消息.通常经过java配置或XML命名空间注入.若是bean的名字是"brokeMessagingTemplate",则须要匹配@Qualifier.

@Service
public class TradeServiceImpl implements TradeService {

	private final SimpMessagingTemplate messagingTemplate;

	@Autowired
	public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
		this.messagingTemplate = messagingTemplate;
	}

	// ...

	public void afterTradeExecuted(Trade trade) {
		this.messagingTemplate.convertAndSendToUser(
				trade.getUserName(), "/queue/position-updates", trade.getResult());
	}
}

在额外消息代理器下使用用户目的地时,检查代理器的文档了解如何管理未被激活的队列.当用户的session结束,全部惟一的用户队列会被移除.例如,当像/exchange/amq.direct/position-updates目的地被使用时,RabbitMQ建立自动删除的队列.这种状况下,客户端能够订阅/user/exchange/amq.direct/position-updates.相似的,ActiveMQ也有相同的配置选项来清除未启动的目的地.

在混合应用服务器场景下,一个用户目的地能够保持未释放由于用户在关联其余服务器.这种状况下,你须要配置一个能够广播未释放的消息,这样其余服务器有机会去尝试.这个能够经过java配置里的MessageBrokerRegistry类里的userDestinationBroadcast属性配置,或者经过message-broker元素的user-destination-broadcast属性配置.

26.4.13 监听应用上下文事件和拦截消息

一些ApplicationContext事件能够被发布,能够被实现了spring的ApplicationListener接口的类接受.

  • BrokerAvailabilityEvent 说明代理器变成可用/不可用状态.当应用运行时,简单的代理器当即启用,变得可行,那么STOMP 代理器中介会断掉它和成熟代理器的连接,例如当这个成熟代理器重启时.代理器中介有重连机制,当代理器重启后会从新创建连接,因此在连到断链,以及断链到连上他们都会发布事件.使用SimpMessagingTemplate组件应该订阅这个事件,在代理器不可用时避免发送消息.在任何状况下,当发送消息是,他们须要准备处理MessageDeliverException

  • SessionConnectEvent 当一个新的STOMP链接被接收时发布,代表一个新的客户端session.这个事件包含了表明链接的session id,用户信息,全部客户端发送的自定义消息头.这对跟踪客户端session很是有用.订阅这些事件的组件可使用SimpMessageHeaderAccessor或StompmessageHeaderAccessor包裹包含的消息.

  • SessionConnectedEvent 在SessionConnectEvent事件以后,当代理器向已经发布了一个STOMP Connection框架回应CONNECT以后,就会当即发布.此时,会认为STOMP的session已彻底创建.

  • SessionSubscribeEvent 当收到一个新的STOMP订阅时发布.

  • SessionUnsubscribeEvent 当收到一个新的STOMP退订时发布.

  • SessionDisconnectEvent 当STOMP的session关闭时发布.这个关闭多是从客户端发送的,也多是WebSocket关闭自动产生的.在某些状况下,每一个session的这个事件会发生屡次.组件须要在处理混合关闭事件上作到幂等操做.

当你使用一个成熟的代理器,当代理器临时变得不可靠时,这个STOMP代理器中介会自动重连系统链接.但客户端链接不会重连.若是心跳可用,客户端通常会在10秒内发现代理器没法回应.客户端须要实现本身的重连逻辑.

另外,应用能够经过注册ChannelInterceptor在相应的消息频道上拦截进出消息.例如拦截进入的消息:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

  @Override
  public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.setInterceptors(new MyChannelInterceptor());
  }
}

一个自定义的ChannelInterceptor能够实现基类的ChannelInterceptorAdapter,并使用StompHeaderAccessor或SimpMessageHeaderAccessor来访问消息里的信息.

public class MyChannelInterceptor extends ChannelInterceptorAdapter {

  @Override
  public Message<?> preSend(Message<?> message, MessageChannel channel) {
    StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
    StompCommand command = accessor.getStompCommand();
    // ...
    return message;
  }
}

26.4.14 STOMP客户端

spring提供了一个基于WebSocket的STOMP客户端和基于TCP客户端的STOMP.

开始建立和配置WebSocketStompClient

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

在上面的例子StandardWebSocketClient能够被SockJsClient替代,由于它也是WebSocketClient的一个实现.这个SockJsClient可使用WebSocket或http-based传输做为回退.更多的细节查看26.3.7.

下一步是创建一个链接并提供了一个STOMP session的处理器.

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

当这个session可使用时,这个handler会被通知.

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        // ...
    }
}

当session创建以后,它的内容就会发送,并经过已配置的MessageConverter序列化.

session.send("/topic/foo", "payload");

你能够订阅目的地.这个订阅方法要求一个关于订阅信息的处理器并返回能够用来退订的订阅处理.对于每个收到的信息这个处理器能够指定每一个能够被序列化的负载内容的目标对象类型.

session.subscribe("/topic/foo", new StompFrameHandler() {

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return String.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        // ...
    }

});

要启用STOMP心跳,须要用TaskScheduler配置webSocketStompClient,能够自定义心跳频率,10秒钟写入不活跃的,它将引起一次心跳发送;10秒读取补货要的,它会关闭链接.

若是在同一机器上进行数千个客户端模拟,则须要考虑关闭心跳.由于每一个链接都有本身的心跳任务,这里没有为一个机器上的大量客户端运行作优化.

STOMP协议还支持收据,即客户端必须添加收据头以对应服务器处理发送或订阅以后返回的RECEIPT框架.为了支持它,StompSession提供了setAutoReceipt(boolean)属性,它能够在随后的每次发送和订阅都添加"receipt"头.你还能够手动选择添加一个"receipt"头到StompHeaders里.发送和订阅能够返回一个Receiptable实例,这样能够被用来注册收据成功或失败回调.对于该功能,客户端必须配置TaskScheduler,其时间量必需要小于订阅过时时间(默认15秒过时).

记住StompSessionHandler自己也是一个StompFrameHandler.它能够处理ERROR框架相应回调处理异常,这些异常来自处理消息,传输级别的HandleTransportError,包括ConnectionLostException.

26.4.15 WebSocket Scope(做用域)

每个WebSocket的session都是属性的集合.这个map与客户端传入消息相连,还能够被控制器里的方法访问.例如:

@Controller
public class MyController {

    @MessageMapping("/action")
    public void handle(SimpMessageHeaderAccessor headerAccessor) {
        Map<String, Object> attrs = headerAccessor.getSessionAttributes();
        // ...
    }
}

你也能够申明一个spring管理的bean是websocket做用域.Websocket-scope的bean能够被注入到控制器和任何被注册为"clientInboundChannel"的频道拦截器里.这里bean通常是单例的,存活时间大于单个的webSocket session.因此你须要对WebSocket-scope的bean使用做用域代理模式.

@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {

    @PostConstruct
    public void init() {
        // Invoked after dependencies injected
    }

    // ...

    @PreDestroy
    public void destroy() {
        // Invoked when the WebSocket session ends
    }
}

@Controller
public class MyController {

    private final MyBean myBean;

    @Autowired
    public MyController(MyBean myBean) {
        this.myBean = myBean;
    }

    @MessageMapping("/action")
    public void handle() {
        // this.myBean from the current WebSocket session
    }
}

同任何自定义做用域同样,spring初始化一个新的Mybean实例,它能够被控制器获取,并存储到webScoket seesion属性里.每次相同的实例都会返回直到session终结.WebSocket做用域的bean会拥有全部的spring生命周期方法调用,如上例示.

26.4.16 配置和性能

谈到性能时,这里没有银弹.许多因素会影响它,包括消息大小,体积,应用方法工做是否须要阻塞,以及外部因素,包括网络速度和其余.本节主要提供了一些可配置选项的简介,还有关于如何合理缩放的思考.

在一个消息应用里,消息在线程池支持下被频道传输并能够被异步操做.配置这样一个应用须要消息频道和流相关的只是.具体能够参考26.4.3节.

显然,首先配置线程池支持的"clientInboundChannel"和"clientOutboundChannel".默认配置的数目是可用处理器的两倍.

若是注解方法处理消息受限于CPU,那么"clientInboundChannel"里的线程的数量应该和处理器数量相同.若是他们的工做更可能是首先IO,须要阻塞,等待数据库或其余外部系统,那么线程池容量就能够增长.

ThreadPoolExecutor有三个重要属性.这里是核心线程池容量,线程池最大容量,等待线程最大容量.

最打困扰就是配置核心线程池容量(10)或最大线程池容量(20)这样就配置出一个10到20个线程的线程池.即便你把等待capacity加到最大,即Integer.MAX_VALUE,核心线程池里的线程数量也不会增长,由于只是增长了排队任务的数量.

请查看THreadPoolExecutor的文档来学习这些属性,并理解不一样的排队策略.

在"clientOutboundChannel",是关于发送消息到WebSocket客户端.若是客户端的网络比较快,那么线程数量应该与可用处理器数量保持一致.但若是他们太慢或低带宽,接受消息会耗费太长时间,并给线程池添加负担.因此增长线程池容量是有必要的.

"clientInBoundChannel"的工做量很好预测-这个依赖于应用自己,但如何配置"clientOutboundChannel"却很难,由于有太多超出应用自己的因素.所以这里有两个与信息发送相关属性.这是"sendTimeLimit","sendBufferSizeLimit".分别用来配置发送消息到客户端时,发送能够花费多长时间或多大的数据会被缓存.

基本观点,任何单个线程用来发送消息给客户端的时间都是有限制的.同时全部其余消息会被缓冲,你须要使用属性来配置须要消耗多久时间发送一条消息,同时能够缓冲多大的数据.能够经过javadoc或xml文档来配置这些重要的额外细节.

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
    }

    // ...

}
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker>
        <websocket:transport send-timeout="15000" send-buffer-size="524288" />
        <!-- ... -->
    </websocket:message-broker>

</beans>

上面的webSocket传输配置还能够配置接受的STOMP信息的最大数量.尽管理论上webSocket 消息在容量上是无限的,实际上有webSocket服务器限制,例如:tomcat是8K,jetty是64K.基于这个缘由,stomp.js把大的STOMP消息分割为16K一份,而后把它们做为混合webSocket消息发送,这要求服务器缓冲并从新组装.

基于webSocket的spring STOMP支持这样,因此应用能够配置消息的最大容量,而无关webSocket server指定的消息容量.记住webSocket消息的容量能够自动调整,只要能保证他们最多传输16K大小的webSocket消息.

例子以下:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(128 * 1024);
    }

    // ...

}
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker>
        <websocket:transport message-size="131072" />
        <!-- ... -->
    </websocket:message-broker>

</beans>

关于收放的最重要一点是使用混成应用实例.目前这个不适于简单代理器.可是适用如RabbitMQ等成熟的代理器.即每一个应用实例能够链接到代理器,一个应用的消息广播能够经过代理器广播到其余应用实例相连的WebSocket客户端上.

26.4.17 运行监控

当使用@EnableWebSocketMessageBroker或websocket:message-broker时,关键框架组件会自动收集统计和数据,这能够对整个应用的内在状态提供洞察.这个配置也声明一个WebSocketMessageBrokerStats的bean在某处收集各类可用的信息,默认每30分钟记录为INFO级别.这个bean在运行时能够被经过spring的MBeanExporterd导出到JMX,例如经过JDK的Jsonsole.下面是可用信息的总结.

Client WebSocket Session(客户端 webSocket Session)

  • Current

    代表当前有多少客户端session经过代理器的方式 WebSocket VS Http流和SockJS session长轮询.

  • Total

    目前总计创建了多少session

  • Abnormal Closed(不正常关闭)

  • connect Failure (链接失败) 一个session已经创建,但在60秒内没有收到消息会被关闭.一般是网络或网络代理问题.

  • Send Limit Exceeded (发送过限)

    发送消息超过配置的时间或缓存数量致使session关闭,通常多是客户端网速过慢/
  • Transport Errors (传输错误)

    当发生传输错误时,seesion关闭.好比读写WebSocket链接或Http请求/相应失败.
  • STOMP Frames

    CONNECT,CONNECTED,DISCONNECT框架周期的数量代表有多少客户端经过STOMP层次链接.记住链接关闭的数量要小于session非正常关闭的数量,或未经过发送DISCONNECT框架关闭的数量.

STOMP Broker Relay (stomp 代理器中介)

  • Tcp Conenctions (TCP链接)

说明有多少表明webSocket客户端的session的指向代理器的TCp链接.这个应该是客户端session的数量+1,另外一个是应用内部用于发送消息的系统链接.

  • STOMP Frames (STOMP框架)

    从代理器接受或发送的表明客户端的CONENCT,CONNECTED,DISCONNECT框架的总数.记住一个DISCONECT是发送到代理器的,而与客户端WebSocket sessoin是否关闭无关.因此更少DISCONNECT框架的数量代表代理器挣积极关闭链接,多是由于心跳为及时到达,一个无效的输入框架,或其余.

  • Client Inbound Channel(客户端输入频道)

来自支持"clientInboundChannel"端的线程池统计提供了入库消息进程的健康状态的审视.任务队列上升代表应用处理消息太慢了.若是这里有I/O方面的任务(例如下降数据库查询,第三方的REST API的http请求),则须要考虑增长线程池容量.

  • Client Outbound Channel (客户输出方面频道)

    从支持"clientOutboundChannel"的线程池获得的统计提供了想客户端广播消息相关的健康情况.排队数量变大,则说明客户端接受消息太慢.一种解决办法是增长线程池的数量来适应当前缓慢客户端的数量.另外一种方法是下降发送时间间隔和发送缓冲限制.(查看上节内容)

  • SockJS Task Scheduler(SockJS 任务调度器)

    这个是用来发送心跳的SockJS任务调度器的线程池数量的统计.记住小心跳协商为STOMP层次时,SockJS心跳会失效.

26.4.18 测试标志的控制器方法

这里有两种主要步骤经过spring STOMP对WebSocket的支持来测试应用.一个是服务器测试控制器和它注解的消息处理方法的功能.另外一个是写一个全功能的端对端的测试,包括运行着的客户端和服务器.

这两个步骤不是互相排斥的.相反,每个都有总体测试策略的地点.服务端的测试更加集中在写和处理.端对端集成测试一方面更加完整,测试更多,但他们更多的是啊是啊写和处理.

服务器端测试最简单的方案是写一个控制器单元测试.可是由于控制器依赖它的注解,因此这个不是特别好用.单纯的单元测试没法检测他们.

理想的控制器测试应该在运行时唤醒,就像使用spring mvc测试框架来测试控制器处理http请求同样,无需运行在Servlet容器里但能够经过spring框架来唤醒这个标志控制器.项spring mvc测试同样,这里有两种可能的选择.要么使用"context-based"或"standalone"安装:

    1. 经过spring TestContext framework的帮助来加载真实的spring配置.将"clientInboundChannel"做为一个测试域,使用它发送消息会被控制器方法处理.
  • 2.手动安装能唤醒控制器(名为SimpAnnotationMethodMessageHandler)的最小的spring框架组件,并经过控制器直接发送消息.

这两种计划方案都表如今tests for the stock portfolio项目里.

第二种方案是建立端对端的集成测试.对此,你须要在可行状态下运行一个webSocket服务器,并经过WebSocket客户端链接它,并发送包含了STOMP框架的消息.tests for the stock portfolio项目里也介绍了一种方案,使用Tomcat做为可用服务器和一个简单STOMP客户端以达到测试目的.

相关文章
相关标签/搜索