吐血总结吧~由于太经常使用,由于是平常,由于即便是平常也并无多少人懂的不少,由于想掌握一个消息中间件,就是如此吧,很少说,我一篇,我主要讲讲基本的使用与方法调用的入参及其意义,其实,RabbitMQ就光是基础的api调用,也并无多少人熟悉~java
想来想去,不知道如何开篇,我大概就把我学习到的一些比较重要的概念罗列出来吧,就算是一个记录吧。下面是一些我想写入的点面试
基本的安装我不在我文章里面介绍,这种东西一搜一大把,并且每一个平台安装模式也不同,例如个人mac,直接一条命令搞定。我看Linux下面还要先安装erlang的虚拟机。这些并不是RabbitMQ的核心技术,想作运维,出门左转。设计模式
RabbitMQ主要实现了AMQP这个很牛逼的协议。其实这种就相似于Redis,任何的Java这个层面的接口API调用,都是对应着相关协议的命令输出的。这东西细想也简单,无非就是咱们给RabbitMQ的server端下命令,让他作什么事儿。例如我让他建立信道,我就发一个Channel.open的字符串过去,对方server端起了个socket监听器,发现有数据,立刻处理,而后匹配到具体的命令处理器,相似于策略设计模式,使用一个Handler处理逻辑。本节我经过生产者与消费者的最简单代码,来窥探一下,到底RabbitMQ的流转过程是怎么样的,让你们有个初步的印象:哦原来,RabbitMQ大概是这样的。api
先上基础的最简单的生产者代码:安全
// 建立链接 Connection connection = connectionFactory.newConnection(); // 建立信道 Channel channel = connection.createChannel(); String msg = "21123123"; channel.basicPublish("ExchargeName", "RoutingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 关闭资源 channel.close(); connection.close();
接下俩咱们来看看流转过程的用例图:并发
其中,Product表明生产者节点,Consumer表明消费者节点,Broker表明RabbitMQ Server端的一个机器节点。以后再也不说明运维
一样,咱们先来一个最简单的消费者的代码模板:异步
// 建立链接 Connection connection = connectionFactory.newConnection(); // 建立信道 Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){} channel.basicQos(64); channel.basicConsume("queueName",consumer); // 关闭资源 channel.close(); connection.close();
下面是消费者的流转图:socket
请细看这两个时序图,与图中的标记,其中对应了具体哪一个JavaAPI调用,对应了哪一个AMQP的消息协议命令。不能,就是稍微多了点步骤。接下来,我会在遇到相关AMQP的命令时候,再继续介绍,就不集中介绍所有的AMQP协议命令了。ide
我本身感受,集中所有介绍没用,由于是我的根本无法立刻记住,即便我给你十天记住了,而后1天不用就忘了。说白了,仍是要紧扣实战的编码,单独研究协议,枯燥且没意义,即便面试,人家也不会专门问你命令。
发现,AMQP的协议中,我大体看了两个实现(RabbitMQ和Qpid),对这几个基础的构件,都是提早谈起的,可见,AMQP就是根据这几个构件来组织的,我就不经过大量文字解说这几个构件了,尽可能经过图片来展现,好懂一些。
主要的构件包括:
为了说明队列,先上一个虚拟的图(由于真实状况,消息不会直接到队列的):
其实,咱们使用生产者,pulish一条消息,首先发送到的,就是交换机这个实体里面,并不是队列,而后经过各类绑定关系再路由到具体的队列中,下一小节,咱们就讲这个路由的问题。首先来看看交换机的示例图:
交换机类型:
具体的RoutingKey和BindingKey如何区分下面咱们来讲,即便咱们没有指定具体的交换机发送给一个queue,RabbitMQ服务端也会随机生成一个交换机,后面会介绍到
下面是RoutingKey和BindingKey的相关图例:
直接上具体的区分方法吧:
这一部分让咱们开始写代码,看看如何用RabbitMQ来发送与接收消息,而且接口参数都是什么意思,毕竟发送与接收代码中的参数仍是挺多的,代码不算整洁,咱们来看看为啥不整洁,为啥要用这些个参数。
两种方式来建立:
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("root"); connectionFactory.setPassword("root123"); connectionFactory.setVirtualHost("/root"); // 建立链接 Connection connection = connectionFactory.newConnection(); // 建立信道 Channel channel = connection.createChannel();
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUri("amqp://root:root123@127.0.0.1:5672/root"); // 建立链接 Connection connection = connectionFactory.newConnection(); // 建立信道 Channel channel = connection.createChannel();
使用注意事项:
对应的方式是:exchangeDeclare。先来看看Java中API客户端的源码,发现注释已经介绍的很清楚了:
/** * 激活一个不自动删除, 不持久化,且没有拓展参数的交换机 * @param exchange 交换机名称 * @param type 交换机类型 * @return 确认交换机声明是否成功 * @throws java.io.IOException io异常 */ Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException; /** * 激活一个不自动删除, 不持久化,且没有拓展参数的交换机 * @param exchange 交换机名称 * @param type 交换机类型(封装了个enum给你用) * @return 确认交换机声明是否成功 * @throws java.io.IOException io异常 */ Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException; /** * 激活一个不自动删除, 且没有拓展参数的交换机 * @param exchange 交换机名称 * @param type 交换机类型 * @param durable true表示要持久化这个交换机 (在服务端重启的时候还会存在这个交换机) */ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException; /** * 激活一个不自动删除, 且没有拓展参数的交换机 * @param exchange 交换机名称 * @param type 交换机类型(封装了个enum给你用) * @param durable true表示要持久化这个交换机 (在服务端重启的时候还会存在这个交换机) */ Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException; /** * 声明一个交换机 * @param exchange 交换机名称 * @param type 交换机类型 * @param durable ttrue表示要持久化这个交换机 (在服务端重启的时候还会存在这个交换机) * @param autoDelete true表示若是没有被使用,这个交换机会自动删除 * @param arguments 其余的一个些属性 */ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; /** * 声明一个交换机 * @param exchange 交换机名称 * @param type 交换机类型(封装了个enum给你用) * @param durable ttrue表示要持久化这个交换机 (在服务端重启的时候还会存在这个交换机) * @param autoDelete true表示若是没有被使用,这个交换机会自动删除 * @param arguments 其余的一个些属性 */ Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; /** * @param internal true表示这个交换机是一个内部的, 不能直接使用客户端的api进行pubulish操做 */ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; /** * 和前面的方式差很少,可是没有返回值,且无论服务端是否有response */ void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; /** * 被动的声明一个交换机; 判断这个名字的交换机是否存在 * @throws IOException 若是交换机不存在,会报404的异常 */ Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException; /** * 删除一个交换机 * @param exchange 交换机名称 * @param ifUnused true表示肯定这个交换机是否被使用,只有彻底不被使用的状况下,才能删除 * @return a deletion-confirm method to indicate the exchange was successfully deleted * @throws java.io.IOException if an error is encountered */ Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException; /** * 一样也是删除一个交换机,可是没有返回值,不等待服务端的返回确认 */ void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
其实直接看源码,一切已经很明了了,下面我对极个别一些注意点说说:
使用的方法是queueDeclare,对于队列的声明,重载方法少不少,一样的咱们直接来看看源码与源码中的注释:
/** * 声明一个服务端命名的, 本链接独占的, 自动删除的, 不持久化的队列 * 建立以后队列的名称,能够从返回对象Queue.DeclareOk中的"queue"字段获取. */ Queue.DeclareOk queueDeclare() throws IOException; /** * 声明一个队列 * @param queue 队列的名称 * @param durable true表示这个队列要作持久化(服务端重启以后,这个队列同样存在) * @param exclusive true表示这个队列是一个独占的(被限制只能当前connection使用) * @param autoDelete true表示这个队列是自动删除的 (服务端会在这个队列不被使用的时候将其删除) * @param arguments 队列建立时候的其余参数 */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; /** * 相似于queueDeclare方法,可是没有返回值,而且无论服务端是否有回复,声明是否成功 */ void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; /** * 声明一个队列,若是队列不存在,将会抛出一个异常,而且关闭Channel */ Queue.DeclareOk queueDeclarePassive(String queue) throws IOException; /** * 删除一个队列, 无论是否这个队列被使用或者里面有消息 * @param queue 队列名称 */ Queue.DeleteOk queueDelete(String queue) throws IOException; /** * 删除一个队列 * @param queue 队列名称 * @param ifUnused true表示这个队列只有不被使用时候才删除 * @param ifEmpty true表示只有这个队列为空的时候才删除 */ Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException; /** * 和queueDelete(String, boolean, boolean)相似,可是没有返回值,且不等服务端返回 */ void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
一样的,介绍几个重点问题:
这一部分,主要是将咱们声明的交换机与队列创建关联的一个动做,涉及到的方法有:
下面咱们来看看源码与注释:
/** * 将一个队列绑定到交换机上面, 不带任何拓展参数 * @param queue 队列名称 * @param exchange 交换机名称 * @param routingKey 这里其实就是绑定的键 */ Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException; /** * 将一个队列绑定到交换机上面 * @param queue 队列名称 * @param routingKey 绑定键 * @param arguments 其余的拓展参数 */ Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException; /** * 无论是否绑定成功就返回的绑定方法 */ void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException; /** * 将队列与一个交换机经过绑定键绑定在一块儿的关系解除 * @param queue 队列名称 * @param exchange 交换机名称 * @param routingKey 绑定键 */ Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException; Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException; /** * 将一个交换机绑定到另一个交换机上面 * @param destination 这个是消息要到达的交换机名称 * @param source 这个是消息从哪里来的交换机名称 * @param routingKey 绑定键 */ Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException; Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException; void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
主要使用的参数,经过注释已经很明了了,其余的注意点暂时没有。
发送消息这里对应的方法,主要就是basicPublish这个方法,先来看看他的API接口:
/** * 发布一个消息到服务端。 * * 若是发布的消息到交换机不存在,会抛出一个channel-level protocol的异常,而且会关闭这个channel * * @param exchange 消息要发布到的交换机 * @param routingKey 这个是路由键(会匹配具体的绑定键) * @param 一个消息其余的属性 - 例如headers * @param body 消息体 */ void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; /** * 发布一个消息到服务端。 * * @param exchange 消息要发布到的交换机 * @param routingKey 这个是路由键 * @param mandatory 这个后面文章会介绍 * @param 一个消息其余的属性 - 例如headers * @param 消息体 */ void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; /** * 发布一个消息到服务端。 * * @param mandatory 后面文章介绍 * @param immediate 后面文章介绍 */ void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
这里能够看到的是,mandatory、immediate这两个入参很关键,后面会专门讲,由于这两个入参关系到了消息的可靠性和稳定性。下面是一些能够添加属性的代码示例:
channel.basicPublish("ExchargeName", "RoutingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); channel.basicPublish("ExchargeName", "RoutingKey", new AMQP.BasicProperties.Builder() .contentType("test/plain") .deliveryMode(2) .priority(1) .userId("hiden") .build(), msg.getBytes());
下面介绍下经过Builder模式构建发消息时候的属性,有助于了解消息的可靠性:
消费消息来讲,相比较来讲要比单纯的接口调用要复杂点,由于涉及到方法的注册等,还存在两种消费模式:
在推模式中,能够经过持续订阅方式来消费消息,主要经过两个类:
设计这两个消费对象的类的方法是:
/** * 启动一个非本地, 非排他, 要手动ack的, 服务端自定义的消费者标签名称的消费者 */ String basicConsume(String queue, Consumer callback) throws IOException; /** * 启动一个非本地, 非排他, 服务端自定义的消费者标签名称的消费者 */ String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException; /** * 启动一个非本地, 非排他, 服务端自定义的消费者标签名称的消费者(固然带了个参数map) */ String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException; /** * 启动一个非本地, 非排他的消费者(固然带了个参数map) * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @param consumerTag a client-generated consumer tag to establish context * @param callback an interface to the consumer object * @return the consumerTag associated with the new consumer * @throws java.io.IOException if an error is encountered * @see com.rabbitmq.client.AMQP.Basic.Consume * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) */ String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException; /** * 启动一个消费者 * @param queue 队列名称 * @param autoAck 消费者是否自动确认 * @param consumerTag 消费者标签 * @param noLocal true表示不能把同一个connection中生产者发送的消息传送给这个connection中的消费者 * @param exclusive 是不是拍他的消费者 * @param arguments 设置消费者的其余参数 * @param callback 设置消费者的回调函数,用来处理RabbitMQ推送过来的消息 */ String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
下面是一段消费者的真实代码:
channel.basicQos(64); channel.basicConsume("queueName", false, "tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); String msg = new String(body, "UTF-8"); // 这里进行消息处理 channel.basicAck(deliveryTag,false); } });
具体的DefaultConsumer类里面还有其余的几个方法,对应不一样的消费动做回调的函数:
若是手动调用了Channel#basicCancel,触发方法的序列是:
另外还要注意的是:这些个被触发的对象都被分配到了不一样的线程池上,因此要注意线程安全性的问题
拉模式,有别于推模式的被动获取消息,咱们能够直接在业务逻辑中,主动获取RabbitMQ服务端中的消息,下面是总体的时序图
对应的代码是:
GetResponse response = channel.basicGet("QueueName", false); System.out.println(response.getBody()); channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
比较下,推模式和拉模式:
这里我自我感受很关键,由于涉及到消息消费的可靠性。首先让咱们来看看ack机制:
总体上,RabbitMQ的服务端存在两种队列:
另外,RabbitMQ不会主动删除或者过时未确认的消息,判断是否从新投递给消费者的惟一依据是消费该消息的消费者链接是否已经断开。若是长时间没回复ack,直到链接断开,此消息会从新进入队列,等待投递给下一个消费者,固然也多是当前消费者。
另一个点,是咱们能够直接拒绝一个消息,使用的方法是:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
第一个参数好理解,重点说下第二个,requeue:
还有个方法,能够请求RabbitMQ从新发送还未被确认的消息,有下面两个方法
Basic.RecoverOk basicRecover() throws IOException; Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
再次涉及到requeue参数:
关闭涉及到的有两个对象的关闭:
关闭的方法有:
void close()
void close(int closeCode, String closeMessage)
整个AMQP协议中connection与channel声明周期以下:
固然咱们能够注册关闭以后触发的监听器:
connection.addShutdownListener(new ShutdownListener() { @Override public void shutdownCompleted(ShutdownSignalException cause) { // 业务逻辑 Method reason = cause.getReason(); } });
接下俩大概会有两篇,一篇是功能性的拔高,一篇是RabbitMQ的原理相关,若是可能我会说说集群构建与镜像