rabbitmq+websocket(SpringBoot版)实现分布式消息推送

原本想用websocket作一个消息推送 但是分布式环境下不支持session共享由于服务器不一样html

因此采用 rabbitMQ+webSocket实现分布式消息推送前端

生产者将消息 发送给 rabbitMQ 的 virtual-host:/(顶极路由) 再由它路由到交换机 最终由交换机经过路由键指定具体的管道html5

消费者监听指定的管道获取消息web

最终将获取的消息 交给 webSocket 被@OnMessage注解标识的方法spring

每次消费一条消息交给 被@OnMessage注解标识的方法 返回给前台 json

实现分布式实时推送浏览器

1.配置rabbitMQ安全

消息生产者服务器

1.1pom.xmlwebsocket

1 <!--引入rabbitmq依赖-->
2         <dependency>
3             <groupId>org.springframework.boot</groupId>
4             <artifactId>spring-boot-starter-amqp</artifactId>
5         </dependency>
 1 server:  2  port: 5002  3 
 4 spring:  5  rabbitmq:  6  host: localhost  7  #帐号密码 默认有的  8  username: guest  9  password: guest 10  #rbbitmq虚拟主机路径 11  virtual-host: / 12  #rabbitmq的端口号 也是默认的 13     port: 5672
 1 @SpringBootApplication  2 @MapperScan(basePackages = "com.supplychain.dao")  3 @EnableRabbit/**开启rabbitmq*/
 4 public class ThumbsupServer5002_App {  5 
 6     public static void main(String[]args){  7 
 8         SpringApplication.run(ThumbsupServer5002_App.class,args);  9 
10  } 11 
12     /**消息的转换器 13  * 设置成json 并放入到Spring中 14  * */
15  @Bean 16     public MessageConverter messageConverter(){ 17 
18         return new Jackson2JsonMessageConverter(); 19 
20  } 21 }

测试发送消息

 1 @RunWith(SpringRunner.class)  2 @SpringBootTest  3 public class ThumbsupServer5002_AppTest {  4 
 5 
 6  @Autowired  7     private RabbitTemplate rabbitTemplate;  8 
 9  @Test 10     public void contextLoads() { 11 
12         UserTest userTest = new UserTest("hao", "651238730@qq.com"); 13 
14         /**1.指定发送的交换机 15  * 发送的消息会先发送给 virtual-host: /(顶级路由) 再由它到交换机 16  * 由交换机经过路由键指定给具体的管道 17  * 18  * 2.路由键 19  * 有的交换机须要路由键 有的不须要(发送给交换机的消息会被发送给全部管道) 20  * 21  * 3.发送的消息 22  * 若是是对象的话必须实现序列化接口由于网络传输只能传二进制 23  * 24  * */
25         rabbitTemplate.convertAndSend("userTest-exchange", "userTest-key", userTest); 26  } 27 
28 }

2.消息消费者

一样是pom.xml须要引入rabbitMQ依赖

1 <!--引入rabbitmq依赖-->
2         <dependency>
3             <groupId>org.springframework.boot</groupId>
4             <artifactId>spring-boot-starter-amqp</artifactId>
5         </dependency>

一样须要配置application.yml

 1 spring:  2  rabbitmq:  3     host: 127.0.0.1
 4  #帐号密码 默认有的  5  username: guest  6  password: guest  7  #rbbitmq虚拟主机路径  8     virtual-host: /
 9  #rabbitmq的端口号 也是默认的 10     port: 5672
11  listener: 12  simple: 13         acknowledge-mode: manual #手动接受数据 14         #max-concurrency: 10 #最大并发 15         #prefetch: 1 #限流

一样主启动类中须要开启RabbitMQ

 1 @SpringBootApplication  2 @EnableRabbit  3 public class MessageServer5003_App {  4 
 5     public static void main(String[]args){  6 
 7         SpringApplication.run(MessageServer5003_App.class,args);  8 
 9  } 10 
11     /**这里也须要设置消息转换类型 12  * 和发送的消息类型必定要对应 13  * 否则对象接受json启动主程序类时就会报错 14  * */
15  @Bean 16     public MessageConverter messageConverter(){ 17 
18         return new Jackson2JsonMessageConverter(); 19 
20  } 21 
22 }

下面到了整合的环节了

 1 @ServerEndpoint(value = "/websocket")  2 @Component  3 public class WebSocketServer {  4 
 5     //静态变量 用于记录当前在线链接数 应该把它设计成线程安全
 6     private static int onlineCount=0;  7 
 8     /**Concurrent包下的 写时复制Set 用它做于存储客户端对应的MyWebSocket对象*/
 9     private static CopyOnWriteArraySet<WebSocketServer> webSocketSet= new CopyOnWriteArraySet<WebSocketServer>();  10 
 11 
 12     /**与某个客户端的连接会话,须要经过它来给客户端发送数据*/
 13 
 14     private Session session;  15     /**
 16  * 参数1:Message 能够得到消息的内容字节 还能够得到消息的其余属性  17  * 参数2:能够写肯定接受的参数类型好比User  18  * 参数3:Channel 通道  19  * com.rabbitmq.client.Channel必须是这个包下  20  * 经过这个参数能够拒绝消息  21  * 让rabbitmq再发给别的消费者  22  *  23  * 使用@RabbitListener 能够绑定交换机 路由键 管道  24  *  25      */
 26     @RabbitListener(bindings = @QueueBinding(  27          value = @Queue(value = "userTest-queue",durable = "true"),  28          exchange = @Exchange(name = "userTest-exchange",durable = "true",type = "direct"),  29          key = "userTest-key"
 30  )  31  )  32     @RabbitHandler//注解意思:若是有消息过来 须要消费的时候才会调用该方法
 33     /**若是已知传递的参数是 UserTest对象能够经过该注解  34  * 消息头须要用map接受  35  * 既然是手动接受消息 就须要设置channel  36  * */
 37     public void receiveUserMessage(@Payload UserTest userTest, @Headers Map<String,Object> headers, Channel channel) throws IOException {  38         //sendMessage(message.toString());
 39         System.out.println("UserTest对象"+userTest);  40  onMessage(userTest.toString());//调用消息方法将数据船体给他  41 
 42         Long deliveryTag= (Long)headers.get(AmqpHeaders.DELIVERY_TAG);  43         //手动接受并告诉rabbitmq消息已经接受了 deliverTag记录接受消息 false不批量接受
 44         channel.basicAck(deliveryTag,false);  45 
 46         /**
 47  * basicReject()  48  * 参数1: 消息标签  49  * 参数2: true 将消息重新放入队列 false 接受到并将消息抛弃  50  *  51  *  52  try {  53  channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);  54  System.out.println(message);  55  } catch (IOException e) {  56  e.printStackTrace();  57  }  58      */
 59 
 60  }  61 
 62     /**服务器端推送消息*/
 63     public void sendMessage(String message){  64         try {  65             System.out.println("session能否显示出来"+session);  66             this.session.getBasicRemote().sendText(message);  67         } catch (IOException e) {  68  e.printStackTrace();  69  }  70  }  71 
 72     /**
 73  * 链接创建成功调用的方法  74  * */
 75  @OnOpen  76     public void onOpen(Session session){  77         this.session=session;  78         webSocketSet.add(this);  79         System.out.println("有新的链接加入!当前在线人数为"+getOnlineCount());  80  System.out.println(session);  81  }  82 
 83     /**
 84  * 链接关闭调用的方法  85  * */
 86  @OnClose  87     public void onClose(){  88         /**从安全Set中 移除当前链接对象*/
 89         webSocketSet.remove(this);  90  subOnlineCount();  91         System.out.println("有一链接关闭!当前在线人数为"+getOnlineCount());  92  }  93 
 94 
 95 
 96  @OnMessage  97     public void onMessage(String message){  98 
 99         System.out.println("来自客户端的消息:"+message); 100 
101         for (WebSocketServer webSocketServer:webSocketSet){ 102  webSocketServer.sendMessage(message); 103  } 104 
105  } 106 
107 
108     public static int getOnlineCount() { 109         return onlineCount; 110  } 111 
112     public static synchronized void addOnlineCount() { 113         WebSocketServer.onlineCount++; 114  } 115 
116     public static synchronized void subOnlineCount() { 117         WebSocketServer.onlineCount--; 118  } 119 
120 
121 
122 }

websocket前端

websocket是html5提出的协议属于双工通讯 前端发送一次请求告诉服务器须要将http协议升级成tcp长链接

后面服务端直接给前端推送消息就能够了 从之前的一次请求一次响应 服务端被动式 变成 一次请求服务端能够无限响应

 1 <script>
 2         var socket;  3  console.log(typeof socket)  4         if (typeof(WebSocket)=="undefined"){  5  alert("您的浏览器不支持WebSocket");  6  }else{  7  alert("您的浏览器支持WebSocket");  8 
 9  socket=new WebSocket("ws://localhost:5003/websocket"); 10 
11  socket.onopen=function () { 12  console.log("Socket 已打开"); 13  }; 14 
15             //得到消息事件
16  socket.onmessage = function(msg) { 17  console.log(msg.data); 18                 //发现消息进入 调后台获取
19                 //getCallingList();
20  }; 21 
22             //关闭事件
23  socket.onclose = function() { 24  console.log("Socket已关闭"); 25  }; 26             //发生了错误事件
27  socket.onerror = function() { 28  alert("Socket发生了错误"); 29  }; 30             /** 31  $(window).unload(function(){ 32  socket.close(); 33  }); 34              */
35  } 36     </script>
相关文章
相关标签/搜索