消息中间件之RabbitMQ

初识RabbitMQ

RabbitMQ简述

  • RabbitMQ——Rabbit Message Queue的简写,但不能仅仅理解其为消息队列,消息代理更合适。RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议)的开源实现
  • 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合异步消息流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺乏的中间件。
  • RabbitMQ做为一个消息代理,主要和消息打交道,负责接收并转发消息。
  • RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。活适用于排队算法、秒杀动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景

RabbitMQ支持的协议

  • AMQP 0-9-10-90-8,**和扩展协议:**RabbitMQ最先开发就是为了支持AMQP,因此该协议是Broker支持的最核心的协议。全部的版本基本上是相似的,但最新的版本描述比较不清晰或相比先前版本没有多大改善。RabbitMQ用不一样的方式扩展了AMQP 0-9-1。
  • STOMPSTOMP是一个很是简单的基于文本的消息协议。它定义了不多的消息语法,它很是容易实现,而且实现一部分将会更容易(它是仅有的能够手动使用telnet来操做的协议)。
  • MQTTMQTT是一个轻量级的 发布/订阅 消息机制的二进制协议,旨在用于低端设备的客户端上。它很好的定义了 发布/订阅消息机制,介不支持其它消息机制。
  • HTTP:HTTP固然不是一个消息协议。RabbitMQ能够经过如下三种方式来传输消息:
    • 管理插件支持一个简单的HTTP API用于发送和接收消息。主要用于测试诊断的目 的,可是针对少许的消息来讲仍是可靠的。
    • Web-STOMP插件使得,在浏览器上可 使用基于WebSockets、或者SockJS来控制消息。
    • JSON-RPC插件使浏览器经过 JSON-RPC和基于AMQP 0-9-1协议的消息进行通讯。注意JSON RPC是一个同步的协议, 基于异步传输的AMQP的一些功能将使用polling方式进行模拟。

RabbitMQ 特色

  • 可靠性(Reliability):RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。html

  • 灵活的路由(Flexible Routing):在消息进入队列以前,经过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,能够将多个 Exchange 绑定在一块儿,也经过插件机制实现本身的 Exchange 。java

  • 消息集群(Clustering):多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker 。git

  • 高可用(Highly Available Queues):队列能够在集群中的机器上进行镜像,使得在部分节点出问题的状况下队列仍然可用。github

  • 多种协议(Multi-protocol):RabbitMQ 支持多种消息队列协议,好比 STOMP、MQTT 等等。算法

  • 多语言客户端(Many Clients):RabbitMQ 几乎支持全部经常使用语言,好比 Java、.NET、Ruby 等等。数据库

  • 管理界面(Management UI):RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和管理消息 Broker 的许多方面。编程

  • 跟踪机制(Tracing):若是消息异常,RabbitMQ 提供了消息跟踪机制,使用者能够找出发生了什么。数组

  • 插件机制(Plugin System): RabbitMQ 提供了许多插件,来从多方面进行扩展,也能够编写本身的插件。浏览器

AMQP协议

AMQP简述

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不一样的开发语言等条件的限制。AMQP协议这种下降耦合的机制是基于与上层产品,语言无关的协议。是一种二进制协议,提供客户端应用与消息中间件之间多通道、协商、异步、安全、中立和高效地交互。从总体来看,AMQP协议可划分为两层:安全

  • Functional Layer(功能层)

    功能层,位于协议上层主要定义了一组命令(基于功能的逻辑分类),用于应用程序调用实现自身所需的业务逻辑。例如:应用程序能够经过功能层定义队列名称,生产消息到指定队列,消费指定队列消息等基于(Message queues 模型)

    • AMQ 功能层设计驱动基于以下要求:
      • 使用二进制数据流压缩和解压,提升效率;
      • 能够处理任意大小的消息,且不作任何限制;
      • 单个链接支持多个通讯通道;
      • 客户端和服务端基于长连接实现,且无特殊限制;
      • 容许异步指令基于管道通讯;
      • 易扩展,基于新的需求和变化支持扩展;
      • 新版本向下兼容老版本;
      • 基于断言模型,异常能够快速定位修复;
      • 对编程语言保持中立;
      • 适应代码发展演变;
  • Transport Layer(传输层)

    传输层,基于二进制数据流传输,用于将应用程序调用的指令传回服务器,并返回结果,同时能够处理信道复用,帧处理,内容编码,心跳传输,数据传输和异常处理。传输层能够被任意传输替换,只要不改变应用可见的功能层相关协议,也可使用相同的传输层,同时使用不一样的高级协议

    • AMQP 传输层设计驱动给予以下要求:
      • 使用二进制数据流压缩和解压,提升效率;
      • 能够处理任意大小的消息,且不作任何限制;
      • 单个链接支持多个通讯通道;
      • 客户端和服务端基于长连接实现,且无特殊限制;
      • 容许异步指令基于管道通讯;
      • 易扩展,基于新的需求和变化支持扩展;
      • 新版本向下兼容老版本;
      • 基于断言模型,异常能够快速定位修复;
      • 对编程语言保持中立;
      • 适应代码发展演变;

AMQP 模型

  • Broker: 接收和分发消息的应用,RabbitMQ Server就是Message Broker。
  • Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,相似于网络中的namespace概念。当多个不一样的用户使用同一个RabbitMQ server提供的服务时,能够划分出多个vhost,每一个用户在本身的vhost建立exchange/queue等。
  • Connection: publisher/consumer和broker之间的TCP链接。断开链接的操做只会在client端进行,Broker不会断开链接,除非出现网络故障或broker服务出现问题。
  • Channel: 若是每一次访问RabbitMQ都创建一个Connection,在消息量大的时候创建TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部创建的逻辑链接,若是应用程序支持多线程,一般每一个thread建立单独的channel进行通信,AMQP method包含了channel id帮助客户端和message broker识别channel,因此channel之间是彻底隔离的。Channel做为轻量级的Connection极大减小了操做系统创建TCP connection的开销。
  • Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。经常使用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
  • Queue: 消息最终被送到这里等待consumer取走。一个message能够被同时拷贝到多个queue中。
  • Binding: exchange和queue之间的虚拟链接,binding中能够包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。

工做流程

  • 发布者(Publisher)发布消息(Message),交给交换机(Exchange)。
  • 交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。
  • 最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

交换机类型

  • 交换机能够有两个状态:持久(durable)、暂存(transient)。
  • 持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们须要在代理再次上线后从新被声明)。
  • 并非全部的应用场景都须要持久化的交换机。
默认交换机

默认交换机(default exchange)其实是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。

它有一个特殊的属性使得它对于简单应用特别有用处:那就是每一个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

举个栗子:当你声明了一个名为 “search-indexing-online” 的队列,AMQP 代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为 “search-indexing-online”。所以,当携带着名为 “search-indexing-online” 的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为 “search-indexing-online” 的队列中。换句话说,默认交换机看起来貌似可以直接将消息投递给队列,尽管技术上并无作相关的操做。

直连型交换机

当生产者(P)发送消息 Rotuing key=booking 时,这时候将消息传送给 Exchange,Exchange 获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的 Queue,这时发现 Queue1 和 Queue2 都符合,就会将消息传送给这两个队列。

若是咱们以 Rotuing key=create 和 Rotuing key=confirm 发送消息时,这时消息只会被推送到 Queue2 队列中,其余 Routing Key 的消息将会被丢弃

Rotuing key为booking时的示意图

Rotuing key为create时的示意图

client代码
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 16:54
 **/

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置链接
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // 声明队列(队列属性可看下面)
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            String message = "hello";

            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
            channel.close();
            connection.close();
        } catch (Exception e){
            System.out.println("连接异常、、、、");
        }
    }

}
复制代码
server代码
/**
 * @Author: Young
 * @Description: 模拟一个队列同时绑定两个binding
 * @Create: 2019-09-23 17:44
 **/

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Work {
    private static final String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        final Channel channel1 = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        channel1.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息
        channel.basicQos(1);
        channel1.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x1] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x1] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 同一个会话, consumerTag 是固定的 能够作此会话的名字, deliveryTag 每次接收消息+1,能够作此消息处理通道的名字。
        // 所以 deliveryTag 能够用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)。
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
        channel1.basicConsume(TASK_QUEUE_NAME, false, deliverCallback1, consumerTag -> { });
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

复制代码
扇型交换机

扇型交换机(funout exchange)将消息路由给绑定到它身上的全部队列,而不理会绑定的路由键。若是 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这全部的 N 个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。

由于扇型交换机投递消息的拷贝到全部绑定到它的队列,因此他的应用案例都极其类似:

大规模多用户在线(MMO)游戏可使用它来处理排行榜更新等全局事件

体育新闻网站能够用它来近乎实时地将比分更新分发给移动客户端

分发系统使用它来广播各类状态和配置更新

在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP 没有内置 presence 的概念,所以 XMPP 可能会是个更好的选择)

示意图

client 代码
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 18:16
 **/

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明交换机及他的类型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = argv.length < 1 ? "info: Hello World!" :
                    String.join(" ", argv);
            //
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

复制代码
server 代码
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 19:12
 **/

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs1 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");
        // 建立连接
        Connection connection = factory.newConnection();
        // 建立信道
        Channel channel = connection.createChannel();
        // 生命交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 将队列与交换机绑定
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // 开始等待消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
复制代码
主题交换机

前面提到的 direct 规则是严格意义上的匹配,换言之 Routing Key 必须与 Binding Key 相匹配的时候才将消息传送给 Queue.

而Topic 的路由规则是一种模糊匹配,能够经过通配符知足一部分规则就能够传送。

它的约定是: 1)binding key 中能够存在两种特殊字符 “ 与“#”,用于作模糊匹配,其中 “” 用于匹配一个单词,“#”用于匹配多个单词(能够是零个)。 2)routing key 为一个句点号 “.” 分隔的字符串(咱们将被句点号 “. ” 分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key 与 routing key 同样也是句点号 “.” 分隔的字符串。

当生产者发送消息 Routing Key=F.C.E 的时候,这时候只知足 Queue1,因此会被路由到 Queue1 中,若是 Routing Key=A.C.E 这时候会被同是路由到 Queue1 和 Queue2 中,若是 Routing Key=A.F.B 时,这里只会发送一条消息到 Queue2 中。

binding key 分别为
  • A.B.C

  • *.B.*

  • #.*.C

Rotuing key为 A.B.C 时的示意图

Rotuing key为 E.B.C 时的示意图

Rotuing key为 B.C 时的示意图

client代码
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 20:26
 **/

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
//        String[] strings={"A.B.C", "ABC"};
        String[] strings={"E.B.G", "ABC"};
//        String[] strings={"A.B", "AB"};
//        String[] strings={"B", "B"};
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明交换机及其类型
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = strings[0];
            String message = strings[1];

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }
}

复制代码
server 代码
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 20:33
 **/

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        String[] strings = {"A.#", "*.*.C"};
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.211.55.3");
        factory.setPort(5672);
        factory.setUsername("young");
        factory.setPassword("young");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();


        // 同一个通道绑定多个 bindingKey
        for (String bindingKey : strings) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
复制代码
头交换机

headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

头交换机能够视为直连交换机的另外一种表现形式。但直连交换机的路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至能够是整数或者哈希值(字典)等。灵活性更强(但实际上咱们不多用到头交换机)。工做流程:

1)、绑定一个队列到头交换机上时,会同时绑定多个用于匹配的头(header)。

2)、传来的消息会携带header,以及会有一个 “x-match” 参数。当 “x-match” 设置为 “any” 时,消息头的任意一个值被匹配就能够知足条件,而当 “x-match” 设置为 “all” 的时候,就须要消息头的全部值都匹配成功。

交换机小结

Queue 队列

AMQP 中的队列(queue)跟其余消息队列或任务队列中的队列是很类似的:它们存储着即将被应用消费掉的消息。

队列属性

队列跟交换机共享某些属性,可是队列也有一些另外的属性。

  • Name
  • Durable(消息代理重启后,队列依旧存在)
  • Exclusive(只被一个链接(connection)使用,并且当链接关闭后队列即被删除)
  • Auto-delete(当最后一个消费者退订后即被删除)
  • Arguments(一些消息代理用他来完成相似与 TTL 的某些额外功能)
队列建立

队列在声明(declare)后才能被使用。若是一个队列尚不存在,声明一个队列会建立它。若是声明的队列已经存在,而且属性彻底相同,那么这次声明不会对原有队列产生任何影响。若是声明中的属性与已存在队列的属性有差别,那么一个错误代码为 406 的通道级异常就会被抛出。

队列持久化

持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称做暂存队列(Transient queues)。并非全部的场景和案例都须要将队列持久化。

持久化的队列并不会使得路由到它的消息也具备持久性。假若消息代理挂掉了,从新启动,那么在重启的过程当中持久化队列会被从新声明,不管怎样,只有通过持久化的消息才能被从新恢复。

消息机制

消息确认

消费者应用(Consumer applications) - 用来接受和处理消息的应用 - 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。并且网络缘由也有可能引发各类问题。这就给咱们出了个难题,AMQP 代理在何时删除消息才是正确的?AMQP 0-9-1 规范给咱们两种建议:

  • 自动确认模式:当消息代理(broker)将消息发送给应用后当即删除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok))
  • 显式确认模式:待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用 AMQP 方法:basic.ack)

若是一个消费者在还没有发送确认回执的状况下挂掉了,那么AMQP代理会将消息从新投递给另外一个消费者。若是当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,而后再次尝试投递。

拒绝消息

当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用能够向消息代理代表,本条消息因为 “拒绝消息(Rejecting Messages)” 的缘由处理失败了(或者未能在此时完成)。

当拒绝某条消息时,应用能够告诉消息代理如何处理这条消息——销毁它或者从新放入队列。

当此队列只有一个消费者时,请确认不要因为拒绝消息而且选择了从新放入队列的行为而引发消息在同一个消费者身上无限循环的状况发生。

在 AMQP 中,basic.reject 方法用来执行拒绝消息的操做。但 basic.reject 有个限制:你不能使用它决绝多个带有确认回执(acknowledgements)的消息。可是若是你使用的是 RabbitMQ,那么你可使用被称做 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 扩展来解决这个问题。

预取消息

在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每一个消费者一次能够接受多少条消息是很是有用的。这能够在试图批量发布消息的时候起到简单的负载均衡和提升消息吞吐量的做用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,若是生产应用每分钟才发送一条消息,这说明处理工做尚在运行。)

注意,RabbitMQ 只支持通道级的预取计数,而不是链接级的或者基于大小的预取。

消息属性

AMQP 模型中的消息(Message)对象是带有属性(Attributes)的。有些属性及其常见,以致于 AMQP 0-9-1 明确的定义了它们,而且应用开发者们无需费心思思考这些属性名字所表明的具体含义。例如:

  • Content type(内容类型)
  • Content encoding(内容编码)
  • Routing key(路由键)
  • Delivery mode (persistent or not) 投递模式(持久化 或 非持久化)
  • Message priority(消息优先权)
  • Message publishing timestamp(消息发布的时间戳)
  • Expiration period(消息有效期)
  • Publisher application id(发布应用的 ID)

有些属性是被 AMQP 代理所使用的,可是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称做消息头(headers)。他们跟 HTTP 协议的 X-Headers 很类似。消息属性须要在消息被发布的时候定义。

消息主体

AMQP 的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被 AMQP 代理看成不透明的字节数组来对待。

消息代理不会检查或者修改有效载荷。消息能够只包含属性而不携带有效载荷。它一般会使用相似 JSON 这种序列化的格式数据,为了节省,协议缓冲器和 MessagePack 将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP 及其同行者们一般使用 “content-type” 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工做,但这仅仅是基于约定而已。

消息持久化

消息可以以持久化的方式发布,AMQP代理会将此消息存储在磁盘上。若是服务器重启,系统会确认收到的持久化消息未丢失。

简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具备持久化性质:它彻底取决与消息自己的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能形成必定的影响(就像数据库操做同样,健壮性的存在一定形成一些性能牺牲)。

简单应用

RabbitMQ RPC

RPC(Remote Procedure Call Protocol,远程过程调用协议),通常都称为“远程过程调用”。关于RPC协议自己,很少介绍,这里只介绍Openstack如何利用AMQP来实现RPC。以下图所示。

示例图

RPC是一种Client/Server通讯模型。图中上半部分是RPC的一种表现形式,表面上看起来好像是Client调用了Server的一个函数(f1),实际上,Client与Server之间是有一来(request)一往(response)两个消息(图中的下半部分)。 在request消息中,RPC Client担任Producer的角色,RPC Server担任Consumer的角色。当RPC Server接到RPC Client发送过来的request消息时,它会作相应的处理,而后,发送response消息给RPC Client,这个时候,RPC Server将担任Producer的角色,而RPC Client担任Consumer的角色。 所以,基于AMQP实现RPC的原理,以下图:

代码
client代码
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 20:45
 **/

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient implements AutoCloseable {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        // 创建connection和channel。
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) {
        try (RPCClient fibonacciRpc = new RPCClient()) {
        //  求0-32的斐波那契数列之和
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    // call方法来发送RPC请求
    public String call(String message) throws IOException, InterruptedException {
        // 生成correlationId
        final String corrId = UUID.randomUUID().toString();

        // 生成默认名字的queue用于reply,并订阅它
        String replyQueueName = channel.queueDeclare().getQueue();

        // 发送request message,设置参数replyTo和correlationId.
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        // 由于消费者发送response是在另外一个线程中,咱们须要让main线程阻塞,在这里咱们使用BlockingQueue
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        // 消费者进行简单的处理,为每个response message检查其correlationId,若是是,则将response添加进阻塞队列
        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });
        // 在队列为空时,获取元素的线程会等待队列变为非空
        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}

复制代码
server代码
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 20:46
 **/

import com.rabbitmq.client.*;

public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";
    
    // 斐波那契函数
    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置链接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            // 清空队列
            channel.queuePurge(RPC_QUEUE_NAME);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");
            
            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                                .Builder()
                                .correlationId(delivery.getProperties().getCorrelationId())
                                .build();

                        String response = "";

                        try {
                            String message = new String(delivery.getBody(), "UTF-8");
                            int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    // 对消息进行应答
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // 唤醒正在消费的进程
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };
            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
            // 在收到消息前,本线程进入等待状态
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

复制代码

未完待续 。。。


参考:

RabbitMQ 官网

深刻理解AMQP协议

AMQP 协议详解

AMQP的几种通讯模式

消息队列之 RabbitMQ

RabbitMQ与AMQP协议详解


若有不当之处,欢迎留言(手动滑稽)。。。

相关文章
相关标签/搜索