近几日学习了一下rabbitmq消息中间件,因为好久好久以前已经学过,并且自己入门难度也不高,因此学习起来较为简单,现将全部内容及与spring、springboot整合方法记录:html
首先消息中间件定义:java
1.什么是MQ
消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
其主要用途:不一样进程Process/线程Thread之间通讯。
为何会产生消息队列?有几个缘由:
不一样进程(process)之间传递消息时,两个进程之间耦合程度太高,改动一个进程,引起必须修改另外一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),全部两进程之间传递的消息,都必须经过消息队列来传递,单独修改某一个进程,不会影响另外一个;
不一样进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,而且,某一个进程接受的消息太多,一会儿没法处理完,而且也有前后顺序,必须对收到的消息进行排队,所以诞生了事实上的消息队列;
git
我从别人博客中看到了几篇文章,写的很是棒 如今将其记录github
https://mp.weixin.qq.com/s?__biz=MzAxOTc0NzExNg==&mid=2665513507&idx=1&sn=d6db79c1ae03ba9260fb0fb77727bb54&chksm=80d67a60b7a1f376e7ad1e2c3276e8b565f045b1c7e21ef90926f69d99f969557737eb5d8128&mpshare=1&scene=1&srcid=1019awkBx8kaLyFohcuW4Ee7web
这个是经过故事的方式写出消息中间件究竟是什么正则表达式
https://mp.weixin.qq.com/s?__biz=MjM5ODYxMDA5OQ==&mid=2651960012&idx=1&sn=c6af5c79ecead98daa4d742e5ad20ce5&chksm=bd2d07108a5a8e0624ae6ad95001c4efe09d7ba695f2ddb672064805d771f3f84bee8123b8a6&mpshare=1&scene=1&srcid=04054h4e90lz5Qc2YKnLNuvYspring
这个是消息中间件的使用场景docker
https://github.com/jasonGeng88/blog/blob/master/201705/MQ.mdjson
这个故事是消息中间件在实际开发中的应用场合缓存
还有一个协议须要注意:
AMQP
还有rabbitmq的五种队列
如今我将其解释一下:
第一种.直接给 就是最简单的一对一(经过队列发)
第二种:能够当作是第一种的扩展,就是一对多,你们共享同一个消息
第三种:其实就是将消息发送给交换机,而后交换机在把消息发送给绑定在此交换机的队列上
第四种 路由模式,你能够当作是根据具体关键字来发 或者说是服务每一个队列承担着一个或多个服务,消息会根据服务来发
第五种 topic模式,跟上面的差很少 不过他是至关于多匹配,至关于正则表达式
第六种 好像是远程调用
而后是和安装和使用,安装的话我直接用的docker
其他安装手段的话你必须得先安装elrang语言,由于rabbitmq就是用这个语言编写的
接下来是各类模式的使用
稍后我会传到github去 其实难点很少 主要是几个方法的使用及各类信道绑定啊之类的 大体流程是
获取链接------建立通道-------建立通道----发布信息
获取链接------建立通道------根据知道的通道建立消费者------获取到达消息
获取链接------建立通道------建立交换机----------发布信息
获取链接------建立通道------申明队列-------绑定交换机------定义消费者(根据队列)------获取到达消息
其他几种方法无非就是多加了点关键字 没有什么差异
重点一:与spring整合
我见到的基本有两种方式
第一种就是写配置bean类
@Configuration public class HelloWorldConfiguration { protected final String helloWorldQueueName = "hello.world.queue"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.13.132"); connectionFactory.setUsername("wujifu"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/vhose_1"); return connectionFactory; } @Bean public AmqpAdmin amqpAdmin() { return new RabbitAdmin(connectionFactory()); } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); //The routing key is set to the name of the queue by the broker for the default exchange. template.setRoutingKey(this.helloWorldQueueName); //Where we will synchronously receive messages from template.setDefaultReceiveQueue(this.helloWorldQueueName); return template; } @Bean // Every queue is bound to the default direct exchange public Queue helloWorldQueue() { return new Queue(this.helloWorldQueueName); } /* @Bean public Binding binding() { return declare(new Binding(helloWorldQueue(), defaultDirectExchange())); }*/ /* @Bean public TopicExchange helloExchange() { return declare(new TopicExchange("hello.world.exchange")); }*/ /* public Queue declareUniqueQueue(String namePrefix) { Queue queue = new Queue(namePrefix + "-" + UUID.randomUUID()); rabbitAdminTemplate().declareQueue(queue); return queue; } // if the default exchange isn't configured to your liking.... @Bean Binding declareP2PBinding(Queue queue, DirectExchange exchange) { return declare(new Binding(queue, exchange, queue.getName())); } @Bean Binding declarePubSubBinding(String queuePrefix, FanoutExchange exchange) { return declare(new Binding(declareUniqueQueue(queuePrefix), exchange)); } @Bean Binding declarePubSubBinding(UniqueQueue uniqueQueue, TopicExchange exchange) { return declare(new Binding(uniqueQueue, exchange)); } @Bean Binding declarePubSubBinding(String queuePrefix, TopicExchange exchange, String routingKey) { return declare(new Binding(declareUniqueQueue(queuePrefix), exchange, routingKey)); }*/ }
还有一种是配置文件的方法
<!-- 公共部分 --> <!-- 建立链接类 链接安装好的 rabbitmq --> <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="localhost" /> <!-- username,访问RabbitMQ服务器的帐户,默认是guest --> <property name="username" value="${rmq.manager.user}" /> <!-- username,访问RabbitMQ服务器的密码,默认是guest --> <property name="password" value="${rmq.manager.password}" /> <!-- host,RabbitMQ服务器地址,默认值"localhost" --> <property name="host" value="${rmq.ip}" /> <!-- port,RabbitMQ服务端口,默认值为5672 --> <property name="port" value="${rmq.port}" /> <!-- channel-cache-size,channel的缓存数量,默认值为25 --> <property name="channel-cache-size" value="50" /> <!-- cache-mode,缓存链接模式,默认值为CHANNEL(单个connection链接,链接以后关闭,自动销毁) --> <property name="cache-mode" value="CHANNEL" /> </bean> <!--或者这样配置,connection-factory元素实际就是注册一个org.springframework.amqp.rabbit.connection.CachingConnectionFactory实例 <rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}" username="${rmq.manager.user}" password="${rmq.manager.password}" />--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义消息队列,durable:是否持久化,若是想在RabbitMQ退出或崩溃的时候,不会失去全部的queue和消息,须要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,而消息(message)默认是持久化的能够看类org.springframework.amqp.core.MessageProperties中的属性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 仅建立者可使用的私有队列,断开后自动删除;auto_delete: 当全部消费客户端链接断开后,是否自动删除队列 --> <rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" /> <!--绑定队列,rabbitmq的exchangeType经常使用的三种模式:direct,fanout,topic三种,咱们用direct模式,即rabbit:direct-exchange标签,Direct交换器很简单,若是是Direct类型,就会将消息中的RoutingKey与该Exchange关联的全部Binding中的BindingKey进行比较,若是相等,则发送到该Binding对应的Queue中。有一个须要注意的地方:若是找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,这条消息会直接丢失,因此此处要当心,auto-delete:自动删除,若是为Yes,则该交换机全部队列queue删除后,自动删除交换机,默认为false --> <rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange> <!-- 生产者部分 --> <!-- 发送消息的producer类,也就是生产者 --> <bean id="msgProducer" class="com.asdf.sdf.ClassA"> <!-- value中的值就是producer中的的routingKey,也就是队列名称,它与上面的rabbit:bindings标签中的key必须相同 --> <property name="queueName" value="{alert.queue.1}"/> </bean> <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,因为fastjson的速度快于jackson,这里替换为fastjson的一个实现 --> <bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean> <!-- 或者配置jackson --> <!-- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> --> <rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> <!-- 消费者部分 --> <!-- 自定义接口类 --> <bean id="testHandler" class="com.rabbit.TestHandler"></bean> <!-- 用于消息的监听的代理类MessageListenerAdapter --> <bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" > <!-- 类名 --> <constructor-arg ref="testHandler" /> <!-- 方法名 --> <property name="defaultListenerMethod" value="handlerTest"></property> <property name="messageConverter" ref="jsonMessageConverter"></property> </bean> <!-- 配置监听acknowledeg="manual"设置手动应答,它可以保证即便在一个worker处理消息的时候用CTRL+C来杀掉这个worker,或者一个consumer挂了(channel关闭了、connection关闭了或者TCP链接断了),也不会丢失消息。由于RabbitMQ知道没发送ack确认消息致使这个消息没有被彻底处理,将会对这条消息作re-queue处理。若是此时有另外一个consumer链接,消息会被从新发送至另外一个consumer会一直重发,直到消息处理成功,监听容器acknowledge="auto" concurrency="30"设置发送次数,最多发送30次 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20"> <rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" /> <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" /> <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" /> </rabbit:listener-container>
其他几种exchange
fanOut:
<!-- Fanout 扇出,顾名思义,就是像风扇吹面粉同样,吹获得处都是。若是使用fanout类型的exchange,那么routing key就不重要了。由于凡是绑定到这个exchange的queue,都会受到消息。 -->
<rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">
<rabbit:bindings>
<rabbit:binding queue="test_delay_queue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- 发送端不是按固定的routing key发送消息,而是按字符串“匹配”发送,接收端一样如此 --> <rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange"> <rabbit:bindings> <rabbit:binding queue="Q1" pattern="error.*.log" /> <rabbit:binding queue="Q2" pattern="error.level1.log" /> <rabbit:binding queue="Q3" pattern="error.level2.log" /> </rabbit:bindings> </rabbit:topic-exchange>
还有相关的消费者和提供者
@Resource private RabbitTemplate rabbitTemplate; private String queueName; public void sendMessage(CommonMessage msg){ try { logger.error("发送信息开始"); System.out.println(rabbitTemplate.getConnectionFactory().getHost()); //发送信息 queueName交换机,就是上面的routingKey msg.getSource() 为 test_key rabbitTemplate.convertAndSend(queueName,msg.getSource(), msg); //若是是普通字符串消息须要先序列化,再发送消息 //rabbitTemplate.convertAndSend(queueName,msg.getSource(), SerializationUtils.serialize(msg)); logger.error("发送信息结束"); } catch (Exception e) { e.printStackTrace(); } } public void setQueueName(String queueName) { this.queueName = queueName; }
public class TestHandler { @Override public void handlerTest(CommonMessage commonMessage) { System.out.println("DetailQueueConsumer: " + new String(message.getBody())); } }
如今说说我我的的理解
其实思路与其余几个工具或者说方法 的用法都是同样的 在配置文件(配置类中配置好类工厂 模板 而后再函数中直接拿bean或者模板直接用就行了)
接下来是与springboot的整合
与springboot的整合我用到了父子模块的用法 如今记录一下,否则会忘!就是建立一个空的springboot项目,而后右击添加新项目 新项目的存储位置必定要处于父项目的目录下
而后具体整合也和spring差很少 甚至能够说是比spring还要简单一点
1.在配置文件中写上属性
2.写一个配置类
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue TestDirectQueue() { return new Queue("TestDirectQueue",true); } //Direct交换机 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { return new DirectExchange("TestDirectExchange"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } }
简单接口进行消息的推送
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @RestController public class SendMessageController { @Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法 @GetMapping("/sendDirectMessage") public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); map.put("createTime",createTime); //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map); return "ok"; } }
而后是消费者方的写法 也是同样的 此处必须注意两边都同时得有配置文件 由于比较分红了两个模块了
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue TestDirectQueue() { return new Queue("TestDirectQueue",true); } //Direct交换机 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { return new DirectExchange("TestDirectExchange"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } }
而后建立消费类
@Component @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue public class DirectReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString()); } }
这里面消费者是用、
@EnableRabbit和@Configuration一块儿使用,能够加在类或者方法上,这个注解开启了容器对注册的bean的@RabbitListener检查。
@RabbitListener 和 @RabbitHandler结合使用,不一样类型的消息使用不一样的方法来处理。
其他模式的使用也差很少 就是稍微多点配置
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @Configuration public class TopicRabbitConfig { //绑定键 public final static String man = "topic.man"; public final static String woman = "topic.woman"; @Bean public Queue firstQueue() { return new Queue(TopicRabbitConfig.man); } @Bean public Queue secondQueue() { return new Queue(TopicRabbitConfig.woman); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } //将firstQueue和topicExchange绑定,并且绑定的键值为topic.man //这样只要是消息携带的路由键是topic.man,才会分发到该队列 @Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man); } //将secondQueue和topicExchange绑定,并且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 @Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); } }
workqueue方法就是多建立几个队列 多绑定几回就好了
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @Configuration public class FanoutRabbitConfig { /** * 建立三个队列 :fanout.A fanout.B fanout.C * 将三个队列都绑定在交换机 fanoutExchange 上 * 由于是扇型交换机, 路由键无需配置,配置也不起做用 */ @Bean public Queue queueA() { return new Queue("fanout.A"); } @Bean public Queue queueB() { return new Queue("fanout.B"); } @Bean public Queue queueC() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange()); } @Bean Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange()); } @Bean Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange()); } }
到此 具体的入门级用法都已经讲完了 具体的模型我会传到github上,接下来还有一些重点关于消息确认的知识(能够用来确保消息不丢失)
RabbitMQ的消息确认有两种。
一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程当中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
(1)ConfirmCallback
经过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调。
使用该功能须要开启确认,spring-boot中配置以下:
spring.rabbitmq.publisher-confirms = true
(2)ReturnCallback
经过实现ReturnCallback接口,若是消息从交换器发送到对应队列失败时触发(好比根据发送消息时指定的routingKey找不到队列时会触发)
使用该功能须要开启确认,spring-boot中配置以下:
spring.rabbitmq.publisher-returns = true
根据前面的知识(深刻了解RabbitMQ工做原理及简单使用、Rabbit的几种工做模式介绍与实践)咱们知道,若是要保证消息的可靠性,须要对消息进行持久化处理,然而消息持久化除了须要代码的设置以外,还有一个重要步骤是相当重要的,那就是保证你的消息顺利进入Broker(代理服务器),如图所示:
正常状况下,若是消息通过交换器进入队列就能够完成消息的持久化,但若是消息在没有到达broker以前出现意外,那就形成消息丢失,有没有办法能够解决这个问题?
RabbitMQ有两种方式来解决这个问题:
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
channel.txSelect()声明启动事务模式;
channel.txComment()提交事务;
channel.txRollback()回滚事务;
从上面的能够看出事务都是以tx开头的,tx应该是transaction extend(事务扩展模块)的缩写,若是有准确的解释欢迎在博客下留言。
咱们来看具体的代码实现:
// 建立链接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 声明队列 channel.queueDeclare(_queueName, true, false, false, null); String message = String.format("时间 => %s", new Date().getTime()); try { channel.txSelect(); // 声明事务 // 发送消息 channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); channel.txCommit(); // 提交事务 } catch (Exception e) { channel.txRollback(); } finally { channel.close(); conn.close(); }
反正就是提交失败就回滚 可是效率很低
从上面能够看出,非事务模式的性能是事务模式的性能高149倍,个人电脑测试是这样的结果,不一样的电脑配置略有差别,但结论是同样的,事务模式的性能要差不少,那有没有既能保证消息的可靠性又能兼顾性能的解决方案呢?那就是接下来要讲的Confirm发送方确认模式。
Confirm发送方确认模式使用和事务相似,也是经过设置Channel进行发送方确认的。
Confirm的三种实现方式:
方式一:channel.waitForConfirms()普通发送方确认模式;
方式二:channel.waitForConfirmsOrDie()批量确认模式;
方式三:channel.addConfirmListener()异步监听发送方确认模式;
// 建立链接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 声明队列 channel.queueDeclare(config.QueueName, false, false, false, null); // 开启发送方确认模式 channel.confirmSelect(); String message = String.format("时间 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); if (channel.waitForConfirms()) { System.out.println("消息发送成功" ); }
看代码能够知道,咱们只须要在推送消息以前,channel.confirmSelect()声明开启发送方确认模式,再使用channel.waitForConfirms()等待消息被服务器确认便可。
// 建立链接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 声明队列 channel.queueDeclare(config.QueueName, false, false, false, null); // 开启发送方确认模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) { String message = String.format("时间 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } channel.waitForConfirmsOrDie(); //直到全部信息都发布,只要有一个未确认就会IOException System.out.println("所有执行完成");
以上代码能够看出来channel.waitForConfirmsOrDie(),使用同步方式等全部的消息发送以后才会执行后面代码,只要有一个消息未被确认就会抛出IOException异常。
// 建立链接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 声明队列 channel.queueDeclare(config.QueueName, false, false, false, null); // 开启发送方确认模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) { String message = String.format("时间 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } //异步监听确认和未确认的消息 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("未确认消息,标识:" + deliveryTag); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple)); } });
异步模式的优势,就是执行效率高,不须要等待消息执行完,只须要监听消息便可,以上异步返回的信息以下:
能够看出,代码是异步执行的,消息确认有多是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,若是true表示批量执行了deliveryTag这个值之前的全部消息,若是为false的话表示单条确认。
最好补充一点:因为我本身整理的可能不够全面 因此我放几个连接
https://blog.csdn.net/qq_35387940/article/details/100514134 https://www.cnblogs.com/nizuimeiabc1/p/9608763.html 整合篇
https://blog.csdn.net/u013256816/article/details/55515234 消息确认机制