RabbitMQ实际上是我最先接触的一个MQ框架,我记得当时是在大学的时候跑到图书馆一我的去看,因为RabbitMQ官网的英文还不算太难,所以也是参考官网学习的,一共有6章,当时是用Node来开发的,当时花了一下午看完了,也理解了。而如今回过头来再看,发现已经忘记了个差很少了,如今再回过头来继续看看,然乎记之。以防再忘,读者看时最好有必定的MQ基础。框架
首先咱们须要知道的是RabbitMQ它是基于高级队列协议(AMQP)的,它是Elang编写的,下面将围绕RabbitMQ队列、交换机、RPC三个重点进行展开。dom
存储消息的地方,多个生产者能够将消息发送到一个队列,多个消费者也能够消费同一个队列的消息。学习
注意:当多个消费者监听一个队列,此时生产者发送消息到队列只有一个消费者被消费,而且消费端的消费方式是按照消费端在内部启动的顺序轮询(round-robin)。
消费消息的一方fetch
public class Send { private final static String QUEUE_NAME = "hello"; private final static String IP = "172.16.12.162"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername("admin"); factory.setPassword("admin"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); }catch (Exception e){ e.printStackTrace(); } } }
public class Recv { private final static String QUEUE_NAME = "hello"; private final static String IP = "172.16.12.162"; public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }catch (Exception e){ e.printStackTrace(); } } }
一、Rabbit是如何保证消息被消费的?
答:经过ack机制。每当一个消息被消费端消费的时候,消费端能够发送一个ack给RabbitMQ,这样RabbitMQ就知道了该条消息已经被完整消费而且能够被delete了。;若是一条消息被消费可是没有发送ack,那么此时RabbitMQ将会认为须要从新消费该消息,若是此时还有其它的消费者,那么此时RabbitMQ将会把这条消息交给它处理。ui
注意:开启ack机制的是autoAck=
false
;
二、消息如何进行持久化?spa
注意:消息持久化并不必定保证消息不会被丢失
三、RabbitMQ如何避免两个消费者一个很是忙一个很是闲的状况?
经过以下设置,保证一个消费者一次只能消费一个消息,只有当它消费完成而且返回ack给RabbitMQ以后才给它派发新的消息。3d
int prefetchCount = 1 ; channel.basicQos(prefetchCount)
四、RabbitMQ异常状况下如何保证消息不会被重复消费?
须要业务自身实现密等性,RabbitMQ没有提供比较好的方式去保证。code
在RabbitMQ中,生产者其实历来不会发送消息到队列,甚至,它不知道消息被发送到了哪一个队列。那它被发送到了哪里呢?就是本节的重点:交换机,下面就是它在RabbitMQ中的介绍图。(X就是交换机)生产者发送消息给交换机,而后由交换机将消息转发给队列。server
从上图就产生一个问题:X怎么将消息发给queue呢?它是把消息发给全部queue仍是发给一个指定的queue或者丢弃消息呢?这就是看交换机的类型了。下面一块儿谈谈这几种类型blog
fanout:广播模式,这个比较好理解,就是全部的队列都能收到交换机的消息。
如上面,两个队列都能收到交换机的消息。
这个模式至关于发布/订阅模式的一种,当交换机类型为direct的时候,此时咱们须要设置两个参数:
有了这两个参数,咱们就能够指定咱们订阅哪些消息了。
如图,Q1订阅了orange的消息,Q2订阅了black、green的消息。
其实topic和direct有一点相似,它至关于对direct做了加强。在direct中,咱们上面所说的bind routeKey为black、green的它是有限制的,它只能绝对的等于routeKey,可是有时候咱们的需求不是这样,咱们可能想要的是正则匹配便可,那么Topic就派上用场了。
当类型为topic时,它的bindKey对应字符串须要是以“.”分割,同时RabbitMQ还提供了两个符号:
上图的意思是:全部第二个单词为orange的消息发送个Q1,全部最后一个单词为rabbit或者第一个单词为lazy的消息发送给Q2。
这一种类型官方demo没有过多解释,这里也不研究了。
RabbitMQ 还能够实现RPC(远程过程调用)。什么是RPC,简单来讲就是local调用remote方法。对应于RabbitMQ中则是Client发送一个request message,Server处理完成以后将其返回给Client。这里就有了一个疑问?Server是如何将response返回给Client的,这里RabbitMQ定义了一个概念:Callback Queue。
Callback Queue
注意这个队列是独一无二的String replyQueueName = channel.queueDeclare().getQueue();
。
首先咱们须要明白一点的是为何须要这个queue?咱们知道在RabbitMQ做消息队列的时候,Client只须要将消息投放到queue中,而后Server从queue去取就能够了。可是在RabbitMQ做为RPC的时候多了一点就是,Client还须要返回结果,这时Server端怎么知道把消息发送给Client,这就是Callback Queue的用处了。
Correlation Id
在上面咱们知道Server返回数据给Client是经过Callback Queue的,那么是为每个request都建立一个queue吗?这未免太过浪费资源,RabbitMQ有更好的方案。在咱们发送request,绑定一个惟一ID(correlationId),而后在消息被处理返回的时候取出这个ID和发出去的ID进行匹配。这样来讲一个Callback Queue是Client级别而不是request级别的了。
实现
上面介绍了RabbitMQ实现RPC最重要的两个概念,具体代码比较简单仍是贴下把。
client 端
public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); } public static void main(String[] argv) throws Exception{ RPCClient fibonacciRpc = new RPCClient(); try { for (int i = 0; i < 32; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); String response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } catch (Exception e) { e.printStackTrace(); } } public String call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { response.offer(new String(delivery.getBody(), "UTF-8")); } }, consumerTag -> { }); String result = response.take(); channel.basicCancel(ctag); return result; } public void close() throws IOException { connection.close(); } }
服务端
public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.queuePurge(RPC_QUEUE_NAME); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Object monitor = new Object(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build(); String response = ""; try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized (monitor) { monitor.notify(); } } }; channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { })); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
此次回头再看RabbitMQ,再次从新理解了如下RabbitMQ,有些东西仍是要慢慢嚼的。固然这些也都是官网的入门例子,后续有机会的话再深刻研究。