最近公司的客户要求,分配给员工的任务除了有微信通知外,还但愿PC端的网页也能实时收到通知。管理员分配任务是在咱们的系统A,而员工接受任务是在系统B。两个系统都是如今已投入使用的系统。html
根据需求咱们最终选用SpringAOP+RabbitMQ+WebSocket。前端
SpringAOP可让咱们不修改原有代码,直接将原有service做为切点,加入切面。RabbitMQ可让A系统和B系统解耦。WebSocket则能够达到实时通知的要求。java
AOP称为面向切面编程,在程序开发中主要用来解决一些系统层面上的问题,好比日志,事务,权限等待。是Spring的核心模块,底层是经过动态代理来实现(动态代理将在以后的文章重点介绍)。web
基本概念spring
Aspect(切面):一般是一个类,里面能够定义切入点和通知。编程
JointPoint(链接点):程序执行过程当中明确的点,通常是方法的调用。浏览器
Advice(通知):AOP在特定的切入点上执行的加强处理,有before,after,afterReturning,afterThrowing,around。服务器
Pointcut(切入点):就是带有通知的链接点,在程序中主要体现为书写切入点表达式。微信
通知类型websocket
Before:在目标方法被调用以前作加强处理。
@Before只须要指定切入点表达式便可
AfterReturning:在目标方法正常完成后作加强。
@AfterReturning除了指定切入点表达式后,还能够指定一个返回值形参名returning,表明目标方法的返回值
AfterThrowing:主要用来处理程序中未处理的异常。
@AfterThrowing除了指定切入点表达式后,还能够指定一个throwing的返回值形参名,能够经过该形参名
来访问目标方法中所抛出的异常对象
After:在目标方法完成以后作加强,不管目标方法时候成功完成。
@After能够指定一个切入点表达式
Around:环绕通知,在目标方法完成先后作加强处理,环绕通知是最重要的通知类型,像事务,日志等都是环绕通知,注意编程中核心是一个ProceedingJoinPoint。
(图摘自:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html)
从图中咱们能够看到RabbitMQ主要的结构有:Routing、Binding、Exchange、Queue。
Queue
Queue(队列)RabbitMQ的做用是存储消息,队列的特性是先进先出。
Exchange
生产者产生的消息并非直接发送给消息队列Queue的,而是要通过Exchange(交换器),由Exchange再将消息路由到一个或多个Queue,还会将不符合路由规则的消息丢弃。
Routing
用于标记或生产者寻找Exchange。
Binding
用于Exchange和Queue作关联。
Exchange Type
fanout
fanout类型的Exchange路由规则很是简单,它会把全部发送到该Exchange的消息路由到全部与它绑定的Queue中。
direct
direct会把消息路由到那些binding key与routing key彻底匹配的Queue中。
topic
direct规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue,那么topic这个规则就是模糊匹配,能够经过通配符知足一部分规则就能够传送。
headers
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
了解websocket必须先知道几个经常使用的web通讯技术及其区别。
短轮询
短轮询的基本思路就是浏览器每隔一段时间向浏览器发送http请求,服务器端在收到请求后,不管是否有数据更新,都直接进行响应。这种方式实现的即时通讯,本质上仍是浏览器发送请求,服务器接受请求的一个过程,经过让客户端不断的进行请求,使得客户端可以模拟实时地收到服务器端的数据的变化。
这种方式的优势是比较简单,易于理解,实现起来也没有什么技术难点。缺点是显而易见的,这种方式因为须要不断的创建http链接,严重浪费了服务器端和客户端的资源。尤为是在客户端,距离来讲,若是有数量级想对比较大的人同时位于基于短轮询的应用中,那么每个用户的客户端都会疯狂的向服务器端发送http请求,并且不会间断。人数越多,服务器端压力越大,这是很不合理的。
所以短轮询不适用于那些同时在线用户数量比较大,而且很注重性能的Web应用。
长轮询/comet
comet指的是,当服务器收到客户端发来的请求后,不会直接进行响应,而是先将这个请求挂起,而后判断服务器端数据是否有更新。若是有更新,则进行响应,若是一直没有数据,则到达必定的时间限制(服务器端设置)后关闭链接。
长轮询和短轮询比起来,明显减小了不少没必要要的http请求次数,相比之下节约了资源。长轮询的缺点在于,链接挂起也会致使资源的浪费。
SSE
SSE是HTML5新增的功能,全称为Server-Sent Events。它能够容许服务推送数据到客户端。SSE在本质上就与以前的长轮询、短轮询不一样,虽然都是基于http协议的,可是轮询须要客户端先发送请求。而SSE最大的特色就是不须要客户端发送请求,能够实现只要服务器端数据有更新,就能够立刻发送到客户端。
SSE的优点很明显,它不须要创建或保持大量的客户端发往服务器端的请求,节约了不少资源,提高应用性能。而且SSE的实现很是简单,不须要依赖其余插件。
WebSocket
WebSocket是Html5定义的一个新协议,与传统的http协议不一样,该协议能够实现服务器与客户端之间全双工通讯。简单来讲,首先须要在客户端和服务器端创建起一个链接,这部分须要http。链接一旦创建,客户端和服务器端就处于平等的地位,能够相互发送数据,不存在请求和响应的区别。
WebSocket的优势是实现了双向通讯,缺点是服务器端的逻辑很是复杂。如今针对不一样的后台语言有不一样的插件可使用。
四种Web即时通讯技术比较
从兼容性角度考虑,短轮询>长轮询>长链接SSE>WebSocket;
从性能方面考虑,WebSocket>长链接SSE>长轮询>短轮询。
项目使用SpringBoot搭建。RabbitMQ的安装这里不讲述。
两个系统A、B都须要操做RabbitMQ,其中A生产消息,B消费消息。故都须要配置。
一、首先引入RabbitMQ的dependency:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
这个dependency中包含了RabbitMQ相关dependency。
二、在项目的配置文件里配置为使用rabbitmq及其参数。
application-pro.yml
#消息队列 message.queue.type: rabbitmq ## rabbit mq properties rabbitmq: host: localhost port: 5672 username: guest password: guest
application.properties
#将要使用的队列名 rabbitmq.websocket.msg.queue=websocket_msg_queue
三、建立配置文件。队列的建立交给spring。
RabbitMQConfig.java
@Configuration @EnableRabbit public class RabbitMQConfig { @Value("${rabbitmq.host}") private String host; @Value("${rabbitmq.port}") private String port; @Value("${rabbitmq.username}") private String username; @Value("${rabbitmq.password}") private String password; @Value("${rabbitmq.websocket.msg.queue}") private String webSocketMsgQueue; @Bean public ConnectionFactory connectionFactory() throws IOException { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUsername(username); factory.setPassword(password); // factory.setVirtualHost("test"); factory.setHost(host); factory.setPort(Integer.valueOf(port)); factory.setPublisherConfirms(true); //设置队列参数,是否持久化、队列TTL、队列消息TTL等 factory.createConnection().createChannel(false).queueDeclare(webSocketMsgQueue, true, false, false, null); return factory; } @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // 必须是prototype类型 public RabbitTemplate rabbitTemplate() throws IOException { return new RabbitTemplate(connectionFactory()); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() throws IOException { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } }
四、系统B中建立队列监听,当队列有消息时,发送websocket通知。
RabbitMQListener.java
@Component public class RabbitMQListener { @Autowired private RabbitMQService mqService; /** * WebSocket推送监听器 * @param socketEntity * @param deliveryTag * @param channel */ @RabbitListener(queues = "websocket_msg_queue") public void webSocketMsgListener(@Payload WebSocketMsgEntity socketMsgEntity, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException { mqService.handleWebSocketMsg(socketMsgEntity, deliveryTag, channel); } }
RabbitMQService.java
public class RabbitMQService { @Autowired private MessageWebSocketHandler messageWebSocketHandler; /** * @param socketMsgEntity * @param deliveryTag * @param channel * @throws IOException */ void handleWebSocketMsg(WebSocketMsgEntity socketMsgEntity, long deliveryTag, Channel channel) throws IOException { try { messageWebSocketHandler.sendMessageToUsers(socketMsgEntity.toJsonString(), socketMsgEntity.getToUserIds()); channel.basicAck(deliveryTag, false); } catch (Exception e) { channel.basicNack(deliveryTag, false, false); } } }
WebSocketMsgEntity为MQ中传送的实体。
public class WebSocketMsgEntity implements Serializable { public enum OrderType{ repair("维修"), maintain("保养"), measure("计量"); OrderType(String value){ this.value = value; } String value; public String getValue() { return value; } } //设备名称 private String EquName; //设备编号 private String EquId; //工单类型 private OrderType orderType; //工单单号 private String orderId; //工单状态 private String orderStatus; //建立时间 private Date createTime; //消息接收人ID private List<String> toUserIds; public String getEquName() { return EquName; } public void setEquName(String equName) { EquName = equName; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getEquId() { return EquId; } public void setEquId(String equId) { EquId = equId; } public String getOrderStatus() { return orderStatus; } public void setOrderStatus(String orderStatus) { this.orderStatus = orderStatus; } public OrderType getOrderType() { return orderType; } public void setOrderType(OrderType orderType) { this.orderType = orderType; } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public List<String> getToUserIds() { return toUserIds; } public void setToUserIds(List<String> toUserIds) { this.toUserIds = toUserIds; } public String toJsonString(){ return JSON.toJSONString(this); } }
一、系统A中建立一个切面类DataInterceptor.java
@Aspect @Component public class DataInterceptor { @Autowired private MessageQueueService queueService; //维修工单切点 @Pointcut("execution(* com.zhishang.hes.common.service.impl.RepairServiceImpl.executeFlow(..))") private void repairMsg() { } /** * 返回通知,方法执行正常返回时触发 * * @param joinPoint * @param result */ @AfterReturning(value = "repairMsg()", returning = "result") public void afterReturning(JoinPoint joinPoint, Object result) { //此处能够得到切点方法名 //String methodName = joinPoint.getSignature().getName(); EquipmentRepair equipmentRepair = (EquipmentRepair) result; WebSocketMsgEntity webSocketMsgEntity = this.generateRepairMsgEntity(equipmentRepair); if (webSocketMsgEntity == null) { return; } queueService.send(webSocketMsgEntity); } /** * 生成发送到MQ的维修消息 * * @param equipmentRepair * @return */ private WebSocketMsgEntity generateRepairMsgEntity(EquipmentRepair equipmentRepair) { WebSocketMsgEntity webSocketMsgEntity = generateRepairMsgFromTasks(equipmentRepair); return webSocketMsgEntity; } /** * 从任务中生成消息 * * @param equipmentRepair * @return */ private WebSocketMsgEntity generateRepairMsgFromTasks(EquipmentRepair equipmentRepair) { //业务代码略 } }
二、发送消息到MQ。这里只贴了发送的核心代码
public class RabbitMessageQueue extends AbstractMessageQueue { @Value("${rabbitmq.websocket.msg.queue}") private String webSocketMsgQueue; @Autowired private RabbitTemplate rabbitTemplate; @Override public void send(WebSocketMsgEntity entity) { //没有指定exchange,则使用默认名为“”的exchange,binding名与queue名相同 rabbitTemplate.convertAndSend(webSocketMsgQueue, entity); } }
一、 系统B中引入websocket服务端dependency
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>4.3.10.RELEASE</version> </dependency>
二、 配置websocket,添加处理类
WebSocketConfigurer.java
@Configuration @EnableWebSocket public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer { private static Logger logger = LoggerFactory.getLogger(WebSocketConfig.class); @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { //配置webSocket路径 registry.addHandler(messageWebSocketHandler(),"/msg-websocket").addInterceptors(new MyHandshakeInterceptor()).setAllowedOrigins("*"); //配置webSocket路径 支持前端使用socketJs registry.addHandler(messageWebSocketHandler(), "/sockjs/msg-websocket").setAllowedOrigins("*").addInterceptors(new MyHandshakeInterceptor()).withSockJS(); } @Bean public MessageWebSocketHandler messageWebSocketHandler() { logger.info("......建立MessageWebSocketHandler......"); return new MessageWebSocketHandler(); } }
MessageWebSocketHandler.java 主要用于websocket链接及消息发送处理。配置中还使用了链接握手时的处理,主要是取用户登录信息,这里很少讲述。
public class MessageWebSocketHandler extends TextWebSocketHandler { private static Logger logger = LoggerFactory.getLogger(SystemWebSocketHandler.class); private static ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketSession>> users = new ConcurrentHashMap<>(); @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String userId = session.getAttributes().get("WEBSOCKET_USERID").toString(); logger.info("......AfterConnectionEstablished......"); logger.info("session.getId:" + session.getId()); logger.info("session.getLocalAddress:" + session.getLocalAddress().toString()); logger.info("userId:" + userId); //websocket链接后记录链接信息 if (users.keySet().contains(userId)) { CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId); webSocketSessions.add(session); } else { CopyOnWriteArraySet<WebSocketSession> webSocketSessions = new CopyOnWriteArraySet<>(); webSocketSessions.add(session); users.put(userId, webSocketSessions); } } @Override public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception { removeUserSession(session); if (session.isOpen()) { session.close(); } logger.info("异常出现handleTransportError" + throwable.getMessage()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { removeUserSession(session); logger.info("关闭afterConnectionClosed" + closeStatus.getReason()); } @Override public boolean supportsPartialMessages() { return false; } /** * 给符合要求的在线用户发送消息 * * @param message */ public void sendMessageToUsers(String message, List<String> userIds) throws IOException{ if (StringUtils.isEmpty(message) || CollectionUtils.isEmpty(userIds)) { return; } if (users.isEmpty()) { return; } for (String userId : userIds) { if (!users.keySet().contains(userId)) { continue; } CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId); if (webSocketSessions == null) { continue; } for (WebSocketSession webSocketSession : webSocketSessions) { if (webSocketSession.isOpen()) { try { webSocketSession.sendMessage(new TextMessage(message)); } catch (IOException e) { logger.error(" WebSocket server send message ERROR " + e.getMessage()); try { throw e; } catch (IOException e1) { e1.printStackTrace(); } } } } } } /** * websocket清除链接信息 * * @param session */ private void removeUserSession(WebSocketSession session) { String userId = session.getAttributes().get("WEBSOCKET_USERID").toString(); if (users.keySet().contains(userId)) { CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId); webSocketSessions.remove(session); if (webSocketSessions.isEmpty()) { users.remove(userId); } } } }
整个功能完成后,A系统分配任务时,系统B登录用户收到的消息如图:
整体流程:
一、对于系统B,每一个登录的用户都会和服务器创建websocket长链接。
二、系统A生成任务,AOP作出响应,将封装的消息发送给MQ。
三、系统B中的MQ监听发现队列有消息到达,消费消息。
四、系统B经过websocket长链接将消息发给指定的登录用户。
参考:
https://docs.spring.io/spring/docs/4.3.12.RELEASE/spring-framework-reference/htmlsingle/#websocket
http://www.javashuo.com/article/p-nmcskqka-be.html