RabbitMQ的工做原理spring
它的基本结构异步
组成部分说明以下:测试
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。ui
Exchange:消息队列交换机,按必定的规则将消息路由转发到某个队列,对消息进行过虑。spa
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。code
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。orm
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。blog
Maven举例配置rabbitmq
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp‐client</artifactId> <version>4.0.3</version><!‐‐此版本与spring boot 1.5.9版本匹配‐‐> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐logging</artifactId> </dependency>
生产者举例Demo队列
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqConfig { public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; public static final String ROUTINGKEY_EMAIL="inform.#.email.#"; public static final String ROUTINGKEY_SMS="inform.#.sms.#"; //声明交换机 @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ //durable(true) 持久化,mq重启以后交换机还在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } //声明QUEUE_INFORM_EMAIL队列 @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL(){ return new Queue(QUEUE_INFORM_EMAIL); } //声明QUEUE_INFORM_SMS队列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ return new Queue(QUEUE_INFORM_SMS); } //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); } //ROUTINGKEY_SMS队列绑定交换机,指定routingKey @Bean public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } }
消费者举例Demo
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqConfig { public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; public static final String ROUTINGKEY_EMAIL="inform.#.email.#"; public static final String ROUTINGKEY_SMS="inform.#.sms.#"; //声明交换机 @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ //durable(true) 持久化,mq重启以后交换机还在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } //声明QUEUE_INFORM_EMAIL队列 @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL(){ return new Queue(QUEUE_INFORM_EMAIL); } //声明QUEUE_INFORM_SMS队列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ return new Queue(QUEUE_INFORM_SMS); } //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); } //ROUTINGKEY_SMS队列绑定交换机,指定routingKey @Bean public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } }
工做模式
RabbitMQ有如下几种工做模式 :
一、Work queues
二、Publish/Subscribe
三、Routing
四、Topics
五、Header
六、RPC
Work queues
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务太重或任务较多状况使用工做队列能够提升任务处理的速度。
测试:
一、使用入门程序,启动多个消费者。
二、生产者发送多个消息。
结果:
一、一条消息只会被一个消费者接收;
二、rabbit采用轮询的方式将消息是平均发送给消费者的;
三、消费者在处理完某条消息后,才会收到下一条消息。
Publish/subscribe 发布订阅模式
发布订阅模式:
一、每一个消费者监听本身的队列。
二、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每一个队列,每一个绑定交换机的队列都将接收
到消息
Routin
路由模式:
一、每一个消费者监听本身的队列,而且设置routingkey。
二、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
这是一种很是灵活的模式,常常被用到
Topics
路由模式:
一、每一个消费者监听本身的队列,而且设置带统配符的routingkey。
二、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
Header模式
header模式与routing不一样的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配
队列。
案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种
通知类型都接收的则两种通知都有效。
生产者Demo:
Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_type", "email"); Map<String, Object> headers_sms = new Hashtable<String, Object>(); headers_sms.put("inform_type", "sms"); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
通知Demo :
String message = "email inform to user"+i; Map<String,Object> headers = new Hashtable<String, Object>(); headers.put("inform_type", "email");//匹配email通知消费者绑定的header //headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(headers); //Email通知 channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
发送邮件消费者 :
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS); Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_email", "email"); //交换机和队列绑定 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); //指定消费队列 channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
RPC
RPC即客户端远程调用服务端的方法 ,使用MQ能够实现RPC的异步调用,基于Direct交换机实现,流程以下:
一、客户端便是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
二、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,获得方法返回的结果
三、服务端将RPC方法 的结果发送到RPC响应队列
四、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。