上班时间闲的无聊更一篇帖子,以前公司安排了 一个需求,觉得要用RabbitMQ实现,就本身研究了一下,终于在Giant努力之下,MQ仍是被征服,把消息变成死信的方式有不少,我只实现了其中的一种,(我怎么可能会告诉大家其实我不会其余的呢)有兴趣的话,能够本身去网上研究研究其余的实现方式,好了,咱们直奔主题吧.
html
首先,咱们先要声明一个队列,并给这个队列声明一个转发的交换机,而后指定它的路由,(这么作是让"METHOD_EVENT_DIRECT_QUEUE"中的消息过时而后放入到这个)java
<rabbit:queue id = "METHOD_EVENT_DIRECT_QUEUE" name="METHOD_EVENT_DIRECT_QUEUE" durable="true" auto-delete="false" exclusive="false">
<!--声明转发的交换机,以及转发的路由-->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="dlx.exchange" />
<entry key="x-dead-letter-routing-key" value="routingKey" />
</rabbit:queue-arguments></rabbit:queue>程序员
其次,咱们再声明一个正常队列
spring
<rabbit:queue id="dlq.queue" name="dlq.queue" durable="true" auto-delete="false" exclusive="false"/>
最后,咱们把"METHOD_EVENT_DIRECT_QUEUE" 以及"dlq.queue" 绑定在"dlx.exchange"(转发的交换机),交换机下面,指定dlq.queue(正常队列的路由为转发的路由)路由为"routingKey" ,METHOD_EVENT_DIRECT_QUEUE队列的路由为"METHOD_EVENT_DIRECT_ROUTING_KEY"(本身随便起名字)微信
<!-- 死信队列绑定 -->
<rabbit:direct-exchange name="dlx.exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="dlq.queue" key="routingKey"></rabbit:binding>
<rabbit:binding queue="METHOD_EVENT_DIRECT_QUEUE" key="METHOD_EVENT_DIRECT_ROUTING_KEY"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
声明消息接收者
ide
<!-- 消息接收者 -->
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象, 手动确认机制-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener queues="dlq.queue" ref="msgConsumer"/>
</rabbit:listener-container>
这样咱们死信队列的一个配置就算是完成了.post
下面咱们开始声明一个生产者,开始生产消息,消息要设置过时时间(时间由本身指定以秒为单位)
url
/**
* @Description 消息延时发送
*
* @author gzj
* @date 2019年12月17日 上午9:12:06
* @param routingKey 路由键
* @param message 消息体
* @param delayTime 延迟时间 单位秒
* @return MessageResponse
*/
public static void sendDelay(final String routingKey, Object message,String delayTime) {
AmqpTemplate amqpTemplate = SpringContextHolder.getBean("amqpTemplate");
//final int xdelay = delayTime * 1000;
amqpTemplate.convertAndSend("dlx.exchange",routingKey, message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//设置延迟时间
message.getMessageProperties().setExpiration(delayTime);
return message;
}
});
}
package com.hmgj.impl.user;
import com.alibaba.dubbo.config.annotation.Service;
import com.hmgj.helper.RabbitHelper;;
import com.hmgj.service.user.ProviderService;
import org.springframework.amqp.core.AmqpTemplate;
import javax.annotation.Resource;
/**
* @author gzj
* @Date 2019/11/28.
*/
@Service
public class ProviderServiceImpl implements ProviderService{
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void sendMsg(String content) {
System.out.println("要发送的消息 :" + content);
RabbitHelper.sendDelay("METHOD_EVENT_DIRECT_ROUTING_KEY","132","6000");
System.out.println("消息发送成功");
}
}
消费者监听正常队"dlq.queue"列,接收消息(注意网上好多都是监听的"METHOD_EVENT_DIRECT_QUEUE"队列,我发现这个写法是有问题的)
spa
package com.hmgj.task.queue;
import java.io.UnsupportedEncodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* @author gzj
* @Date 2019/11/28.
*/
@Component
public class MsgConsumer implements ChannelAwareMessageListener {
private final Logger logger = LoggerFactory.getLogger(MsgConsumer.class);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String context = "";
logger.info("进入队列");
try {
context = new String(message.getBody(), "utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
logger.info("message:" + message.toString());
logger.info("接收处理当前监听队列当中的消息:" + context + "\n 当前线程name:" + Thread.currentThread().getName() + "\n 当前线程id:"
+ Thread.currentThread().getId());
// 消息的标识,false只确认当前一个消息收到,true确认全部consumer得到的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// nack返回false,并从新回到队列,就是重发机制,一般存在于消费失败后处理中;
//第三个参数与拒绝消息方法的第二个参数同理。即true从新进入队列,false则丢弃;
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 拒绝消息,即丢弃消息,消息不会从新回到队列,后面的参数为true则重入队列;为false则丢弃;
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
最终实现结果
.net
本文分享自微信公众号 - 程序员真正幽默段子(gh_b9e01d69a484)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。