RabbitMQ能够设置队列的优先级,在队列中的高优先级消息会被优先消费。在设置优先级时,首先须要设置队列的最高优先级,而后在生产者发送消息时设置该条消息的优先级,最后在队列中的高优先级的消息会被先发送给消费者消费java
设置队列的最高优先级在声明队列时进行设置,代码以下:设计模式
Map<String, Object> queueArgs = new HashMap<>(1); queueArgs.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, false, false, false, queueArgs);
设置消息的优先级在生产者生成消息时进行设置,代码以下:服务器
BasicProperties properties = new BasicProperties.Builder() .priority(i) .build(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message);
注意:当消费者消费速度大于生产端,且Broker中没有消息堆积的话,也就是说当生产者生产一条消息就被消费者消费,消息队列中没有消息堆积的话,设置消息优先级是没有意义的ide
生产者ui
Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); for (int i = 0; i < 10; i++) { BasicProperties properties = new BasicProperties.Builder() .priority(i) .build(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, String.valueOf(i).getBytes(StandardCharsets.UTF_8)); }
消费者.net
Channel channel = connection.createChannel(); channel.exchangeDeclare(PriorityProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map<String, Object> queueArgs = new HashMap<>(1); queueArgs.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, false, false, false, queueArgs); channel.queueBind(QUEUE_NAME, PriorityProducer.EXCHANGE_NAME, PriorityProducer.ROUTING_KEY); channel.basicQos(1); channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); System.out.print(message + " "); channel.basicAck(envelope.getDeliveryTag(), false); } });
输出以下: 0 9 8 7 6 5 4 3 2 1设计
因为消费者设置了消息预取数量为1,因此会先取0消费,而后形成消息在消息队列中的积压,后面取的话就会先取优先级高的消息code
RabbitMQ使用AMQP协议,在AMQP协议中没有直接实现延迟消息,因此咱们使用死信队列(DLX)和消息存活时间(TTL)模拟出延迟队列blog
当消息在队列中变为死信消息(Dead Message)后,该消息会被Publish到该队列的DLX(Dead-Letter-Exchange)中。DLX就是一个Exchange,当消息被发送到DLX后能够路由到队列中进行从新消费接口
消息在消息队列中变为死信消息的几种状况:
在声明队列时设置该队列的死信队列以及发送消息到死信队列的Routing Key,代码以下:
Map<String, Object> queueArgs = new HashMap<>(2); // 设置死信队列 queueArgs.put("x-dead-letter-exchange", DelayProducer.DLX_EXCHANGE_NAME); // 设置死信Roting Key,不设置默认使用该Queue的Routing Key queueArgs.put("x-dead-letter-routing-key", DelayProducer.DLX_ROUTING_KEY); channel.queueDeclare(PLAIN_QUEUE_NAME, false, false, false, queueArgs);
能够经过DDL和DLX实现延迟队列,具体实现逻辑以下:
把消息发送到普通的队列中(该队列设置死信队列),当消息DDL到期后会发送到死信队列中,而后经过消费死信队列中的消息实现延迟队列,示例代码以下:
生产者
Channel channel = connection.createChannel(); channel.exchangeDeclare(PLAIN_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); BasicProperties properties = new BasicProperties.Builder() // 设置消息的TTL .expiration("60000") .build(); channel.basicPublish(PLAIN_EXCHANGE_NAME, PLAIN_ROUTING_KEY, properties, "Hello".getBytes(StandardCharsets.UTF_8));
消费者
Channel channel = connection.createChannel(); channel.exchangeDeclare(DelayProducer.PLAIN_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DelayProducer.DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map<String, Object> queueArgs = new HashMap<>(2); // 设置死信队列 queueArgs.put("x-dead-letter-exchange", DelayProducer.DLX_EXCHANGE_NAME); // 设置死信Roting Key,不设置默认使用该Queue的Routing Key queueArgs.put("x-dead-letter-routing-key", DelayProducer.DLX_ROUTING_KEY); channel.queueDeclare(PLAIN_QUEUE_NAME, false, false, false, queueArgs); channel.queueBind(PLAIN_QUEUE_NAME, DelayProducer.PLAIN_EXCHANGE_NAME, DelayProducer.PLAIN_ROUTING_KEY); channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null); channel.queueBind(DLX_QUEUE_NAME, DelayProducer.DLX_EXCHANGE_NAME, DelayProducer.DLX_ROUTING_KEY); // 因为消息到普通队列中TTL时间内没有消费,因此该消息会被发送到死信队列中,因此咱们经过消费死信队列来实现延迟消息 channel.basicConsume(DLX_QUEUE_NAME, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); System.out.println(message); channel.basicAck(envelope.getDeliveryTag(), false); } });
由前面的消息队列系列文章能够看出来,消费者能够获取消息有Pull、Push模型。RabbitMQ两种模型都支持,可是其对Pull模型支持不太好,须要本身实现轮询查询是否有消息。下面是两种模型的简单使用
Push模型是RabbitMQ服务器主动推送消息给Consumer。这种模型有点像设计模式中的时间驱动模式,须要Consumer注册回调接口到RabbitMQ服务器中,当RabbitMQ服务器有消息时会主动回调接口发送消息。Push模型有慢消费的缺点,RabbitMQ经过设置消费者预取消息数量来控制服务器发送消息的速度。咱们常常用到就是这种模式,Consumer示例代码以下:
Channel channel = connection.createChannel(); channel.exchangeDeclare(PullProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_NAME, false,false, false, null); channel.queueBind(QUEUE_NAME, PullProducer.EXCHANGE_NAME, PullProducer.ROUTING_KEY); channel.basicConsumer(QUEUE_NAME, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // 处理消息逻辑 channel.basicAck(envelope.getDeliveryTag(), false); } });
Pull模型Consumer主动去RabbitMQ服务器拉消息。这种模式的缺点是消息延迟和忙等,须要本身设计轮询方案。Consumer示例代码以下,没有实现轮询方案:
Connection connection = Basic.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(PullProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_NAME, false,false, false, null); channel.queueBind(QUEUE_NAME, PullProducer.EXCHANGE_NAME, PullProducer.ROUTING_KEY); while (true) { GetResponse response = channel.basicGet(QUEUE_NAME, false); if (response == null) { continue; } String message = new String(response.getBody(), StandardCharsets.UTF_8); // 处理消息逻辑 channel.basicAck(response.getEnvelope().getDeliveryTag(), false); }
http://blog.csdn.net/u013256816/article/details/55105495 http://blog.csdn.net/u013256816/article/details/62890189