RabbitMQ快速入门

1、前言

RabbitMQ实际上是我最先接触的一个MQ框架,我记得当时是在大学的时候跑到图书馆一我的去看,因为RabbitMQ官网的英文还不算太难,所以也是参考官网学习的,一共有6章,当时是用Node来开发的,当时花了一下午看完了,也理解了。而如今回过头来再看,发现已经忘记了个差很少了,如今再回过头来继续看看,然乎记之。以防再忘,读者看时最好有必定的MQ基础。框架

2、RabbitMQ

首先咱们须要知道的是RabbitMQ它是基于高级队列协议(AMQP)的,它是Elang编写的,下面将围绕RabbitMQ队列、交换机、RPC三个重点进行展开。dom

2.一、队列

存储消息的地方,多个生产者能够将消息发送到一个队列,多个消费者也能够消费同一个队列的消息。学习

注意:当多个消费者监听一个队列,此时生产者发送消息到队列只有一个消费者被消费,而且消费端的消费方式是按照消费端在内部启动的顺序轮询(round-robin)。

2.二、消费者

消费消息的一方fetch

public class Send {

    private final static String QUEUE_NAME = "hello";
    private final static String IP = "172.16.12.162";
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername("admin");
        factory.setPassword("admin");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
public class Recv {

    private final static String QUEUE_NAME = "hello";
    private final static String IP = "172.16.12.162";

    public static void main(String[] args) {
        try {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP);
            factory.setUsername("admin");
            factory.setPassword("admin");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

2.三、小结

一、Rabbit是如何保证消息被消费的?
答:经过ack机制。每当一个消息被消费端消费的时候,消费端能够发送一个ack给RabbitMQ,这样RabbitMQ就知道了该条消息已经被完整消费而且能够被delete了。;若是一条消息被消费可是没有发送ack,那么此时RabbitMQ将会认为须要从新消费该消息,若是此时还有其它的消费者,那么此时RabbitMQ将会把这条消息交给它处理。ui

注意:开启ack机制的是autoAck= false;

二、消息如何进行持久化?spa

  • 将queue持久化,即设置 channel.queueDeclare(QUEUE_NAME, true, false, false, null);第二个参数durable为true
  • 设置消息持久化,即设置MessageProperties.PERSISTENT_TEXT_PLAIN
注意:消息持久化并不必定保证消息不会被丢失

三、RabbitMQ如何避免两个消费者一个很是忙一个很是闲的状况?
经过以下设置,保证一个消费者一次只能消费一个消息,只有当它消费完成而且返回ack给RabbitMQ以后才给它派发新的消息。3d

int prefetchCount = 1 ;
channel.basicQos(prefetchCount)

四、RabbitMQ异常状况下如何保证消息不会被重复消费?
须要业务自身实现密等性,RabbitMQ没有提供比较好的方式去保证。code

2.二、交换机

在RabbitMQ中,生产者其实历来不会发送消息到队列,甚至,它不知道消息被发送到了哪一个队列。那它被发送到了哪里呢?就是本节的重点:交换机,下面就是它在RabbitMQ中的介绍图。(X就是交换机)生产者发送消息给交换机,而后由交换机将消息转发给队列。server

从上图就产生一个问题:X怎么将消息发给queue呢?它是把消息发给全部queue仍是发给一个指定的queue或者丢弃消息呢?这就是看交换机的类型了。下面一块儿谈谈这几种类型blog

2.2.一、fanout

fanout:广播模式,这个比较好理解,就是全部的队列都能收到交换机的消息。
clipboard.png
如上面,两个队列都能收到交换机的消息。

2.2.二、direct

这个模式至关于发布/订阅模式的一种,当交换机类型为direct的时候,此时咱们须要设置两个参数:

  1. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));第二个参数,咱们能够把它称呼为routeKey
  2. channel.queueBind(queueName, EXCHANGE_NAME, "");第三个参数,咱们把它称呼为bindKey

有了这两个参数,咱们就能够指定咱们订阅哪些消息了。

clipboard.png
如图,Q1订阅了orange的消息,Q2订阅了black、green的消息。

2.2.三、topic

其实topic和direct有一点相似,它至关于对direct做了加强。在direct中,咱们上面所说的bind routeKey为black、green的它是有限制的,它只能绝对的等于routeKey,可是有时候咱们的需求不是这样,咱们可能想要的是正则匹配便可,那么Topic就派上用场了。

clipboard.png

当类型为topic时,它的bindKey对应字符串须要是以“.”分割,同时RabbitMQ还提供了两个符号:

  • 星号(*):表示1个单词
  • 井号(#):表示0、多个单词

上图的意思是:全部第二个单词为orange的消息发送个Q1,全部最后一个单词为rabbit或者第一个单词为lazy的消息发送给Q2。

2.2.四、header

这一种类型官方demo没有过多解释,这里也不研究了。

2.三、RPC

RabbitMQ 还能够实现RPC(远程过程调用)。什么是RPC,简单来讲就是local调用remote方法。对应于RabbitMQ中则是Client发送一个request message,Server处理完成以后将其返回给Client。这里就有了一个疑问?Server是如何将response返回给Client的,这里RabbitMQ定义了一个概念:Callback Queue。
Callback Queue
注意这个队列是独一无二的String replyQueueName = channel.queueDeclare().getQueue();
首先咱们须要明白一点的是为何须要这个queue?咱们知道在RabbitMQ做消息队列的时候,Client只须要将消息投放到queue中,而后Server从queue去取就能够了。可是在RabbitMQ做为RPC的时候多了一点就是,Client还须要返回结果,这时Server端怎么知道把消息发送给Client,这就是Callback Queue的用处了。
Correlation Id
在上面咱们知道Server返回数据给Client是经过Callback Queue的,那么是为每个request都建立一个queue吗?这未免太过浪费资源,RabbitMQ有更好的方案。在咱们发送request,绑定一个惟一ID(correlationId),而后在消息被处理返回的时候取出这个ID和发出去的ID进行匹配。这样来讲一个Callback Queue是Client级别而不是request级别的了。

实现
上面介绍了RabbitMQ实现RPC最重要的两个概念,具体代码比较简单仍是贴下把。
client 端

public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) throws Exception{
        RPCClient fibonacciRpc = new RPCClient();
        try {
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}

服务端

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // RabbitMq consumer worker thread notifies the RPC server owner thread
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

3、总结

此次回头再看RabbitMQ,再次从新理解了如下RabbitMQ,有些东西仍是要慢慢嚼的。固然这些也都是官网的入门例子,后续有机会的话再深刻研究。

相关文章
相关标签/搜索