可靠性(Reliability):RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。html
灵活的路由(Flexible Routing):在消息进入队列以前,经过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,能够将多个 Exchange 绑定在一块儿,也经过插件机制实现本身的 Exchange 。java
消息集群(Clustering):多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker 。git
高可用(Highly Available Queues):队列能够在集群中的机器上进行镜像,使得在部分节点出问题的状况下队列仍然可用。github
多种协议(Multi-protocol):RabbitMQ 支持多种消息队列协议,好比 STOMP、MQTT 等等。算法
多语言客户端(Many Clients):RabbitMQ 几乎支持全部经常使用语言,好比 Java、.NET、Ruby 等等。数据库
管理界面(Management UI):RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和管理消息 Broker 的许多方面。编程
跟踪机制(Tracing):若是消息异常,RabbitMQ 提供了消息跟踪机制,使用者能够找出发生了什么。数组
插件机制(Plugin System): RabbitMQ 提供了许多插件,来从多方面进行扩展,也能够编写本身的插件。浏览器
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不一样的开发语言等条件的限制。AMQP协议这种下降耦合的机制是基于与上层产品,语言无关的协议。是一种二进制协议,提供客户端应用与消息中间件之间多通道、协商、异步、安全、中立和高效地交互。从总体来看,AMQP协议可划分为两层:安全
Functional Layer(功能层)
功能层,位于协议上层主要定义了一组命令(基于功能的逻辑分类),用于应用程序调用实现自身所需的业务逻辑。例如:应用程序能够经过功能层定义队列名称,生产消息到指定队列,消费指定队列消息等基于(Message queues 模型)
Transport Layer(传输层)
传输层,基于二进制数据流传输,用于将应用程序调用的指令传回服务器,并返回结果,同时能够处理信道复用,帧处理,内容编码,心跳传输,数据传输和异常处理。传输层能够被任意传输替换,只要不改变应用可见的功能层相关协议,也可使用相同的传输层,同时使用不一样的高级协议
默认交换机(default exchange)其实是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。
它有一个特殊的属性使得它对于简单应用特别有用处:那就是每一个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
举个栗子:当你声明了一个名为 “search-indexing-online” 的队列,AMQP 代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为 “search-indexing-online”。所以,当携带着名为 “search-indexing-online” 的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为 “search-indexing-online” 的队列中。换句话说,默认交换机看起来貌似可以直接将消息投递给队列,尽管技术上并无作相关的操做。
若是咱们以 Rotuing key=create 和 Rotuing key=confirm 发送消息时,这时消息只会被推送到 Queue2 队列中,其余 Routing Key 的消息将会被丢弃
/** * @Author: Young * @Description: young.thrift.test_thrift.test * @Create: 2019-09-23 16:54 **/ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 设置链接 factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("***"); factory.setPassword("***"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列(队列属性可看下面) channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = "hello"; channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } catch (Exception e){ System.out.println("连接异常、、、、"); } } } 复制代码
/** * @Author: Young * @Description: 模拟一个队列同时绑定两个binding * @Create: 2019-09-23 17:44 **/ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Work { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("***"); factory.setPassword("***"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); final Channel channel1 = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); channel1.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息 channel.basicQos(1); channel1.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x1] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x1] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; // 同一个会话, consumerTag 是固定的 能够作此会话的名字, deliveryTag 每次接收消息+1,能够作此消息处理通道的名字。 // 所以 deliveryTag 能够用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)。 channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); channel1.basicConsume(TASK_QUEUE_NAME, false, deliverCallback1, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } } 复制代码
由于扇型交换机投递消息的拷贝到全部绑定到它的队列,因此他的应用案例都极其类似:
大规模多用户在线(MMO)游戏可使用它来处理排行榜更新等全局事件
体育新闻网站能够用它来近乎实时地将比分更新分发给移动客户端
分发系统使用它来广播各类状态和配置更新
在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP 没有内置 presence 的概念,所以 XMPP 可能会是个更好的选择)
/** * @Author: Young * @Description: young.thrift.test_thrift.test * @Create: 2019-09-23 18:16 **/ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("***"); factory.setPassword("***"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明交换机及他的类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = argv.length < 1 ? "info: Hello World!" : String.join(" ", argv); // channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } } 复制代码
/** * @Author: Young * @Description: young.thrift.test_thrift.test * @Create: 2019-09-23 19:12 **/ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogs1 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("***"); factory.setPassword("***"); // 建立连接 Connection connection = factory.newConnection(); // 建立信道 Channel channel = connection.createChannel(); // 生命交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 将队列与交换机绑定 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 回调函数 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; // 开始等待消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); 复制代码
而Topic 的路由规则是一种模糊匹配,能够经过通配符知足一部分规则就能够传送。
它的约定是: 1)binding key 中能够存在两种特殊字符 “” 与“#”,用于作模糊匹配,其中 “” 用于匹配一个单词,“#”用于匹配多个单词(能够是零个)。 2)routing key 为一个句点号 “.” 分隔的字符串(咱们将被句点号 “. ” 分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key 与 routing key 同样也是句点号 “.” 分隔的字符串。
当生产者发送消息 Routing Key=F.C.E 的时候,这时候只知足 Queue1,因此会被路由到 Queue1 中,若是 Routing Key=A.C.E 这时候会被同是路由到 Queue1 和 Queue2 中,若是 Routing Key=A.F.B 时,这里只会发送一条消息到 Queue2 中。
A.B.C
*.B.*
#.*.C
/** * @Author: Young * @Description: young.thrift.test_thrift.test * @Create: 2019-09-23 20:26 **/ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { // String[] strings={"A.B.C", "ABC"}; String[] strings={"E.B.G", "ABC"}; // String[] strings={"A.B", "AB"}; // String[] strings={"B", "B"}; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("***"); factory.setPassword("***"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明交换机及其类型 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = strings[0]; String message = strings[1]; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } } 复制代码
/** * @Author: Young * @Description: young.thrift.test_thrift.test * @Create: 2019-09-23 20:33 **/ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogsTopic1 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { String[] strings = {"A.#", "*.*.C"}; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.211.55.3"); factory.setPort(5672); factory.setUsername("young"); factory.setPassword("young"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); // 同一个通道绑定多个 bindingKey for (String bindingKey : strings) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } 复制代码
headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。
头交换机能够视为直连交换机的另外一种表现形式。但直连交换机的路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至能够是整数或者哈希值(字典)等。灵活性更强(但实际上咱们不多用到头交换机)。工做流程:
1)、绑定一个队列到头交换机上时,会同时绑定多个用于匹配的头(header)。
2)、传来的消息会携带header,以及会有一个 “x-match” 参数。当 “x-match” 设置为 “any” 时,消息头的任意一个值被匹配就能够知足条件,而当 “x-match” 设置为 “all” 的时候,就须要消息头的全部值都匹配成功。
AMQP 中的队列(queue)跟其余消息队列或任务队列中的队列是很类似的:它们存储着即将被应用消费掉的消息。
队列跟交换机共享某些属性,可是队列也有一些另外的属性。
队列在声明(declare)后才能被使用。若是一个队列尚不存在,声明一个队列会建立它。若是声明的队列已经存在,而且属性彻底相同,那么这次声明不会对原有队列产生任何影响。若是声明中的属性与已存在队列的属性有差别,那么一个错误代码为 406 的通道级异常就会被抛出。
持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称做暂存队列(Transient queues)。并非全部的场景和案例都须要将队列持久化。
持久化的队列并不会使得路由到它的消息也具备持久性。假若消息代理挂掉了,从新启动,那么在重启的过程当中持久化队列会被从新声明,不管怎样,只有通过持久化的消息才能被从新恢复。
消费者应用(Consumer applications) - 用来接受和处理消息的应用 - 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。并且网络缘由也有可能引发各类问题。这就给咱们出了个难题,AMQP 代理在何时删除消息才是正确的?AMQP 0-9-1 规范给咱们两种建议:
若是一个消费者在还没有发送确认回执的状况下挂掉了,那么AMQP代理会将消息从新投递给另外一个消费者。若是当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,而后再次尝试投递。
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用能够向消息代理代表,本条消息因为 “拒绝消息(Rejecting Messages)” 的缘由处理失败了(或者未能在此时完成)。
当拒绝某条消息时,应用能够告诉消息代理如何处理这条消息——销毁它或者从新放入队列。
当此队列只有一个消费者时,请确认不要因为拒绝消息而且选择了从新放入队列的行为而引发消息在同一个消费者身上无限循环的状况发生。
在 AMQP 中,basic.reject 方法用来执行拒绝消息的操做。但 basic.reject 有个限制:你不能使用它决绝多个带有确认回执(acknowledgements)的消息。可是若是你使用的是 RabbitMQ,那么你可使用被称做 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 扩展来解决这个问题。
在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每一个消费者一次能够接受多少条消息是很是有用的。这能够在试图批量发布消息的时候起到简单的负载均衡和提升消息吞吐量的做用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,若是生产应用每分钟才发送一条消息,这说明处理工做尚在运行。)
注意,RabbitMQ 只支持通道级的预取计数,而不是链接级的或者基于大小的预取。
AMQP 模型中的消息(Message)对象是带有属性(Attributes)的。有些属性及其常见,以致于 AMQP 0-9-1 明确的定义了它们,而且应用开发者们无需费心思思考这些属性名字所表明的具体含义。例如:
有些属性是被 AMQP 代理所使用的,可是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称做消息头(headers)。他们跟 HTTP 协议的 X-Headers 很类似。消息属性须要在消息被发布的时候定义。
AMQP 的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被 AMQP 代理看成不透明的字节数组来对待。
消息代理不会检查或者修改有效载荷。消息能够只包含属性而不携带有效载荷。它一般会使用相似 JSON 这种序列化的格式数据,为了节省,协议缓冲器和 MessagePack 将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP 及其同行者们一般使用 “content-type” 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工做,但这仅仅是基于约定而已。
消息可以以持久化的方式发布,AMQP代理会将此消息存储在磁盘上。若是服务器重启,系统会确认收到的持久化消息未丢失。
简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具备持久化性质:它彻底取决与消息自己的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能形成必定的影响(就像数据库操做同样,健壮性的存在一定形成一些性能牺牲)。
RPC(Remote Procedure Call Protocol,远程过程调用协议),通常都称为“远程过程调用”。关于RPC协议自己,很少介绍,这里只介绍Openstack如何利用AMQP来实现RPC。以下图所示。
/** * @Author: Young * @Description: young.thrift.test_thrift.test * @Create: 2019-09-23 20:45 **/ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient implements AutoCloseable { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; public RPCClient() throws IOException, TimeoutException { // 创建connection和channel。 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("***"); factory.setPassword("***"); connection = factory.newConnection(); channel = connection.createChannel(); } public static void main(String[] argv) { try (RPCClient fibonacciRpc = new RPCClient()) { // 求0-32的斐波那契数列之和 for (int i = 0; i < 32; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); String response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } } // call方法来发送RPC请求 public String call(String message) throws IOException, InterruptedException { // 生成correlationId final String corrId = UUID.randomUUID().toString(); // 生成默认名字的queue用于reply,并订阅它 String replyQueueName = channel.queueDeclare().getQueue(); // 发送request message,设置参数replyTo和correlationId. AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); // 由于消费者发送response是在另外一个线程中,咱们须要让main线程阻塞,在这里咱们使用BlockingQueue final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); // 消费者进行简单的处理,为每个response message检查其correlationId,若是是,则将response添加进阻塞队列 String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { response.offer(new String(delivery.getBody(), "UTF-8")); } }, consumerTag -> { }); // 在队列为空时,获取元素的线程会等待队列变为非空 String result = response.take(); channel.basicCancel(ctag); return result; } public void close() throws IOException { connection.close(); } } 复制代码
/** * @Author: Young * @Description: young.thrift.test_thrift.test * @Create: 2019-09-23 20:46 **/ import com.rabbitmq.client.*; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; // 斐波那契函数 private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 设置链接参数 factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("***"); factory.setPassword("***"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); // 清空队列 channel.queuePurge(RPC_QUEUE_NAME); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Object monitor = new Object(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build(); String response = ""; try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); // 对消息进行应答 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 唤醒正在消费的进程 synchronized (monitor) { monitor.notify(); } } }; channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { })); // 在收到消息前,本线程进入等待状态 while (true) { synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } 复制代码
参考:
若有不当之处,欢迎留言(手动滑稽)。。。