以前咱们给咱们的系统加了一个使用SpringAOP+RabbitMQ+WebSocket进行实时消息通知功能(http://www.javashuo.com/article/p-tzzbgrle-be.html)。在测试环境下没有问题,但上到生产环境后部分用户反映出现了丢消息的状况,针对这个问题咱们进行了排查,发现,本来咱们的系统是单机的,但用户在以前作了调整,在内外网服务器分别部署了系统,两个服务器都公用一个RabbitMQ。那么问题就来了。html
以前生产者向Exchange生产消息,消费者从queue消费消息使用的是direct模式,经过routing-key保证生产的消息只有指定的消费者可消费。但当两台服务器共用一个MQ时即有了两个消费者链接同一个queue,此时rabbitmq并非一个消息两个消费者都能消费,而是采用默认的轮询发送方式,A服务器收到消息一、三、5 。。。而B服务器收到二、四、6 。。。java
这就出现了用户感受的丢消息现象。因此咱们考虑将通知改成广播形式即fanout。web
RabbitMQ将消息中间件的实现分红了Exchange+Queue的形式,Exchange和Queue使用Binding;生产者向Exchange生产消息,消息根据指定的binding进入Queue,消费者从Queue取消息。Fanout模式如图:服务器
一个消费者会对应一个queue,那么多个消费者要有多个queue。websocket
修改后代码以下:socket
RabbitMQConfig.javaide
//声明exchange Connection connection = factory.createConnection(); Channel channel = connection.createChannel(false); //生产环境中有多个server,每一个server都是一个消费者,对同一个消息都要进行处理。选用广播模式 channel.exchangeDeclare("exchange.websocket.msg", BuiltinExchangeType.FANOUT);
RabbitMessageQueue.java测试
@Override public void send(WebSocketMsgEntity entity) { logger.warn("::product msg to MQ-websocket_msg_queue!"); //若没有指定exchange,则使用默认名为“”的exchange,binding名与queue名相同 rabbitTemplate.convertAndSend("exchange.websocket.msg","", entity); }
RabbitMQListener.javaui
@Component public class RabbitMQListener { private static Logger logger = LoggerFactory.getLogger(RabbitMQListener.class); @Autowired private RabbitMQService mqService; /** * WebSocket推送监听器 * @param socketEntity * @param deliveryTag * @param channel */ @RabbitListener(bindings ={@QueueBinding(value = @Queue(exclusive = "true"), exchange = @Exchange(value = "exchange.websocket.msg", type = ExchangeTypes.FANOUT))}) public void webSocketMsgListener(@Payload WebSocketMsgEntity socketMsgEntity, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException { logger.warn("::consume msg from MQ-websocket_msg_queue!"); mqService.handleWebSocketMsg(socketMsgEntity, deliveryTag, channel); } }
本来的客户端监听是监听Queue,而如今改为监听Binding。并不显式的指定一个Queue,而是将Queue设置成exclusive = true,这样每一个消费者在监听Binding时都会默认建立一个Queue与指定的Exchange绑定,在消费者断开链接后Queue自动删除。如有两个消费者,则建立两个Queue,他们绑定的Exchange相同,当生产者有消息时会向两个Queue各插入一条,那么两个系统的用户就都能收到通知啦!spa