当一条消息在队列中出现如下三种状况的时候,该消息就会变成一条死信。java
当消息在一个队列中变成一个死信以后,若是配置了死信队列,它将被从新publish到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列。spring
建立一个消费者,绑定消费队列及死信交换机,交换机默认为direct
模型,死信交换机也是,arguments绑定死信交换机和key。(注解支持的具体参数文末会附上)缓存
public class DirectConsumer { @RabbitListener(bindings = { @QueueBinding(value = @Queue(value = "javatrip",arguments = {@Argument(name="x-dead-letter-exchange",value = "deadExchange"), @Argument(name="x-dead-letter-routing-key",value = "deadKey") }), exchange = @Exchange(value="javatripDirect"), key = {"info","error","warning"} ) }) public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{ System.out.println("消费者1"+message); }
public void publishMessage(String message){ rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatripDirect","info",message); }
spring: rabbitmq: listener: simple: acknowledge-mode: manual
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicNack(deliverTag,false,false);
@RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "javatripDead"), exchange = @Exchange(value = "deadExchange"), key = "deadKey" ) }) public void receive2(String message){ System.out.println("我是一条死信:"+message); }
绑定业务队列的时候,增长消息的过时时长,当消息过时后,消息将被转发到死信队列中。app
@RabbitListener(bindings = { @QueueBinding(value = @Queue(value = "javatrip",arguments = {@Argument(name="x-dead-letter-exchange",value = "deadExchange"), @Argument(name="x-dead-letter-routing-key",value = "deadKey"), @Argument(name = "x-message-ttl",value = "3000") }), exchange = @Exchange(value="javatripDirect"), key = {"info","error","warning"} ) }) public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{ System.out.println("消费者1"+message); }
设置消息队列长度,当队列中的消息达到最大长度后,继续发送消息,消息将被转发到死信队列中。ide
@RabbitListener(bindings = { @QueueBinding(value = @Queue(value = "javatrip",arguments = {@Argument(name="x-dead-letter-exchange",value = "deadExchange"), @Argument(name="x-dead-letter-routing-key",value = "deadKey"), @Argument(name = "x-max-length",value = "3") }), exchange = @Exchange(value="javatripDirect"), key = {"info","error","warning"} ) }) public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{ System.out.println("消费者1"+message); }
@Target({}) @Retention(RetentionPolicy.RUNTIME) public @interface QueueBinding { /** * @return the queue. */ Queue value(); /** * @return the exchange. */ Exchange exchange(); /** * @return the routing key or pattern for the binding. * Multiple elements will result in multiple bindings. */ String[] key() default {}; }
@Target({}) @Retention(RetentionPolicy.RUNTIME) public @interface Queue { /** * @return the queue name or "" for a generated queue name (default). */ @AliasFor("name") String value() default ""; /** * @return the queue name or "" for a generated queue name (default). * @since 2.0 */ @AliasFor("value") String name() default ""; /** * 是否持久化 */ String durable() default ""; /** * 是否独享、排外的. */ String exclusive() default ""; /** * 是否自动删除; */ String autoDelete() default ""; /** * 队列的其余属性参数 * (1)x-message-ttl:消息的过时时间,单位:毫秒; *(2)x-expires:队列过时时间,队列在多长时间未被访问将被删除,单位:毫秒; *(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息; *(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息; *(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head; *(6)x-dead-letter-exchange:死信交换器名称,过时或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中; *(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,若是不设置,则使用消息的原来的路由键值 *(8)x-single-active-consumer:表示队列是不是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其余被忽略,false时消息循环分发给全部消费者(默认false) *(9)x-max-priority:队列要支持的最大优先级数;若是未设置,队列将不支持消息优先级; *(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽量多的消息,以减小RAM的使用;若是未设置,队列将保留内存缓存以尽量快地传递消息; *(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。 */ Argument[] arguments() default {}; }
@Target({}) @Retention(RetentionPolicy.RUNTIME) public @interface Exchange { String TRUE = "true"; String FALSE = "false"; /** * @return the exchange name. */ @AliasFor("name") String value() default ""; /** * @return the exchange name. * @since 2.0 */ @AliasFor("value") String name() default ""; /** * 交换机类型,默认DIRECT */ String type() default ExchangeTypes.DIRECT; /** * 是否持久化 */ String durable() default TRUE; /** * 是否自动删除 */ String autoDelete() default FALSE; /** * @return the arguments to apply when declaring this exchange. * @since 1.6 */ Argument[] arguments() default {}; }