直接使用WebSocket(或SockJS)就很相似于使用TCP套接字来编写Web应用。由于没有高层级的线路协议(wire protocol),所以就须要咱们定义应用之间所发送消息的语义,还须要确保链接的两端都能遵循这些语义。javascript
就像HTTP在TCP套接字之上添加了请求-响应模型层同样,STOMP在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。css
与HTTP请求和响应相似,STOMP帧由命令、一个或多个头信息以及负载所组成。例如,以下就是发送数据的一个STOMP帧:java
>>> SEND transaction:tx-0 destination:/app/marco content-length:20 {"message":"Marco!"}
在这个例子中,STOMP命令是send,代表会发送一些内容。紧接着是三个头信息:一个表示消息的的事务机制,一个用来表示消息要发送到哪里的目的地,另一个则包含了负载的大小。而后,紧接着是一个空行,STOMP帧的最后是负载内容。git
STOMP 的消息根据前缀的不一样分为三种。以下,以 /app 开头的消息都会被路由到带有@MessageMapping 或 @SubscribeMapping 注解的方法中;以/topic 或 /queue 开头的消息都会发送到STOMP代理中,根据你所选择的STOMP代理不一样,目的地的可选前缀也会有所限制;以/user开头的消息会将消息重路由到某个用户独有的目的地上。github
@Configuration @EnableWebSocketMessageBroker @PropertySource("classpath:resources.properties") public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer { @Value("${rabbitmq.host}") private String host; @Value("${rabbitmq.port}") private Integer port; @Value("${rabbitmq.userName}") private String userName; @Value("${rabbitmq.password}") private String password; /** * 将 "/stomp" 注册为一个 STOMP 端点。这个路径与以前发送和接收消息的目的地路径有所 * 不一样。这是一个端点,客户端在订阅或发布消息到目的地路径前,要链接到该端点。 * * @param registry */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/stomp").withSockJS(); } /** * 若是不重载它的话,将会自动配置一个简单的内存消息代理,用它来处理以"/topic"为前缀的消息 * * @param registry */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { //基于内存的STOMP消息代理 registry.enableSimpleBroker("/queue", "/topic"); //基于RabbitMQ 的STOMP消息代理 /* registry.enableStompBrokerRelay("/queue", "/topic") .setRelayHost(host) .setRelayPort(port) .setClientLogin(userName) .setClientPasscode(password);*/ registry.setApplicationDestinationPrefixes("/app", "/foo"); registry.setUserDestinationPrefix("/user"); } }
服务端处理客户端发来的STOMP消息,主要用的是 @MessageMapping 注解。以下:web
@MessageMapping("/marco") @SendTo("/topic/marco") public Shout stompHandle(Shout shout){ LOGGER.info("接收到消息:" + shout.getMessage()); Shout s = new Shout(); s.setMessage("Polo!"); return s; }
2.一、@MessageMapping 指定目的地是“/app/marco”(“/app”前缀是隐含的,由于咱们将其配置为应用的目的地前缀)。spring
2.二、方法接收一个Shout参数,由于Spring的某一个消息转换器会将STOMP消息的负载转换为Shout对象。Spring 4.0提供了几个消息转换器,做为其消息API的一部分:websocket
2.三、尤为注意,这个处理器方法有一个返回值,这个返回值并非返回给客户端的,而是转发给消息代理的,若是客户端想要这个返回值的话,只能从消息代理订阅。@SendTo 注解重写了消息代理的目的地,若是不指定@SendTo,帧所发往的目的地会与触发处理器方法的目的地相同,只不过会添加上“/topic”前缀。app
2.四、若是客户端就是想要服务端直接返回消息呢?听起来不就是HTTP作的事情!即便这样,STOMP 仍然为这种一次性的响应提供了支持,用的是@SubscribeMapping注解,与HTTP不一样的是,这种请求-响应模式是异步的...异步
@SubscribeMapping("/getShout") public Shout getShout(){ Shout shout = new Shout(); shout.setMessage("Hello STOMP"); return shout; }
正如前面看到的那样,使用 @MessageMapping 或者 @SubscribeMapping 注解能够处理客户端发送过来的消息,并选择方法是否有返回值。
若是 @MessageMapping 注解的控制器方法有返回值的话,返回值会被发送到消息代理,只不过会添加上"/topic"前缀。可使用@SendTo 重写消息目的地;
若是 @SubscribeMapping 注解的控制器方法有返回值的话,返回值会直接发送到客户端,不通过代理。若是加上@SendTo 注解的话,则要通过消息代理。
spring-websocket 定义了一个 SimpMessageSendingOperations 接口(或者使用SimpMessagingTemplate ),能够实现自由的向任意目的地发送消息,而且订阅此目的地的全部用户都能收到消息。
@Autowired private SimpMessageSendingOperations simpMessageSendingOperations; /** * 广播消息,不指定用户,全部订阅此的用户都能收到消息 * @param shout */ @MessageMapping("/broadcastShout") public void broadcast(Shout shout) { simpMessageSendingOperations.convertAndSend("/topic/shouts", shout); }
3.2介绍了如何广播消息,订阅目的地的全部用户都能收到消息。若是消息只想发送给特定的用户呢?spring-websocket 介绍了两种方式来实现这种功能,一种是 基于@SendToUser注解和Principal参数,一种是SimpMessageSendingOperations 接口的convertAndSendToUser方法。
@SendToUser 表示要将消息发送给指定的用户,会自动在消息目的地前补上"/user"前缀。以下,最后消息会被发布在 /user/queue/notifications-username。可是问题来了,这个username是怎么来的呢?就是经过 principal 参数来得到的。那么,principal 参数又是怎么来的呢?须要在spring-websocket 的配置类中重写 configureClientInboundChannel 方法,添加上用户的认证。
/** * 一、设置拦截器 * 二、首次链接的时候,获取其Header信息,利用Header里面的信息进行权限认证 * 三、经过认证的用户,使用 accessor.setUser(user); 方法,将登录信息绑定在该 StompHeaderAccessor 上,在Controller方法上能够获取 StompHeaderAccessor 的相关信息 * @param registration */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(new ChannelInterceptorAdapter() { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); //一、判断是否首次链接 if (StompCommand.CONNECT.equals(accessor.getCommand())){ //二、判断用户名和密码 String username = accessor.getNativeHeader("username").get(0); String password = accessor.getNativeHeader("password").get(0); if ("admin".equals(username) && "admin".equals(password)){ Principal principal = new Principal() { @Override public String getName() { return userName; } }; accessor.setUser(principal); return message; }else { return null; } } //不是首次链接,已经登录成功 return message; } }); }
@MessageMapping("/shout") @SendToUser("/queue/notifications") public Shout userStomp(Principal principal, Shout shout) { String name = principal.getName(); String message = shout.getMessage(); LOGGER.info("认证的名字是:{},收到的消息是:{}", name, message); return shout; }
除了convertAndSend()之外,SimpMessageSendingOperations 还提供了convertAndSendToUser()方法。按照名字就能够判断出来,convertAndSendToUser()方法可以让咱们给特定用户发送消息。
@MessageMapping("/singleShout") public void singleUser(Shout shout, StompHeaderAccessor stompHeaderAccessor) { String message = shout.getMessage(); LOGGER.info("接收到消息:" + message); Principal user = stompHeaderAccessor.getUser(); simpMessageSendingOperations.convertAndSendToUser(user.getName(), "/queue/shouts", shout); }
如上,这里虽然我仍是用了认证的信息获得用户名。可是,其实大可没必要这样,由于 convertAndSendToUser 方法能够指定要发送给哪一个用户。也就是说,彻底能够把用户名的看成一个参数传递给控制器方法,从而绕过身份认证!convertAndSendToUser 方法最终会把消息发送到 /user/sername/queue/shouts 目的地上。
在处理消息的时候,有可能会出错并抛出异常。由于STOMP消息异步的特色,发送者可能永远也不会知道出现了错误。@MessageExceptionHandler标注的方法可以处理消息方法中所抛出的异常。咱们能够把错误发送给用户特定的目的地上,而后用户从该目的地上订阅消息,从而用户就能知道本身出现了什么错误啦...
@MessageExceptionHandler(Exception.class) @SendToUser("/queue/errors") public Exception handleExceptions(Exception t){ t.printStackTrace(); return t; }
STOMP 依赖 sockjs.js 和 stomp.min.js。stomp.min.js的下载连接:http://www.bootcdn.cn/stomp.js/
<script type="text/javascript" src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.js"></script> <script type="text/javascript" src="/js/stomp.min.js"></script>
/*STOMP*/ var url = 'http://localhost:8080/stomp'; var sock = new SockJS(url); var stomp = Stomp.over(sock); var strJson = JSON.stringify({'message': 'Marco!'}); //默认的和STOMP端点链接 /*stomp.connect("guest", "guest", function (franme) { });*/ var headers={ username:'admin', password:'admin' }; stomp.connect(headers, function (frame) { //发送消息 //第二个参数是一个头信息的Map,它会包含在STOMP的帧中 //事务支持 var tx = stomp.begin(); stomp.send("/app/marco", {transaction: tx.id}, strJson); tx.commit(); //订阅服务端消息 subscribe(destination url, callback[, headers]) stomp.subscribe("/topic/marco", function (message) { var content = message.body; var obj = JSON.parse(content); console.log("订阅的服务端消息:" + obj.message); }, {}); stomp.subscribe("/app/getShout", function (message) { var content = message.body; var obj = JSON.parse(content); console.log("订阅的服务端直接返回的消息:" + obj.message); }, {}); /*如下是针对特定用户的订阅*/ var adminJSON = JSON.stringify({'message': 'ADMIN'}); /*第一种*/ stomp.send("/app/singleShout", {}, adminJSON); stomp.subscribe("/user/queue/shouts",function (message) { var content = message.body; var obj = JSON.parse(content); console.log("admin用户特定的消息1:" + obj.message); }); /*第二种*/ stomp.send("/app/shout", {}, adminJSON); stomp.subscribe("/user/queue/notifications",function (message) { var content = message.body; var obj = JSON.parse(content); console.log("admin用户特定的消息2:" + obj.message); }); /*订阅异常消息*/ stomp.subscribe("/user/queue/errors", function (message) { console.log(message.body); }); //若使用STOMP 1.1 版本,默认开启了心跳检测机制(默认值都是10000ms) stomp.heartbeat.outgoing = 20000; stomp.heartbeat.incoming = 0; //客户端不从服务端接收心跳包 });