PS:近期在南宁出差,工做比较忙,因此更新会比较慢。web
说到消息通讯,可能咱们首先会想到的是邮箱,QQ,微信,短信等等这些通讯方式,这些通讯方式都有发送者,接收者,还有一个中间存储离线消息的容器。可是这些通讯方式和咱们要讲的 RabbitMQ 的通讯模型是不同的,好比和邮件的通讯方式相比,邮件服务器基于 POP3/SMTP 协议,通讯双方须要明确指定,而且发送的邮件内容有固定的结构。而 RabbitMQ 服务器基于 AMQP 协议,这个协议是不须要明确指定发送方和接收方的,并且发送的消息也没有固定的结构,甚至能够直接存储二进制数据,而且和邮件服务器同样,也能存储离线消息,最关键的是 RabbitMQ 既可以以一对一的方式进行路由,还可以以一对多的方式进行广播。算法
下面这张图是大体展现了 RabbitMQ 消息通讯的过程:apache
ps:看不懂不要紧,后面会经过具体的例子进行讲解。数组
在 RabbitMQ 的通讯过程当中,有两个主要的角色:生产者和消费者。类比于邮件通讯的发送方和接收方。安全
这里首先咱们要明确 RabbtiMQ 服务器是不可以产生数据的,正如同其名字——消息中间件,是一个用来传递消息的中间商。生产者产生建立消息,而后发布到代理服务器(RabbitMQ),而消费者则从代理服务器获取消息(不是直接找生产者要消息),并且在实际应用中,生产者和消费者也是能够角色互相转换的,因此当咱们应用程序链接到 RabbitMQ 服务器时,必需要明确我是生产者呢仍是消费者。服务器
生产者建立消息,而后发布到 RabbitMQ 服务器中,那么什么是消息?微信
这里的消息分为两部分:有效内容和内容标签。app
①、有效内容:能够是任何内容,一个数组,一个集合,甚至二进制数据均可以。RabbitMQ 不会在乎你发什么数据,尽管发就好了。负载均衡
②、内容标签:描述有效内容,是 RabbitMQ 用来决定谁将得到消息。前面说的邮件通讯,必须明确指定发送方地址和收件方地址,而基于 AMQP 协议的 RabbitMQ 则是经过生产者发送消息附带的内容标签将消息发送个感兴趣的消费者。maven
后面咱们会详细解析标签是什么,这里只须要知道生产者会建立消息并设置标签。注意最上面的大图,通常来讲生产者建立消息会设置标签,可是传输到消费者那里就没有标签了,除非你在有效内容中说明谁是生产者,通常消费者是不知道谁产生的消息的。
生产者产生了消息,而后发布到 RabbitMQ 服务器,发布以前确定要先链接上服务器,也就是要在应用程序和rabbitmq 服务器之间创建一条 TCP 链接,一旦链接创建,应用程序就能够建立一条 AMQP 信道。
信道是创建在“真实的”TCP 链接内的虚拟链接,AMQP 命令都是经过信道发送出去的,每条信道都会被指派一个惟一的ID(AMQP库会帮你记住ID的),不管是发布消息、订阅队列或者接收消息,这些动做都是经过信道来完成的。
可能有人会问,为何不直接经过 TCP 链接来发送AMQP命令呢?
这里缘由是效率问题,由于对于操做系统来讲,每次创建和销毁 TCP 会话是很是昂贵的开销,而实际系统中,好比电商双十一,每秒钟高峰期成千上万条链接,通常来讲操做系统创建TCP链接是有数量限制的,那么这就会遇到瓶颈。
引入信道的概念,咱们能够在一条 TCP 链接上建立 N多个信道,这样既能发送命令,也可以保证每条信道的私密性,咱们能够将其想象为光纤电缆。
截取上面的一部分图:
交换器和队列都是 RabbitMQ 服务器的一部分,咱们知道生产者会将消息发送到 RabbitMQ 服务器,而进入该服务器后,首先进入交换机部分,而后由交换器根据消息附带的内容标签,将消息绑定到相应的队列。咱们首先来看什么是队列:
①、容纳消息的场所,生产者发送到RabbitMQ服务器的消息会在队列中等待消费者消费。
②、队列是 RabbitMQ 服务器中最后的终点(除非消息进入了黑洞,黑洞的概念下面会介绍)。
③、队列能够实现负载均衡,咱们能够增长一堆消费者,而后让 RabbitMQ 以循环的方式来均匀的分配消息。
搞清楚了队列是什么了,那么消息是如何到达队列的呢?没错,就是经过交换器。
消息进入RabbitMQ 服务器时,会首先将消息发送到交换器,而后交换器会根据特定的路由算法以及消息的内容标签将消息绑定到相应的队列。在 AMQP 协议中有四种交换器:direct、fanout、topic和 headers,每种交换器都实现了不一样的路由算法,这也对应 RabbitMQ 工做的几种不一样方式,这是重点,后面博客会进行详细介绍。
最上面那张大图,我画了虚拟主机A以及虚拟主机B,说明在 RabbitMQ 服务器中存在着多个虚拟主机,那么虚拟主机究竟是什么?
首先咱们抛出这样一个问题,一个 RabbitMQ 确定不是只服务一个应用程序,那么多个应用程序同时使用 RabbitMQ 服务器,如何保证彼此之间不会冲突?
答案就是使用虚拟主机,虚拟主机其实就是一个迷你版的RabbitMQ 服务器,它拥有本身的交换器和队列,更重要的是虚拟主机拥有本身的权限机制,一个服务器可以建立多个虚拟主机。那么咱们在使用RabbitMQ服务器的时候,只须要将一个应用程序对应一个虚拟主机,这种各个实例间逻辑上的分离就可以保证不一样的应用程序安全的传递消息。
默认的虚拟主机是“/”。
介绍完RabbitMQ 消息通讯过程当中的一些基本概念后,下面咱们经过一个代码实例来实际感觉一下。
这是一个Maven工程,首先咱们看 pom.xml 文件:导入 amqp-client 依赖便可
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.ys.rabbitmq</groupId> <artifactId>RabbitMQTest</artifactId> <version>1.0-SNAPSHOT</version> <packaging>war</packaging> <name>RabbitMQTest Maven Webapp</name> <!-- FIXME change it to the project's website --> <url>http://www.example.com</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency> </dependencies> </project>
生产者:
1 package com.ys.simple; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.ys.utils.ConnectionUtil; 6 7 /** 8 * Create by hadoop 9 */ 10 public class Send { 11 private final static String QUEUE_NAME = "hello"; 12 13 public static void main(String[] args) throws Exception{ 14 //一、获取链接 15 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); 16 //二、声明通道 17 Channel channel = connection.createChannel(); 18 //三、声明(建立)队列 19 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 20 //四、定义消息内容 21 String message = "hello rabbitmq "; 22 //五、发布消息 23 channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); 24 System.out.println("[x] Sent'"+message+"'"); 25 //六、关闭通道和链接 26 channel.close(); 27 connection.close(); 28 } 29 }
消费者:
1 package com.ys.simple; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.QueueingConsumer; 6 import com.ys.utils.ConnectionUtil; 7 8 9 /** 10 * Create by hadoop 11 */ 12 public class Recv { 13 14 private final static String QUEUE_NAME = "hello"; 15 16 public static void main(String[] args) throws Exception{ 17 //一、获取链接 18 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); 19 //二、声明通道 20 Channel channel = connection.createChannel(); 21 //三、声明队列 22 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 23 //四、定义队列的消费者 24 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); 25 //五、监听队列 26 channel.basicConsume(QUEUE_NAME,true,queueingConsumer); 27 //六、获取消息 28 while (true){ 29 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" [x] Received '" + message + "'"); 32 } 33 } 34 35 }
工具类:ConnectionUtil
1 package com.ys.utils; 2 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 /** 7 * Create by hadoop 8 */ 9 public class ConnectionUtil { 10 11 public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{ 12 //一、定义链接工厂 13 ConnectionFactory factory = new ConnectionFactory(); 14 //二、设置服务器地址 15 factory.setHost(host); 16 //三、设置端口 17 factory.setPort(port); 18 //四、设置虚拟主机、用户名、密码 19 factory.setVirtualHost(vHost); 20 factory.setUsername(userName); 21 factory.setPassword(passWord); 22 //五、经过链接工厂获取链接 23 Connection connection = factory.newConnection(); 24 return connection; 25 } 26 }