分布式微服务架构中,为了下降两个服务之间代码的耦合度,能够考虑使用mq发送消息来作数据传送。
场景:prod模块接受前台传递的id参数,利用mq给cons模块发送消息,cons成功接收到消息后,调用print()方法。html
<dependencies> <!--rabbitmq消息队列--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> <version>2.2.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.2.6.RELEASE</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.60</version> </dependency> </dependencies>
/** * @author yhd * @createtime 2020/10/2 11:51 * @Description 消息发送确认 * ConfirmCallback 只确认消息是否正确到达 Exchange 中 * ReturnCallback 消息没有正确到达队列时触发回调,若是正确到达队列不执行 * 1. 若是消息没有到exchange,则confirm回调,ack=false * 2. 若是消息到达exchange,则confirm回调,ack=true * 3. exchange到queue成功,则不回调return * 4. exchange到queue失败,则回调return */ @Component @Slf4j public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("消息发送成功:" + JSON.toJSONString(correlationData)); } else { log.info("消息发送失败:" + cause + " 数据:" + JSON.toJSONString(correlationData)); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // 反序列化对象输出 System.out.println("消息主体: " + new String(message.getBody())); System.out.println("应答码: " + replyCode); System.out.println("描述:" + replyText); System.out.println("消息使用的交换器 exchange : " + exchange); System.out.println("消息使用的路由键 routing : " + routingKey); } }
/** * @author yhd * @createtime 2020/10/2 11:50 * 发送消息 */ @Service public class SendMessageService { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 * @param exchange * @param routingKey * @param message * @return */ public boolean sendMessage(String exchange, String routingKey, Object message){ rabbitTemplate.convertAndSend(exchange, routingKey, message); return true; } }
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.2.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.2.6.RELEASE</version> </dependency> <dependency> <groupId>com.yhd</groupId> <artifactId>mq-util</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies>
application.ymljava
spring: rabbitmq: host: 192.168.200.128 port: 5672 username: guest password: guest publisher-confirms: true publisher-returns: true listener: simple: cknowledge-mode: manual #默认状况下消息消费者是自动确认消息的,若是要手动确认消息则须要修改确认模式为manual prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发 server: port: 8001
/** * @author yhd * @createtime 2020/10/2 11:41 */ @SpringBootApplication public class ProdApplication { public static void main(String[] args) { SpringApplication.run(ProdApplication.class, args); } }
/** * @author yhd * @createtime 2020/10/2 11:42 */ @RestController @Slf4j public class ProdController { @Autowired private ProdService prodService; @GetMapping("/prod/{id}") public String prod(@PathVariable("id") Integer id){ prodService.prod(id); log.info("ProdController:"+"prod().... : id = "+id); return "success"; } }
/** * @author yhd * @createtime 2020/10/2 11:45 */ @Service @Slf4j public class ProdService { @Autowired private SendMessageService sendMessageService; public void prod(Integer id){ sendMessageService.sendMessage("exchange.confirm", "routing.confirm", id); } }
同mq-prod模块web
spring: rabbitmq: host: 192.168.200.128 port: 5672 username: guest password: guest publisher-confirms: true publisher-returns: true listener: simple: cknowledge-mode: manual #默认状况下消息消费者是自动确认消息的,若是要手动确认消息则须要修改确认模式为manual prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发 server: port: 8002
/** * @author yhd * @createtime 2020/10/2 12:03 */ @SpringBootApplication public class ConsApplication { public static void main(String[] args) { SpringApplication.run(ConsApplication.class,args); } }
/** * @author yhd * @createtime 2020/10/2 12:04 */ @Service @Slf4j public class ConsService { public void print(){ log.info("cons模块的consService类的print()执行了..."); } }
/** * @author yhd * @createtime 2020/10/2 12:06 */ @Component @SpringBootConfiguration public class ConsReceiver { @Autowired private ConsService consService; @SneakyThrows @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queue.confirm", autoDelete = "false"), exchange = @Exchange(value = "exchange.confirm", autoDelete = "true"), key = { "routing.confirm"})) public void process(Message message, Channel channel) { // 采用手动应答模式, 手动确认应答更为安全稳定 //若是手动肯定了,再出异常,mq不会通知;若是没有手动确认,抛异常mq会一直通知 try { consService.print(); // false 确认一个消息,true 批量确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 消息是否再次被拒绝! System.out.println("come on!"); // getRedelivered() 判断是否已经处理过一次消息! if (message.getMessageProperties().getRedelivered()) { System.out.println("消息已重复处理,拒绝再次接收"); // 拒绝消息,requeue=false 表示再也不从新入队,若是配置了死信队列则进入死信队列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { System.out.println("消息即将再次返回队列处理"); // 参数二:是否批量, 参数三:为是否从新回到队列,true从新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } }
访问http://localhost:8001/prod/1,能够看到控制台打印print()…方法执行了,说明消息发送成功。spring
延迟消息有两种实现方案:
1.基于死信队列
2.集成延迟插件docker
使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,经过这二者的组合来实现延迟队列。json
消息的TTL就是消息的存活时间。RabbitMQ能够对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也能够对每个单独的消息作单独的设置。超过了这个时间,咱们认为这个消息就死了,称之为死信。
如何设置TTL:
咱们建立一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是毫秒),那所在这个队列的消息在5秒后会消失。安全
一个消息在知足以下条件下,会进死信路由,记住这里是路由而不是队列,一个路由能够对应不少队列。
(1) 一个消息被Consumer拒收了,而且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其余消费者使用。
(2)上面的消息的TTL到了,消息过时了。
(3)队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和建立其余exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过时了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
bash
2 、基于延迟插件实现延迟消息
Rabbitmq实现了一个插件x-delay-message来实现延时队列
说明:基于插件的延迟消息可能有一个小bug(不影响业务),就是生产者发送消息时会回调returnedMessage方法(消息确认时咱们配置的回调方法,表示交换机到队列发送失败),其实基于插件的延迟消息是发送成功了的,若是发生该bug,咱们能够根据交换机或队列过滤掉该消息,别让他加入重试队列;若是不能接受后续业务咱们能够使用死信的方式发送延迟消息。服务器
/** * @author yhd * @createtime 2020/10/2 17:09 */ public class MqConst { /** * 取消订单,发送延迟队列 */ public static final String EXCHANGE_DIRECT_ORDER_CANCEL = "exchange.direct.order.cancel";//"exchange.direct.order.create" test_exchange; public static final String ROUTING_ORDER_CANCEL = "order.create"; //延迟取消订单队列 public static final String QUEUE_ORDER_CANCEL = "queue.order.cancel"; //取消订单 延迟时间 单位:秒 public static final int DELAY_TIME = 2*60; }
/** * @author yhd * @createtime 2020/10/2 16:22 * 延时消息接收配置 */ @SpringBootConfiguration public class DelayMqConfig { @Bean public Queue delayQueue(){ // 第一个参数是建立的queue的名字,第二个参数是是否支持持久化 return new Queue(MqConst.QUEUE_ORDER_CANCEL,true); } @Bean public CustomExchange delayExchange(){ Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); return new CustomExchange(MqConst.EXCHANGE_DIRECT_ORDER_CANCEL, "x-delayed-message", true, false, args); } @Bean public Binding bindingDelay(){ return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(MqConst.ROUTING_ORDER_CANCEL).noargs(); } }
/** * 发送延时消息 * @param exchange * @param routingKey * @param message * @param delayTime * @return */ public boolean sendDelayMessage(String exchange, String routingKey, Object message,int delayTime){ rabbitTemplate.convertAndSend(exchange, routingKey, message, message1 -> { message1.getMessageProperties().setDelay(delayTime*1000); return message1; }); return true; }
/** * 接受延迟消息 * 延迟队列,不能再这里作交换机与队列绑定 */ @RabbitListener(queues = MqConst.QUEUE_ORDER_CANCEL) public void orderCancel(Long orderId, Message message, Channel channel) throws IOException { if (null!=orderId){ //防止重复消费 consService.print(); } channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }