https://github.com/401Studio/WeekLearn/issues/2java
RabbitMQ 即一个消息队列,_主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的做用。_RabbitMQ使用的是AMQP协议,它是一种二进制协议。默认启动端口 5672。git
在 RabbitMQ 中,以下图结构:github
那么,_其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。_spring
rabbitmq的message model实际上消息不直接发送到queue中,中间有一个exchange是作消息分发,producer甚至不知道消息发送到那个队列中去。所以,当exchange收到message时,必须准确知道该如何分发。是append到必定规则的queue,仍是append到多个queue中,仍是被丢弃?这些规则都是经过exchagne的4种type去定义的。安全
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.springboot
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.服务器
exchange是一个消息的agent,每个虚拟的host中都有定义。它的职责是把message路由到不一样的queue中。app
exchange和queue经过routing-key关联,这二者之间的关系是就是binding。以下图所示,X表示交换机,红色表示队列,交换机经过一个routing-key去binding一个queue,routing-key有什么做用呢?看Direct exchange类型交换机。less
路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。那消息交换机是怎么知道的呢?其实,producer deliver消息的时候会把routing-key add到 message header中。routing-key只是一个messgae的attribute。异步
A direct exchange delivers messages to queues based on a message routing key. The routing key is a message attribute added into the message header by the producer. The routing key can be seen as an "address" that the exchange use to decide how to route the message. A message goes to the queue(s) whose binding key exactly matches the routing key of the message.
Default Exchange
这种是特殊的Direct Exchange,是rabbitmq内部默认的一个交换机。该交换机的name是空字符串,全部queue都默认binding 到该交换机上。全部binding到该交换机上的queue,routing-key都和queue的name同样。
通配符交换机,exchange会把消息发送到一个或者多个知足通配符规则的routing-key
的queue。其中_表号匹配一个word,#匹配多个word和路径,路径之间经过.隔开。如知足a._.c的routing-key有a.hello.c;知足#.hello的routing-key有a.b.c.helo。
扇形交换机,该交换机会把消息发送到全部binding到该交换机上的queue。这种是publisher/subcribe模式。用来作广播最好。
全部该exchagne上指定的routing-key都会被ignore掉。
The fanout copies and routes a received message to all queues that are bound to it regardless of routing keys or pattern matching as with direct and topic exchanges. Keys provided will simply be ignored.
设置header attribute参数类型的交换机。
安装就不说了,建议按照官方文档上作。先贴代码,稍后解释,代码以下:
配置 交换机,队列,交换机与队列的绑定,消息监视容器:
@Configuration @Data public class RabbitMQConfig { final static String queueName = "spring-boot"; @Bean Queue queue() { return new Queue(queueName, false); } @Bean TopicExchange exchange() { return new TopicExchange("spring-boot-exchange"); } @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(queueName); } @Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); container.setMessageListener(listenerAdapter); return container; } @Bean Receiver receiver() { return new Receiver(); } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } }
配置接收信息者(即消费者):
public class Receiver { private CountDownLatch latch = new CountDownLatch(1); public void receiveMessage(String message) { System.out.println("Received <" + message + ">"); latch.countDown(); } public CountDownLatch getLatch() { return latch; } }
配置发送信息者(即生产者):
@RestController public class Test { @Autowired RabbitTemplate rabbitTemplate; @RequestMapping(value = "/test/{abc}",method = RequestMethod.GET) public String test(@PathVariable(value = "abc") String abc){ rabbitTemplate.convertAndSend("spring-boot", abc + " from RabbitMQ!"); return "abc"; } }
以上即可实现一个简单的 RabbitMQ Demo,具体代码在:点这里
那么,这里,分为三个部分分析:发消息,交换机队列,收消息。
rabbitTemplate.convertAndSend("spring-boot", xxx);
便可发送信息。TopicExchange
,配置队列 Queue
,而且配置他们之间的绑定 Binding
container.setMessageListener(listenerAdapter);
其中,MessageListenerAdapter 能够看作是 咱们接收者的一个包装类,new MessageListenerAdapter(receiver, "receiveMessage");
指明了若是有消息来,那么调用接收者哪一个方法进行处理。spring xml方式实现RabbitMQ简单,可读性较好,配置简单,配置和实现以下所示。
上文已经讲述了rabbitmq的配置,xml方式经过properites文件存放用户配置信息:
mq.host=127.0.0.1 mq.username=guest mq.password=guest mq.port=5672
配置application-mq.xml配置文件,声明链接、交换机、queue以及consumer监听。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" > <description>rabbitmq 链接服务配置</description> <!-- 链接配置 --> <context:property-placeholder location="classpath:mq.properties" /> <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"/> <rabbit:admin connection-factory="connectionFactory"/> <!-- spring template声明--> <rabbit:template exchange="amqpExchange" id="amqpTemplate" connection-factory="connectionFactory" /> <!--申明queue--> <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" /> <!--申明exchange交换机并绑定queue--> <rabbit:direct-exchange name="amqpExchange" durable="true" auto-delete="false" id="amqpExchange"> <rabbit:bindings> <rabbit:binding queue="test_queue_key" key="test_queue_key"/> </rabbit:bindings> </rabbit:direct-exchange> <!--consumer配置监听--> <bean id="reveiver" class="com.demo.mq.receive.Reveiver" /> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="test_queue_key" ref="reveiver" method="receiveMessage"/> </rabbit:listener-container> </beans>
上述代码中,引入properties文件就很少说了。
<rabbit:connection-factory>
标签声明建立connection的factory工厂。
<rabbit-template>
声明spring template,和上文spring中使用template同样。template可声明exchange。
<rabbit:queue>
声明一个queue并设置queue的配置项,直接看标签属性就能够明白queue的配置项。
<rabbit:direct-exchange>
声明交换机并绑定queue。
<rabbit:listener-container>
申明监听container并配置consumer和监听routing-key。
剩下就简单了,application-context.xml中把rabbitmq配置import进去。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"> <context:component-scan base-package="com.demo.**" /> <import resource="application-mq.xml" /> </beans>
Producer实现,发送消息仍是使用template的convertAndSend() deliver消息。
@Service public class Producer { @Autowired private AmqpTemplate amqpTemplate; private final static Logger logger = LoggerFactory.getLogger(Producer.class); public void sendDataToQueue(String queueKey, Object object) { try { amqpTemplate.convertAndSend(queueKey, object); } catch (Exception e) { e.printStackTrace(); logger.error("exeception={}",e); } } }
配置consumer
package com.demo.mq.receive; import org.springframework.stereotype.Service; import java.util.concurrent.CountDownLatch; @Service public class Reveiver { private CountDownLatch latch = new CountDownLatch(1); public void receiveMessage(String message) { System.out.println("reveice msg=" + message.toString()); latch.countDown(); } }
测试deliver消息
Controller @RequestMapping("/demo/") public class TestController { private final static Logger logger = LoggerFactory.getLogger(TestController.class); @Resource private Producer producer; @RequestMapping("/test/{msg}") public String send(@PathVariable("msg") String msg){ logger.info("#TestController.send#abc={msg}", msg); System.out.println("msg="+msg); producer.sendDataToQueue("test_queue_key",msg); return "index"; } }
在生产环境中,因为 Spring 对 RabbitMQ 提供了一些方便的注解,因此首先可使用这些注解。例如:
具体这些注解的使用,能够参考这里的代码:点这里
首先,生产环境下的 RabbitMQ 可能不会在生产者或者消费者本机上,因此须要从新定义 ConnectionFactory,即:
@Bean ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(userName); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); return connectionFactory; }
这里,能够从新设置须要链接的 RabbitMQ 的 ip,端口,虚拟主机,用户名,密码。
而后,能够先从生产端考虑,生产端须要链接 RabbitMQ,那么能够经过 RabbitTemplate 进行链接。 Ps:(RabbitTemplate 用于生产端发送消息到交换机中),以下代码:
@Bean(name="myTemplate") RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(integrationEventMessageConverter()); template.setExchange(exchangeName); return template; }
在该代码中,new RabbitTemplate(connectionFactory);
设置了生产端链接到RabbitMQ,template.setMessageConverter(integrationEventMessageConverter());
设置了 生产端发送给交换机的消息是以什么格式的,在 integrationEventMessageConverter()
代码中:
public MessageConverter integrationEventMessageConverter() { Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(); return messageConverter; }
如上 Jackson2JsonMessageConverter
指明了 JSON。上述代码的最后 template.setExchange(exchangeName);
指明了 要把生产者要把消息发送到哪一个交换机上。
有了上述,那么,咱们便可使用 rabbitTemplate.convertAndSend("spring-boot", xxx);
发送消息,xxx 表示任意类型,由于上述的设置会帮咱们把这些类型转化成 JSON 传输。
接着,生产端发送咱们说过了,那么如今能够看看消费端:
对于消费端,咱们能够只建立 SimpleRabbitListenerContainerFactory
,它可以帮咱们生成 RabbitListenerContainer,而后咱们再使用 @rabbitlistener 指定接收者收到信息时处理的方法。
@Bean(name="myListenContainer") public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setMessageConverter(integrationEventMessageConverter()); factory.setConnectionFactory(connectionFactory()); return factory; }
这其中 factory.setMessageConverter(integrationEventMessageConverter());
指定了咱们接受消息的时候,以 JSON 传输的消息能够转换成对应的类型传入到方法中。例如:
@Slf4j @Component @RabbitListener(containerFactory = "helloRabbitListenerContainer",queues = "spring-boot") public class Receiver { @RabbitHandler public void receiveTeacher(Teacher teacher) { log.info("##### = {}",teacher); } }
可能出现的问题:
在生产环境中,咱们须要考虑万一辈子产者挂了,消费者挂了,或者 rabbitmq 挂了怎么样。通常来讲,若是生产者挂了或者消费者挂了,实际上是没有影响,由于消息就在队列里面。那么万一 rabbitmq 挂了,以前在队列里面的消息怎么办,其实能够作消息持久化,RabbitMQ 会把信息保存在磁盘上。
作法是能够先从 Connection 对象中拿到一个 Channel 信道对象,而后再能够经过该对象设置 消息持久化。
这里 Spring 有自动重连机制。
每一个Consumer可能须要一段时间才能处理完收到的数据。若是在这个过程当中,Consumer出错了,异常退出了,而数据尚未处理完成,那么 很是不幸,这段数据就丢失了。由于咱们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不论是否处理完 成,RabbitMQ Server会当即把这个Message标记为完成,而后从queue中删除了。
若是一个Consumer异常退出了,它处理的数据可以被另外的Consumer处理,这样数据在这种状况下就不会丢失了(注意是这种状况下)。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不只仅是被Consumer收到,那么咱们不能采用no-ack。而应该是在处理完数据后发送ack。
在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ能够去安全的删除它了。
若是Consumer退出了可是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的状况下数据也不会丢失。