场景:业务需要,如果消费者端出现异常或特殊情况,需要让消息重新回到队列,以便再次被消费。
复制代码
在配置类(原文类名为 redisConfig)中配置 RabbitMQ 的监听容器,并开启手动应答:
// Listener bean injected by Spring; it is registered on the listener container via setMessageListener.
@Autowired
private TestReceiver testReceiver ;
/**
 * Creates the listener container that consumes from the queue returned by {@link #test()}.
 *
 * <p>The container runs three concurrent consumers, exposes the listener channel,
 * uses manual acknowledgement, converts messages with a Jackson JSON converter and
 * dispatches them to the injected {@code testReceiver}.
 *
 * @param connectionFactory connection factory the container is built on
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer qrcodeListenerContainer(ConnectionFactory connectionFactory) {
    final SimpleMessageListenerContainer container =
            new SimpleMessageListenerContainer(connectionFactory);

    // Source queue and consumer parallelism.
    final Queue sourceQueue = test();
    container.setQueues(sourceQueue);
    container.setConcurrentConsumers(3);

    // Expose the channel and switch to manual acknowledgement.
    container.setExposeListenerChannel(true);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    // Payload conversion and the listener that handles each delivery.
    final Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    container.setMessageConverter(jsonConverter);
    container.setMessageListener(testReceiver);

    return container;
}
/**
 * Declares the queue named by {@code Constants.QUEUE_NAME_ALI_DYNAMIC_QR_CODE}.
 *
 * @return the queue definition, created with the durable flag set to {@code false}
 */
@Bean
public Queue test() {
    final String queueName = Constants.QUEUE_NAME_ALI_DYNAMIC_QR_CODE;
    final boolean durable = false;
    return new Queue(queueName, durable);
}
复制代码
接收者类:
/**
 * Listener that settles each delivery manually.
 *
 * <p>A delivery is acknowledged only after processing completes without an exception.
 * If processing throws, the delivery is negatively acknowledged with {@code requeue=true}
 * so the message returns to the queue and can be consumed again.
 *
 * <p>Each delivery tag is settled exactly once: calling both {@code basicAck} and
 * {@code basicNack} for the same tag is a protocol violation.
 */
@Component
public class TestReceiver implements ChannelAwareMessageListener {

    /** JSON converter, created once instead of once per message. */
    private static final Jackson2JsonMessageConverter CONVERTER = new Jackson2JsonMessageConverter();

    /**
     * Converts and processes one message, then acks or nacks it.
     *
     * @param message the delivered message
     * @param channel the channel the message was received on, used to ack or nack
     * @throws Exception the processing failure (rethrown after the message has been
     *         requeued), or any failure raised by the ack/nack call itself
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            // Message conversion.
            Object payload = CONVERTER.fromMessage(message);
            // Business code goes here and works on `payload`.
        } catch (Exception e) {
            // Processing failed: reject this single delivery and put it back on the queue.
            // NOTE(review): a message that always fails will be redelivered indefinitely;
            // consider a retry limit or dead-letter queue if that can happen.
            channel.basicNack(deliveryTag, false, true);
            throw e;
        }

        // Processing succeeded: acknowledge this single delivery.
        // Kept outside the try block so a failing ack is never followed by a nack on the same tag.
        channel.basicAck(deliveryTag, false);
    }
}
复制代码