Springboot2.x整合RabbitMQ

一、RabbitMQ介绍

可参照RabbitMQ笔记git

二、接入配置

pom依赖github

<!--amqp依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件spring

server.port=8080 spring.application.name=springboot-rabbitmq spring.rabbitmq.host=192.168.242.131 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 开启发送确认 spring.rabbitmq.publisher-confirms=true # 开启发送失败退回 spring.rabbitmq.publisher-returns=true # 开启ACK spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual

三、一对一模式

  即一个生产者对一个消费者模式安全

配置类springboot

@Configuration public class RabbitMqConfig { @Bean public Queue kinsonQueue() { return new Queue("kinson"); } }

消费者服务器

@Component //监听队列kinson
@RabbitListener(queues = {"kinson"}) public class MyReceiver1 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver1 :" + msg); } }

消息生产者测试接口app

/** * 单条消息发送给单个队列,该队列只有一个消费者 * * @return
     */ @GetMapping(value = "send") public String send() { String content = "Date:" + System.currentTimeMillis(); //发送默认交换机对应的的队列kinson
        amqpTemplate.convertAndSend("kinson", content); return content; }

四、一对多模式

  即一个生产者对多个消费者,该模式下能够是一个生产者将消息投递到一个队列,该队列对应多个消费者,此时每条消息只会被消费一次,多个消费者循环处理。另外也能够是一个生产者将消息投递到多个队列里,此时消息是被复制处理。异步

模式一:ide

配置类spring-boot

@Configuration public class RabbitMqConfig { @Bean public Queue kinsonQueue() { return new Queue("kinson"); } }

消费者1

@Component //监听队列kinson
@RabbitListener(queues = {"kinson"}) public class MyReceiver1 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver1 :" + msg); } }

消费者2

@Component //监听队列kinson
@RabbitListener(queues = {"kinson"}) public class MyReceiver2 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver2 :" + msg); } }

消息生产者测试接口

/** * 发送多条消息到一个队列,该队列有多个消费者 * * @return
     */ @GetMapping(value = "sendMore") public String sendMore() { List<String> result = new ArrayList<String>(); //发送10条数据
        for (int i = 0; i < 10; i++) { String content = "第" + (i + 1) + "次发送 Date:" + System.currentTimeMillis(); //发送默认交换机对应的的队列kinson,此时有两个消费者MyReceiver1和MyReceiver2,每条消息只会被消费一次
            amqpTemplate.convertAndSend("kinson", content); result.add(content); } return String.join("<br/>", result); }

模式二:

配置类

@Configuration public class RabbitMqConfig { @Bean public Queue kinsonQueue() { return new Queue("kinson"); } @Bean public Queue kinsonQueue2() { return new Queue("kinson2"); } }

kinson队列消费者

@Component //监听队列kinson
@RabbitListener(queues = {"kinson"}) public class MyReceiver1 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver1 :" + msg); } }

kinson2队列消费者

@Component //监听队列kinson2
@RabbitListener(queues = {"kinson2"}) public class MyReceiver3 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver3 :" + msg); } }

消息生产者测试接口

  /** * 发送多条消息到多个队列 * * @return
     */ @GetMapping(value = "sendMoreQueue") public String sendMoreQueue() { List<String> result = new ArrayList<String>(); //发送10条数据
        for (int i = 0; i < 10; i++) { String content = "第" + (i + 1) + "次发送 Date:" + System.currentTimeMillis(); //发送默认交换机对应的的队列kinson
            amqpTemplate.convertAndSend("kinson", content); //发送默认交换机对应的的队列kinson2
            amqpTemplate.convertAndSend("kinson2", content); result.add(content); } return String.join("<br/>", result); }

相应测试结果请自测

五、ACK消息确认

配置文件加入相应配置

# 开启发送确认 spring.rabbitmq.publisher-confirms=true # 开启发送失败退回 spring.rabbitmq.publisher-returns=true # 开启ACK spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual

配置类,使用Fanout类型的Exchange,主要是设置队列,交换机及绑定

@Configuration public class RabbitMqFanoutACKConfig { @Bean public Queue ackQueue() { return new Queue("ackQueue"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingAckQueue2Exchange(Queue ackQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(ackQueue).to(fanoutExchange); } }

消息发送服务

@Service public class AckSenderService implements RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("AckSender returnedMessage " + message.toString() + " === " + i + " === " + s1 + " === " + s2);
} /** * 消息发送 */ public void send() { final String content = "如今时间是" + LocalDateTime.now(ZoneId.systemDefault()); //设置返回回调 rabbitTemplate.setReturnCallback(this); //设置确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("消息发送成功!"); } else { System.out.println("消息发送失败," + cause + correlationData.toString()); } }); rabbitTemplate.convertAndSend("ackQueue", content); } }

消息消费者

@Component @RabbitListener(queues = {"ackQueue"}) public class MyAckReceiver { @RabbitHandler public void process(String sendMsg, Channel channel, Message message) { System.out.println("AckReceiver  : 收到发送消息 " + sendMsg + ",收到消息时间"
                + LocalDateTime.now(ZoneId.systemDefault())); try { //告诉服务器收到这条消息已经被当前消费者消费了,能够在队列安全删除,这样后面就不会再重发了, //不然消息服务器觉得这条消息没处理掉,后续还会再发 //第二个参数是消息的标识,false只确认当前一个消息收到,true确认全部consumer得到的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("process success"); } catch (Exception e) { System.out.println("process fail"); e.printStackTrace(); } } }

测试访问接口

  /** * @return
     */ @GetMapping(value = "ackSend") public String ackSend() { senderService.send(); return "ok"; }

测试将Consumer确认代码注释掉,即

@Component @RabbitListener(queues = {"ackQueue"}) public class MyAckReceiver { @RabbitHandler public void process(String sendMsg, Channel channel, Message message) { System.out.println("AckReceiver  : 收到发送消息 " + sendMsg + ",收到消息时间"
                + LocalDateTime.now(ZoneId.systemDefault())); try { //告诉服务器收到这条消息已经被当前消费者消费了,能够在队列安全删除,这样后面就不会再重发了, //不然消息服务器觉得这条消息没处理掉,后续还会再发 //第二个参数是消息的标识,false只确认当前一个消息收到,true确认全部consumer得到的消息 //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("process success"); } catch (Exception e) { System.out.println("process fail"); e.printStackTrace(); } } }

此时访问测试接口,能够看到当消息发送完被消费掉以后,队列的状态变为unacked。

当停掉服务时,unacked状态变为Ready

再从新启动服务时会从新发送消息

六、事务机制

事务的实现主要是对信道(Channel)的设置,主要的方法有三个: //声明启动事务模式
channel.txSelect(); //提交事务
channel.txComment(); //回滚事务
channel.txRollback();

消息发送示例

public void publish() throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException { // 建立链接
        ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost("/"); factory.setHost(host); factory.setPort(port); Connection conn = factory.newConnection(); // 建立信道
        Channel channel = conn.createChannel(); // 声明队列
        channel.queueDeclare(TX_QUEUE, true, false, false, null); try { long startTime = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { // 声明事务
 channel.txSelect(); String message = String.format("时间 => %s", System.currentTimeMillis()); // 发送消息
                channel.basicPublish("", TX_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); // 提交事务
 channel.txCommit(); } long endTime = System.currentTimeMillis(); System.out.println("事务模式,发送10条数据,执行花费时间:" + (endTime - startTime) + "s"); } catch (Exception e) { channel.txRollback(); } finally { channel.close(); conn.close(); } }

消息消费示例

public void consume() throws IOException, TimeoutException, InterruptedException { Connection conn = RabbitMqConnFactoryUtil.getRabbitConn(); Channel channel = conn.createChannel(); channel.queueDeclare(TX_QUEUE, true, false, false, null); // 声明事务
 channel.txSelect(); try { //单条消息获取进行消费
            GetResponse resp = channel.basicGet(TX_QUEUE, false); String message = new String(resp.getBody(), "UTF-8"); System.out.println("收到消息:" + message); //消息拒绝 // channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); // 消息确认
            channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // 提交事务
 channel.txCommit(); } catch (Exception e) { // 回滚事务
 channel.txRollback(); } finally { //关闭通道、链接
 channel.close(); conn.close(); } }

七、Confirm消息确认

Confirm发送方确认模式使用和事务相似,也是经过设置Channel进行发送方确认的,Confirm的三种实现方式: //方式一:普通发送方确认模式
channel.waitForConfirms(); //方式二:批量确认模式
channel.waitForConfirmsOrDie(); //方式三:异步监听发送方确认模式
channel.addConfirmListener();

消息发布示例

public void publish() throws IOException, TimeoutException, InterruptedException { // 建立链接
        ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost("/"); factory.setHost(host); factory.setPort(port); Connection conn = factory.newConnection(); // 建立信道
        Channel channel = conn.createChannel(); // 声明队列
        channel.queueDeclare(CONFIRM_QUEUE, false, false, false, null); long startTime = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { // 开启发送方确认模式
 channel.confirmSelect(); String message = String.format("时间 => %s", System.currentTimeMillis()); channel.basicPublish("", CONFIRM_QUEUE, null, message.getBytes("UTF-8")); } //添加确认监听器
        channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("未确认消息,标识:" + deliveryTag); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple)); } }); long endTime = System.currentTimeMillis(); System.out.println("执行花费时间:" + (endTime - startTime) + "s"); }

 

RabbitMQ简单示例源码参照Github

相关文章
相关标签/搜索