关于RabbitMQ的使用

一.分布式架构中作数据同步

分布式微服务架构中,为了下降两个服务之间代码的耦合度,能够考虑使用mq发送消息来作数据传送。
场景:prod模块接受前台传递的id参数,利用mq给cons模块发送消息,cons成功接收到消息后,调用print()方法。html

1.搭建项目:Demo

在这里插入图片描述

2.建立mq-util模块

1.导入依赖

<dependencies>
        <!--rabbitmq消息队列-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>
    </dependencies>

2.消息发送确认

/** * @author yhd * @createtime 2020/10/2 11:51 * @Description 消息发送确认 * ConfirmCallback 只确认消息是否正确到达 Exchange 中 * ReturnCallback 消息没有正确到达队列时触发回调,若是正确到达队列不执行 * 1. 若是消息没有到exchange,则confirm回调,ack=false * 2. 若是消息到达exchange,则confirm回调,ack=true * 3. exchange到queue成功,则不回调return * 4. exchange到queue失败,则回调return */
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ 
 
  

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @PostConstruct
    public void init() { 
 
  
        rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback
        rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) { 
 
  
        if (ack) { 
 
  
            log.info("消息发送成功:" + JSON.toJSONString(correlationData));
        } else { 
 
  
            log.info("消息发送失败:" + cause + " 数据:" + JSON.toJSONString(correlationData));
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { 
 
  
        // 反序列化对象输出
        System.out.println("消息主体: " + new String(message.getBody()));
        System.out.println("应答码: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交换器 exchange : " + exchange);
        System.out.println("消息使用的路由键 routing : " + routingKey);


    }
}

3.发送消息

/** * @author yhd * @createtime 2020/10/2 11:50 * 发送消息 */
@Service
public class SendMessageService { 
 
  

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** * 发送消息 * @param exchange * @param routingKey * @param message * @return */
    public boolean sendMessage(String exchange, String routingKey, Object message){ 
 
  
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        return true;
    }
}

3.建立mq-prod模块

1.添加依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.yhd</groupId>
            <artifactId>mq-util</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

2.编写配置文件

application.ymljava

spring:
  rabbitmq:
    host: 192.168.200.128
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        cknowledge-mode: manual #默认状况下消息消费者是自动确认消息的,若是要手动确认消息则须要修改确认模式为manual
        prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
server:
  port: 8001

3.主启动类

/** * @author yhd * @createtime 2020/10/2 11:41 */
@SpringBootApplication
public class ProdApplication { 
 
  

    public static void main(String[] args) { 
 
  
        SpringApplication.run(ProdApplication.class, args);
    }
}

4.控制层

/** * @author yhd * @createtime 2020/10/2 11:42 */
@RestController
@Slf4j
public class ProdController { 
 
  

    @Autowired
    private ProdService prodService;

    @GetMapping("/prod/{id}")
    public String prod(@PathVariable("id") Integer id){ 
 
  
        prodService.prod(id);
        log.info("ProdController:"+"prod().... : id = "+id);
        return "success";
    }
}

5.service层

/** * @author yhd * @createtime 2020/10/2 11:45 */
@Service
@Slf4j
public class ProdService { 
 
  

    @Autowired
    private SendMessageService sendMessageService;
    public void prod(Integer id){ 
 
  
        sendMessageService.sendMessage("exchange.confirm", "routing.confirm", id);
    }
}

4.建立mq-cons模块

1.添加依赖

同mq-prod模块web

2.配置文件

spring:
  rabbitmq:
    host: 192.168.200.128
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        cknowledge-mode: manual #默认状况下消息消费者是自动确认消息的,若是要手动确认消息则须要修改确认模式为manual
        prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
server:
  port: 8002

3.主启动类

/** * @author yhd * @createtime 2020/10/2 12:03 */
@SpringBootApplication
public class ConsApplication { 
 
  

    public static void main(String[] args) { 
 
  
        SpringApplication.run(ConsApplication.class,args);
    }
}

4.服务层方法

/** * @author yhd * @createtime 2020/10/2 12:04 */
@Service
@Slf4j
public class ConsService { 
 
  

    public void print(){ 
 
  
        log.info("cons模块的consService类的print()执行了...");
    }
}

5.接受消息

/** * @author yhd * @createtime 2020/10/2 12:06 */
@Component
@SpringBootConfiguration
public class ConsReceiver { 
 
  

    @Autowired
    private ConsService consService;

    @SneakyThrows
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue.confirm", autoDelete = "false"),
            exchange = @Exchange(value = "exchange.confirm", autoDelete = "true"),
            key = { 
 
  "routing.confirm"}))
    public void process(Message message, Channel channel) { 
 
  

        // 采用手动应答模式, 手动确认应答更为安全稳定
        //若是手动肯定了,再出异常,mq不会通知;若是没有手动确认,抛异常mq会一直通知

        try { 
 
  
            consService.print();
            // false 确认一个消息,true 批量确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) { 
 
  
            // 消息是否再次被拒绝!
            System.out.println("come on!");
            // getRedelivered() 判断是否已经处理过一次消息!
            if (message.getMessageProperties().getRedelivered()) { 
 
  
                System.out.println("消息已重复处理,拒绝再次接收");
                // 拒绝消息,requeue=false 表示再也不从新入队,若是配置了死信队列则进入死信队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else { 
 
  
                System.out.println("消息即将再次返回队列处理");
                // 参数二:是否批量, 参数三:为是否从新回到队列,true从新入队
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

5.测试

访问http://localhost:8001/prod/1,能够看到控制台打印print()…方法执行了,说明消息发送成功。spring


二,分布式架构中发送延时消息

延迟消息有两种实现方案:
1.基于死信队列
2.集成延迟插件docker

1.基于死信队列的延迟消息

使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,经过这二者的组合来实现延迟队列。json

1.消息的TTL

消息的TTL就是消息的存活时间。RabbitMQ能够对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也能够对每个单独的消息作单独的设置。超过了这个时间,咱们认为这个消息就死了,称之为死信。
如何设置TTL:
咱们建立一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是毫秒),那所在这个队列的消息在5秒后会消失。安全

2.死信交换机

一个消息在知足以下条件下,会进死信路由,记住这里是路由而不是队列,一个路由能够对应不少队列。
(1) 一个消息被Consumer拒收了,而且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其余消费者使用。
(2)上面的消息的TTL到了,消息过时了。
(3)队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和建立其余exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过时了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
在这里插入图片描述bash

2.基于延迟插件实现延迟消息

2 、基于延迟插件实现延迟消息
Rabbitmq实现了一个插件x-delay-message来实现延时队列
说明:基于插件的延迟消息可能有一个小bug(不影响业务),就是生产者发送消息时会回调returnedMessage方法(消息确认时咱们配置的回调方法,表示交换机到队列发送失败),其实基于插件的延迟消息是发送成功了的,若是发生该bug,咱们能够根据交换机或队列过滤掉该消息,别让他加入重试队列;若是不能接受后续业务咱们能够使用死信的方式发送延迟消息。服务器

1.插件安装

  1. 首先咱们将刚下载下来的rabbitmq_delayed_message_exchange-3.8.0.ez文件上传到RabbitMQ所在服务器,下载地址:https://www.rabbitmq.com/community-plugins.html
  2. 切换到插件所在目录,执行 docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins 命令,将刚插件拷贝到容器内plugins目录下
  3. 执行 docker exec -it rabbitmq /bin/bash 命令进入到容器内部,并 cd plugins 进入plugins目录
  4. 执行 ls -l|grep delay 命令查看插件是否copy成功
  5. 在容器内plugins目录下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令启用插件
  6. exit命令退出RabbitMQ容器内部,而后执行 docker restart rabbitmq 命令重启RabbitMQ容器

2.代码实现

1.定义常量类

/** * @author yhd * @createtime 2020/10/2 17:09 */
public class MqConst { 
 
  

    /** * 取消订单,发送延迟队列 */
    public static final String EXCHANGE_DIRECT_ORDER_CANCEL = "exchange.direct.order.cancel";//"exchange.direct.order.create" test_exchange;
    public static final String ROUTING_ORDER_CANCEL = "order.create";
    //延迟取消订单队列
    public static final String QUEUE_ORDER_CANCEL  = "queue.order.cancel";
    //取消订单 延迟时间 单位:秒
    public static final int DELAY_TIME  = 2*60;
}

2.在消息发送方配置延时消息插件

/** * @author yhd * @createtime 2020/10/2 16:22 * 延时消息接收配置 */
@SpringBootConfiguration
public class DelayMqConfig { 
 
  

    @Bean
    public Queue delayQueue(){ 
 
  
        // 第一个参数是建立的queue的名字,第二个参数是是否支持持久化
        return new Queue(MqConst.QUEUE_ORDER_CANCEL,true);
    }

    @Bean
    public CustomExchange delayExchange(){ 
 
  
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(MqConst.EXCHANGE_DIRECT_ORDER_CANCEL, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelay(){ 
 
  
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(MqConst.ROUTING_ORDER_CANCEL).noargs();
    }
}

3.编写发送延时消息的方法

/** * 发送延时消息 * @param exchange * @param routingKey * @param message * @param delayTime * @return */
    public boolean sendDelayMessage(String exchange, String routingKey, Object message,int delayTime){ 
 
  
        rabbitTemplate.convertAndSend(exchange, routingKey, message, message1 -> { 
 
  
            message1.getMessageProperties().setDelay(delayTime*1000);
            return message1;
        });
        return true;
    }

4.在消费者端写接受延时消息的方法

/** * 接受延迟消息 * 延迟队列,不能再这里作交换机与队列绑定 */
    @RabbitListener(queues = MqConst.QUEUE_ORDER_CANCEL)
    public void orderCancel(Long orderId, Message message, Channel channel) throws IOException { 
 
  
        if (null!=orderId){ 
 
  
            //防止重复消费
            consService.print();
        }
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }