人生终将是场单人旅途,孤独以前是迷茫,孤独事后是成长。
这篇是消息队列RabbitMQ的第二弹。java
上一篇的结尾我也预告了本篇的内容:利用RabbitTemplate和注解进行收发消息,还有一个我临时加上的内容:消息的序列化转换。git
本篇会和SpringBoot作整合,采用自动配置的方式进行开发,咱们只须要声明RabbitMQ地址就能够了,关于各类建立链接关闭链接的事都由Spring帮咱们了~程序员
交给Spring帮咱们管理链接可让咱们专一于业务逻辑,就像声明式事务同样易用,方便又高效。github
祝有好收获,先赞后看,快乐无限。spring
Tip:上一篇的代码都放在prototype
包下,本篇的代码都放在auto
包下面。数组
第一节咱们先来搞一下环境的配置,上一篇中咱们已经引入了自动配置的包,咱们既然使用了自动配置的方式,那RabbitMQ
的链接信息咱们直接放在配置文件中就好了,就像咱们须要用到JDBC链接的时候去配置一下DataSource
同样。数据结构
如图所示,咱们只须要指明一下链接的IP+端口号和用户名密码就好了,这里我用的是默认的用户名与密码,不写的话默认也都是guest,端口号也是默认5672。架构
主要咱们须要看一下手动确认消息的配置,须要配置成manual
才是手动确认,往后还会有其余的配置项,眼下咱们配置这一个就能够了。app
接下来咱们要配置一个Queue
,上一篇中咱们往一个名叫erduo
的队列中发送消息,当时是咱们手动定义的此队列,这里咱们也须要手动配置,声明一个Bean
就能够了。
@Configuration public class RabbitmqConfig { @Bean public Queue erduo() { // 其三个参数:durable exclusive autoDelete // 通常只设置一下持久化便可 return new Queue("erduo",true); } }
就这么简单声明一下就能够了,固然了RabbitMQ
毕竟是一个独立的组件,若是你在RabbitMQ
中经过其余方式已经建立过一个名叫erduo
的队列了,你这里也能够不声明,这里起到的一个效果就是若是你没有这个队列,会按照你声明的方式帮你建立这个队列。
配置完环境以后,咱们就能够以SpringBoot的方式来编写生产者和消费者了。
和上一篇的节奏同样,咱们先来编写生产者,不过此次我要引入一个新的工具:RabbitTemplate
。
听它的这个名字就知道,又是一个拿来即用的工具类,Spring家族这点就很舒服,什么东西都给你封装一遍,让你用起来更方便更顺手。
RabbitTemplate
实现了标准AmqpTemplate接口,功能大体能够分为发送消息和接受消息。
咱们这里是在生产者中来用,主要就是使用它的发送消息功能:send
和convertAndSend
方法。
// 发送消息到默认的Exchange,使用默认的routing key void send(Message message) throws AmqpException; // 使用指定的routing key发送消息到默认的exchange void send(String routingKey, Message message) throws AmqpException; // 使用指定的routing key发送消息到指定的exchange void send(String exchange, String routingKey, Message message) throws AmqpException;
send
方法是发送byte数组的数据的模式,这里表明消息内容的对象是Message
对象,它的构造方法就是传入byte数组数据,因此咱们须要把咱们的数据转成byte数组而后构形成一个Message
对象再进行发送。
// Object类型,能够传入POJO void convertAndSend(Object message) throws AmqpException; void convertAndSend(String routingKey, Object message) throws AmqpException; void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
convertAndSend
方法是能够传入POJO对象做为参数,底层是有一个MessageConverter
帮咱们自动将数据转换成byte类型或String或序列化类型。
因此这里支持的传入对象也只有三种:byte类型,String类型和实现了Serializable
接口的POJO。
介绍完了,咱们能够看一下代码:
@Slf4j @Component("rabbitProduce") public class RabbitProduce { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String message = "Hello 我是做者和耳朵,欢迎关注我。" + LocalDateTime.now().toString(); System.out.println("Message content : " + message); // 指定消息类型 MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build(); rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props)); System.out.println("消息发送完毕。"); } public void convertAndSend() { User user = new User(); System.out.println("Message content : " + user); rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user); System.out.println("消息发送完毕。"); } }
这里我特地写明了两个例子,一个用来测试send,另外一个用来测试convertAndSend。
send
方法里咱们看下来和以前的代码是几乎同样的,定义一个消息,而后直接send,可是这个构造消息的构造方法可能比咱们想的要多一个参数,咱们原来讲的只要把数据转成二进制数组放进去便可,如今看来还要多放一个参数了。
MessageProperties
,是的咱们须要多放一个MessageProperties
对象,从他的名字咱们也能够看出它的功能就是附带一些参数,可是某些参数是少不了的,不带不行。
好比个人代码这里就是设置了一下消息的类型,消息的类型有不少种能够是二进制类型,文本类型,或者序列化类型,JSON类型,我这里设置的就是文本类型,指定类型是必须的,也能够为咱们拿到消息以后要将消息转换成什么样的对象提供一个参考。
convertAndSend
方法就要简单太多,这里我放了一个User对象拿来测试用,直接指定队列而后放入这个对象便可。
Tips:User必须实现Serializable
接口,否则的话调用此方法的时候会抛出IllegalArgumentException
异常。
代码完成以后咱们就能够调用了,这里我写一个测试类进行调用:
@SpringBootTest public class RabbitProduceTest { @Autowired private RabbitProduce rabbitProduce; @Test public void sendSimpleMessage() { rabbitProduce.send(); rabbitProduce.convertAndSend(); } }
效果以下图~
同时在控制台使用命令rabbitmqctl.bat list_queues
查看队列-erduo
如今的状况:
如此一来,咱们的生产者测试就算完成了,如今消息队列里两条消息了,并且消息类型确定不同,一个是咱们设置的文本类型,一个是自动设置的序列化类型。
既然队列里面已经有消息了,接下来咱们就要看咱们该如何经过新的方式拿到消息并消费与确认了。
消费者这里咱们要用到@RabbitListener
来帮咱们拿到指定队列消息,它的用法很简单也很复杂,咱们能够先来讲简单的方式,直接放到方法上,指定监听的队列就好了。
@Slf4j @Component("rabbitConsumer") public class RabbitConsumer { @RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息已确认"); } }
这段代码就表明onMessage
方法会处理erduo
(Producer.QUEUE_NAME是常量字符串"erduo")队列中的消息。
咱们能够看到这个方法里面有两个参数,Message
和Channel
,若是用不到Channel
能够不写此参数,可是Message
消息必定是要的,它表明了消息自己。
咱们能够想一想,咱们的程序从RabbitMQ
之中拉回一条条消息以后,要以怎么样的方式展现给咱们呢?
没错,就是封装为一个个Message
对象,这里面放入了一条消息的全部信息,数据结构是什么样一会我一run你就能看到了。
同时这里咱们使用Channel
作一个消息确认的操做,这里的DeliveryTag表明的是这个消息在队列中的序号,这个信息存放在MessageProperties
中。
编写完生产者和消费者,同时已经运行过生产者往消息队列里面放了两条信息,接下来咱们能够直接启动消息,查看消费状况:
在我红色框线标记的地方能够看到,由于咱们有了消费者因此项目启动后先和RabbitMQ创建了一个链接进行监听队列。
随后就开始消费咱们队列中的两条消息:
第一条信息是contentType=text/plain
类型,因此直接就在控制台上打印出了具体内容。
第二条信息是contentType=application/x-java-serialized-object
,在打印的时候只打印了一个内存地址+字节大小。
无论怎么说,数据咱们是拿到了,也就是表明咱们的消费是没有问题的,同时也都进行了消息确认操做,从数据上看,整个消息能够分为两部分:body
和MessageProperties
。
咱们能够单独使用一个注解拿到这个body的内容 - @Payload
@RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(@Payload String body, Channel channel) throws Exception { System.out.println("Message content : " + body); }
也能够单独使用一个注解拿到MessageProperties
的headers属性,headers属性在截图里也能够看到,只不过是个空的 - @Headers。
@RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(@Payload String body, @Headers Map<String,Object> headers) throws Exception { System.out.println("Message content : " + body); System.out.println("Message headers : " + headers); }
这两个注解都算是扩展知识,我仍是更喜欢直接拿到所有,全都要!!!
上面咱们已经完成了消息的发送与消费,整个过程咱们能够再次回想一下,一切都和我画的这张图上同样的轨迹:
只不过咱们一直没有指定Exchage
一直使用的默认路由,但愿你们好好记住这张图。
下面再来补一些知识点,有关@RabbitListener
与@RabbitHandler
。
@RabbitListener
上面咱们已经简单的进行了使用,稍微扩展一下它实际上是能够监听多个队列的,就像这样:
@RabbitListener(queues = { "queue1", "queue2" }) public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息已确认"); }
还有一些其余的特性如绑定之类的,这里再也不赘述由于太硬编码了通常用不上。
下面来讲说这节要主要讲的一个特性:@RabbitListener和@RabbitHandler的搭配使用。
前面咱们没有提到,@RabbitListener
注解实际上是能够注解在类上的,这个注解在类上标志着这个类监听某个队列或某些队列。
这两个注解的搭配使用就要让@RabbitListener
注解在类上,而后用@RabbitHandler
注解在方法上,根据方法参数的不一样自动识别并去消费,写个例子给你们看一看更直观一些。
@Slf4j @Component("rabbitConsumer") @RabbitListener(queues = Producer.QUEUE_NAME) public class RabbitConsumer { @RabbitHandler public void onMessage(@Payload String message){ System.out.println("Message content : " + message); } @RabbitHandler public void onMessage(@Payload User user) { System.out.println("Message content : " + user); } }
你们能够看看这个例子,咱们先用@RabbitListener
监听erduo
队列中的消息,而后使用@RabbitHandler
注解了两个方法。
这两个方法正好对应着咱们第二节中测试类会发送的两种消息,因此咱们往RabbitMQ中发送两条测试消息,用来测试这段代码,看看效果:
都在控制台上如常打印了,若是@RabbitHandler
注解的方法中没有一个的类型能够和你消息的类型对的上,好比消息都是byte数组类型,这里没有对应的方法去接收,系统就会在控制台不断的报错,若是你出现这个状况就证实你类型写的不正确。
假设你的erduo
队列中会出现三种类型的消息:byte,文本和序列化,那你就必需要有对应的处理这三种消息的方法,否则消息发过来的时候就会由于没法正确转换而报错。
并且使用了@RabbitHandler
注解以后就不能再和以前同样使用Message
作接收类型。
@RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息已确认"); }
这样写的话会报类型转换异常的,因此两者选其一。
同时上文个人@RabbitHandler
没有进行消息确认,你们能够本身试一下进行消息确认。
经过上文咱们已经知道,能被自动转换的对象只有byte[]
、String
、java序列化对象
(实现了Serializable接口的对象),可是并非全部的Java对象都会去实现Serializable接口,并且序列化的过程当中使用的是JDK自带的序列化方法,效率低下。
因此咱们更广泛的作法是:使用Jackson先将数据转换成JSON格式发送给RabbitMQ
,再接收消息的时候再用Jackson将数据反序列化出来。
这样作能够完美解决上面的痛点:消息对象既没必要再去实现Serializable接口,也有比较高的效率(Jackson序列化效率业界应该是最好的了)。
默认的消息转换方案是消息转换顶层接口-MessageConverter
的一个子类:SimpleMessageConverter
,咱们若是要换到另外一个消息转换器只须要替换掉这个转换器就好了。
上图是MessageConverter
结构树的结构树,能够看到除了SimpleMessageConverter
以外还有一个Jackson2JsonMessageConverter
,咱们只须要将它定义为Bean,就能够直接使用这个转换器了。
@Bean public MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(jacksonObjectMapper); }
这样就能够了,这里的jacksonObjectMapper
能够不传入,可是默认的ObjectMapper
方案对JDK8的时间日期序列化会不太友好,具体能够参考个人上一篇文章:从LocalDateTime序列化探讨全局一致性序列化,总的来讲就是定义了本身的ObjectMapper
。
同时为了接下来测试方便,我又定义了一个专门测试JSON序列化的队列:
@Bean public Queue erduoJson() { // 其三个参数:durable exclusive autoDelete // 通常只设置一下持久化便可 return new Queue("erduo_json",true); }
如此以后就能够进行测试了,先是生产者代码:
public void sendObject() { Client client = new Client(); System.out.println("Message content : " + client); rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client); System.out.println("消息发送完毕。"); }
我又从新定义了一个Client
对象,它和以前测试使用的User对象成员变量都是同样的,不同的是它没有实现Serializable接口。
同时为了保留以前的测试代码,我又新建了一个RabbitJsonConsumer
,用于测试JSON序列化的相关消费代码,里面定义了一个静态变量:JSON_QUEUE = "erduo_json"
;
因此这段代码是将Client
对象做为消息发送到"erduo_json"
队列中去,随后咱们在测试类中run一下进行一次发送。
紧着是消费者代码:
@Slf4j @Component("rabbitJsonConsumer") @RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE) public class RabbitJsonConsumer { public static final String JSON_QUEUE = "erduo_json"; @RabbitHandler public void onMessage(Client client, @Headers Map<String,Object> headers, Channel channel) throws Exception { System.out.println("Message content : " + client); System.out.println("Message headers : " + headers); channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); System.out.println("消息已确认"); } }
有了上文的经验以后,这段代码理解起来也是很简单了吧,同时给出了上一节没写的如何在@RabbitHandler
模式下进行消息签收。
咱们直接来看看效果:
在打印的Headers里面,日后翻能够看到contentType=application/json
,这个contentType
是代表了消息的类型,这里正是说明咱们新的消息转换器生效了,将全部消息都转换成了JSON类型。
这两篇讲完了RabbitMQ
的基本收发消息,包括手动配置和自动配置的两种方式,这些你们仔细研读以后应该会对RabbitMQ
收发消息没什么疑问了~
不过咱们一直以来发消息时都是使用默认的交换机,下篇将会讲述一下RabbitMQ
的几种交换机类型,以及其使用方式。
讲完了交换机以后,这些RabbitMQ
的经常使用概念基本就完善了。
最近这段时间压力挺大,优狐令我八月底以前升级到三级,因此各位读者的赞对我很重要,但愿你们可以高抬贵手,帮我一哈~
好了,以上就是本期的所有内容,感谢你能看到这里,欢迎对本文点赞收藏与评论,👍大家的每一个点赞都是我创做的最大动力。
我是耳朵,一个一直想作知识输出的伪文艺程序员,咱们下期见。