一、为何要重试?html
若是消费者处理消息失败后不重试,而后发送应答给rabbitmq,rabbitmq就会将队列中的消息删除,从而形成消息的丢失。因此咱们要在消费者处理消息失败的时候,重试必定的次数。好比重试3次,若是重试3次以后仍是失败,则把这条消息发送到死信队列。java
因此咱们如今要实现消息的重试,实现效果为:数据库
首先,将消息携带routtingkey的消息发送到正常转发器exchange@normal,exchange@normal将消息发送到正常队列queue@normal,queue@normal获得消息后进行处理,若是处理成功,则给rabbitmq发送应答。若是消息处理失败,判断消息失败的次数:若是失败次数小于3次,则将消息发送到重试转发器exchange@retry,exchange@retry获得消息后,发送到重试队列queue@retry,queue@retry10s后,将该条消息再次发送到正常转发器exchange@normal进行正常的消费;若是失败次数超过3次,则将消息发送到失败转发器exchange@filed,exchange@filed将失败了的消息发送给失败队列queue@filed,而后能够根据业务需求处理失败了的数据。好比保存到失败文件或者数据库等,也能够人工处理后,从新发送给exchange@normal。app
思路图以下:ide
二、重试实现的思路测试
生产者端发送消息:spa
1)、声明三个转发器code
//正常的转发器 private static String NORMAL_EXCHANGE = "exchange@normal"; //重试转发器 private static String RETRY_EXCHANGE = "exchange@retry"; //失败转发器 private static String FILED_EXCHANGE = "exchange@filed"; ...... //声明正常的转发器 channel.exchangeDeclare(NORMAL_EXCHANGE,"topic"); //声明重试的转发器 channel.exchangeDeclare(RETRY_EXCHANGE,"topic"); //声明失败的转发器 channel.exchangeDeclare(FILED_EXCHANGE,"topic");
2)、发送消息到正常的队列orm
//发送5条消息到正常的转发器,路由密匙为normal for(int i=0;i<5;i++){ String message = "retry..."+i; channel.basicPublish(NORMAL_EXCHANGE,"normal", MessageProperties.PERSISTENT_BASIC,message.getBytes()); }
消费者端接受、处理消息:htm
1)、定义并声明转发器和队列,须要注意的是queue@normal和queue@filed两个队列就是普通的队列。queue@retry队列须要设置两个参数:x-dead-letter-exchange、x-message-ttl。这两个参数按照我本身的理解是x-dead-letter-exchange指定重试时将消息重发给哪个转发器、x-message-ttl消息到达重试队列后,多长时间后重发。
//转发器 private static String NORMAL_EXCHANGE = "exchange@normal"; private static String RETRY_EXCHANGE = "exchange@retry"; private static String FILED_EXCHANGE = "exchange@filed"; //队列 private static String NORMAL_QUEUE = "queue@normal"; private static String RETRY_QUEUE = "queue@retry"; private static String FILED_QUEUE = "queue@filed"; ...... //声明正常队列 channel.queueDeclare(NORMAL_QUEUE,true,false,false,null); //声明重试队列,重试队列比较特殊,须要设置两个参数 Map<String,Object> arg = new HashMap<String,Object>(); //参数1:将消息发送到哪个转发器 arg.put("x-dead-letter-exchange",NORMAL_EXCHANGE); //参数2:多长时间后重发 arg.put("x-message-ttl",10000); channel.queueDeclare(RETRY_QUEUE,true,false,false,arg); //声明失败队列 channel.queueDeclare(FILED_QUEUE,true,false,false,null);
2)、将队列绑定转发器和路由密匙
//将队列绑定转发器和路由密匙 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal"); channel.queueBind(RETRY_QUEUE,RETRY_EXCHANGE,"normal"); channel.queueBind(FILED_QUEUE,FILED_EXCHANGE,"normal");
3)、定义consumer,消费消息。由于消息设置的是自动应答,因此不须要手动应答。若是你设置了手动应答,则在消息消费成功或者失败后都要应答。
Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //此到处理消息 try{ String message = new String(body,"utf-8"); System.out.println("消费者接受到的消息:"+message); //模拟处理消息产生异常 int i = 1/0; }catch(Exception e){ try{ //延迟5s Thread.sleep(5000); //判断失败次数 long retryCount = getRetryCount(properties); if(retryCount>=3){ //若是失败超过三次,则发送到失败队列 channel.basicPublish(FILED_EXCHANGE,envelope.getRoutingKey(),MessageProperties.PERSISTENT_BASIC,body); System.out.println("消息失败了..."); }else{ //发送到重试队列,10s后重试 channel.basicPublish(RETRY_EXCHANGE,envelope.getRoutingKey(),properties,body); System.out.println("消息重试中..."); } }catch (Exception e1){ e.printStackTrace(); } } } }; //消费消息 channel.basicConsume(NORMAL_QUEUE,true,consumer);
判断消息失败次数的方法以下:
public long getRetryCount(AMQP.BasicProperties properties){ long retryCount = 0L; Map<String,Object> header = properties.getHeaders(); if(header != null && header.containsKey("x-death")){ List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death"); if(deaths.size()>0){ Map<String,Object> death = deaths.get(0); retryCount = (Long)death.get("count"); } } return retryCount; }
重试的完整代码以下:
1)、生产者
@Component public class RetryPublisher { private static String NORMAL_EXCHANGE = "exchange@normal"; private static String RETRY_EXCHANGE = "exchange@retry"; private static String FILED_EXCHANGE = "exchange@filed"; public void send(){ Connection connection = ConnectionUtil.getInstance(); Channel channel = null; try{ channel = connection.createChannel(); //声明正常的转发器 channel.exchangeDeclare(NORMAL_EXCHANGE,"topic"); //声明重试的转发器 channel.exchangeDeclare(RETRY_EXCHANGE,"topic"); //声明失败的转发器 channel.exchangeDeclare(FILED_EXCHANGE,"topic"); //发送5条消息到正常的转发器,路由密匙为normal for(int i=0;i<5;i++){ String message = "retry..."+i; channel.basicPublish(NORMAL_EXCHANGE,"normal", MessageProperties.PERSISTENT_BASIC,message.getBytes()); } }catch (Exception e){ } } }
2)、消费者方法和判断消息失败次数的方法
@Component public class RetryReceiver { //转发器 private static String NORMAL_EXCHANGE = "exchange@normal"; private static String RETRY_EXCHANGE = "exchange@retry"; private static String FILED_EXCHANGE = "exchange@filed"; //队列 private static String NORMAL_QUEUE = "queue@normal"; private static String RETRY_QUEUE = "queue@retry"; private static String FILED_QUEUE = "queue@filed"; public void receiver(){ Connection connection = ConnectionUtil.getInstance(); final Channel channel; try{ channel = connection.createChannel(); //声明正常队列 channel.queueDeclare(NORMAL_QUEUE,true,false,false,null); //声明重试队列,重试队列比较特殊,须要设置两个参数 Map<String,Object> arg = new HashMap<String,Object>(); //参数1:将消息发送到哪个转发器 arg.put("x-dead-letter-exchange",NORMAL_EXCHANGE); //参数2:多长时间后发送 arg.put("x-message-ttl",10000); channel.queueDeclare(RETRY_QUEUE,true,false,false,arg); //声明失败队列 channel.queueDeclare(FILED_QUEUE,true,false,false,null); //将队列绑定转发器和路由密匙 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal"); channel.queueBind(RETRY_QUEUE,RETRY_EXCHANGE,"normal"); channel.queueBind(FILED_QUEUE,FILED_EXCHANGE,"normal"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //此到处理消息 try{ String message = new String(body,"utf-8"); System.out.println("消费者接受到的消息:"+message); //模拟处理消息是产生异常 int i = 1/0; }catch(Exception e){ try{ //延迟5s Thread.sleep(5000); //判断失败次数 long retryCount = getRetryCount(properties); if(retryCount>=3){ //若是失败超过三次,则发送到失败队列 channel.basicPublish(FILED_EXCHANGE,envelope.getRoutingKey(),MessageProperties.PERSISTENT_BASIC,body); System.out.println("消息失败了..."); }else{ //发送到重试队列,10s后重试 channel.basicPublish(RETRY_EXCHANGE,envelope.getRoutingKey(),properties,body); System.out.println("消息重试中..."); } }catch (Exception e1){ e.printStackTrace(); } } } }; //消费消息 channel.basicConsume(NORMAL_QUEUE,true,consumer); }catch (Exception e){ e.printStackTrace(); } } /** * 获取消息失败次数 * @param properties * @return */ public long getRetryCount(AMQP.BasicProperties properties){ long retryCount = 0L; Map<String,Object> header = properties.getHeaders(); if(header != null && header.containsKey("x-death")){ List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death"); if(deaths.size()>0){ Map<String,Object> death = deaths.get(0); retryCount = (Long)death.get("count"); } } return retryCount; } }
3)、测试Controlelr
@Controller public class RetryController { @Autowired private RetryPublisher publisher; @Autowired private RetryReceiver receiver; @RequestMapping("/retrySend") @ResponseBody public void send(){ publisher.send(); } @RequestMapping("/retryReceive") @ResponseBody public void receive(){ receiver.receiver(); } }