[TOC]
消息没有任何消费者去消费(例如消息过期、被消费者拒绝且不重新入队、队列达到最大长度)就变为死信。
利用DLX,当消息在一个队列中变成死信以后,它能被重新publish到另一个exchange,这个exchange就是DLX。
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。当队列中有死信时,RabbitMQ会自动将死信消息发送到设置的DLX,进而被路由到另一个队列,可以监听这个队列,做后续处理。
arguments.put("x-dead-letter-exchange", "you dlx");
package com.wyg.rabbitmq.javaclient.dlx; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 消费者手工ack和nack * * @author wyg0405@gmail.com * @date 2019-11-22 13:25 * @since JDK1.8 * @version V1.0 */ public class Producer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.abc"; // 申明exchange channel.exchangeDeclare(exchangeName, "topic"); String msg = "正常消息1,routingKey:" + routingKey; AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).expiration("2000").build(); channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes("UTF-8")); // 该消息无消费者消费 String msg2 = "过时死信消息2,routingKey:" + routingKey; channel.basicPublish(exchangeName, routingKey, false, props, msg2.getBytes("UTF-8")); String msg3 = "过时死信消息3,routingKey:" + routingKey; channel.basicPublish(exchangeName, routingKey, false, props, msg3.getBytes("UTF-8")); String msg4 = "过时死信消息4,routingKey:" + routingKey; channel.basicPublish(exchangeName, routingKey, false, props, msg4.getBytes("UTF-8")); channel.close(); connection.close(); } }
producer可以通过消息过期产生死信
package com.wyg.rabbitmq.javaclient.dlx; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.*; /** * 消费者手工ack和nack * * @author wyg0405@gmail.com * @date 2019-11-22 14:07 * @since JDK1.8 * @version V1.0 */ public class Consumer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 定义死信队的Exchange String dlxExchange = "dlx.exchange"; channel.exchangeDeclare(dlxExchange, "topic"); // 死信队列名 String dlxQueue = "dlx.queue"; channel.queueDeclare(dlxQueue, true, false, false, null); // # 表示全部的key均可以路由到s死信队列 String dlxRoutingKey = "#"; // 绑定死信队列和exchange channel.queueBind(dlxQueue, dlxExchange, dlxRoutingKey, null); // 定义正常的消费者j监听队列 String queueName = "test_dlx_queue"; String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; // 申明exchange channel.exchangeDeclare(exchangeName, "topic"); // 申明队列 Map<String, Object> arguments = new HashMap<>(); // 设置死信队列,arguments要设置到申明的队列上 arguments.put("x-dead-letter-exchange", dlxExchange); channel.queueDeclare(queueName, true, false, false, arguments); // 队列绑定到exchange channel.queueBind(queueName, exchangeName, routingKey); channel.basicQos(1); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("---消费者-- " 
+ new String(message.getBody(), "UTF-8")); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("---消费者--:cancelCallback "); } }; // 消费消息,autoAck必定要设为false,手工ack channel.basicConsume(queueName, false, deliverCallback, cancelCallback); } }
运行结果:只消费一条正常消息,其余过期的未消费
package com.wyg.rabbitmq.javaclient.dlx; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.*; /** * 监听私信队列 * * @author wyg0405@gmail.com * @date 2019-11-22 21:52 * @since JDK1.8 * @version V1.0 */ public class DLXConusmer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = "dlx.queue"; String exchangeName = "dlx.exchange"; String routingKey = "#"; // 申明exchange channel.exchangeDeclare(exchangeName, "topic"); // 申明队列 channel.queueDeclare(queueName, true, false, false, null); // 队列绑定到exchange channel.queueBind(queueName, exchangeName, routingKey, null); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { try { System.out.println("---死信队列消费者---"); System.out.println(new String(message.getBody(), "UTF-8")); } finally { // consumer手动 ack 给broker channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("---消费者:cancelCallback"); } }; // 消费消息,autoAck必定要设置为false channel.basicConsume(queueName, false, deliverCallback, cancelCallback); } }
运行结果:3条过期的消息进入死信队列,并被消费