RabbitMQ基础教程之基本使用篇

RabbitMQ基础教程之基本使用篇

最近由于工做缘由使用到RabbitMQ,以前也接触过其余的mq消息中间件,从实际使用感受来看,却不太同样,正好趁着周末,能够好好看一下RabbitMQ的相关知识点;但愿能够经过一些学习,能够搞清楚如下几点java

  • 基础环境搭建
  • 能够怎么使用
  • 实现原理是怎样的
  • 实际工程中的使用(好比结合SpringBoot能够怎么玩)

<!-- more -->git

相关博文,欢迎查看:github

I. 前提准备

在开始以前,先得搭建基本的环境,由于我的主要是mac进行的开发,全部写了一篇mac上如何安装rabbitmq的教程,能够经过 《mac下安装和测试rabbitmq》 查看服务器

1. Centos安装过程

下面简单说一下Linux系统下,能够如何安装ide

Centos 系统:学习

# 安装erlang
rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm
yum install erlang

# 安装RabbitMQ
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm
yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm

启动和查看的命令测试

# 完成后启动服务:
service rabbitmq-server start
# 能够查看服务状态:
service rabbitmq-server status

2. 注意

  • 安装完毕以后,能够开启控制台,主要就是 rabbitmq-plugins enable rabbitmq_management, 默认的端口号为15672
  • 默认分配的用户/密码为: guest/guest, 只容许本地访问;若是跨应用读写数据时,请添加帐号和设置对应的权限(推荐参考上面mac安装的博文,里面有介绍)

II. 基本使用篇

直接使用amqp-client客户端作基本的数据读写,先不考虑Spring容器的场景,咱们能够怎样进行塞数据,而后又怎样能够从里面获取数据;ui

在实际使用以前,有必要了解一下RabbitMQ的几个基本概念,即什么是Queue,Exchange,Binding,关于这些基本概念,能够参考博文:3d

1. 基本使用姿式

首先是创建链接,通常须要设置服务器的IP,端口号,用户名密码之类的,公共代码以下code

public class RabbitUtil {

    public static ConnectionFactory getConnectionFactory() {
        //建立链接工程,下面给出的是默认的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        return factory;
    }
}

a. 生产者

要使用,基本的就须要一个消息投递和一个消息消费两方,线看消息生产者的通常写法

public class MsgProducer {
    public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message)
            throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //建立链接
        Connection connection = factory.newConnection();

        //建立消息通道
        Channel channel = connection.createChannel();

        // 声明exchange中的消息为可持久化,不自动删除
        channel.exchangeDeclare(exchange, exchangeType, true, false, null);

        // 发布消息
        channel.basicPublish(exchange, toutingKey, null, message.getBytes());

        channel.close();
        connection.close();
    }
}

针对上面的代码,结合RabbitMQ的基本概念进行分析

基本结构

  • 不论是干啥,第一步都是获取链接,也就是上面的Connection
  • 《RabbitMq基础教程之基本概念》直到,生产者消费者都是借助Channel与Exchange或者Queue打交道,接下来就是经过Connection建立数据流通讯道Channel
  • Channel准备完毕以后,生产者就能够向其中投递数据
  • 投递完毕以后,回收现场资源

疑问:

  • 在声明Exchange时,是否就须要选择消息绑定策略?
  • 不声明时,默认是什么策略?

b. 消费者

结合上面的代码和分析,大胆的预测下消费者的流程

  • 获取链接Connection
  • 建立Channel
  • 将Channel与Queue进行绑定
  • 建立一个Consumer,从Queue中获取数据
  • 消息消费以后,ack

下面给出一个mq推数据的消费过程

public class MsgConsumer {

    public static void consumerMsg(String exchange, String queue, String routingKey)
            throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();
        //建立链接
        Connection connection = factory.newConnection();

        //建立消息信道
        final Channel channel = connection.createChannel();

        //消息队列
        channel.queueDeclare(queue, true, false, false, null);
        //绑定队列到交换机
        channel.queueBind(queue, exchange, routingKey);
        System.out.println("[*] Waiting for message. To exist press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                try {
                    System.out.println(" [x] Received '" + message);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 取消自动ack
        channel.basicConsume(queue, false, consumer);
    }
}

2. Direct方式

a. Producer

直接在前面的基础上进行测试,咱们定义一个新的exchange名为direct.exchange,而且制定ExchangeType为直接路由方式 (先无论这种写法的合理性)

public class DirectProducer {
    private static final String EXCHANGE_NAME = "direct.exchange";

    public void publishMsg(String routingKey, String msg) {
        try {
            MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        DirectProducer directProducer = new DirectProducer();
        String[] routingKey = new String[]{"aaa", "bbb"};
        String msg = "hello >>> ";


        for (int i = 0; i < 30; i++) {
            directProducer.publishMsg(routingKey[i % 2], msg + i);
        }
        System.out.println("----over-------");
    }
}

上面的代码执行一遍以后,看控制台会发现新增了一个Exchange

exchange

b. consumer

一样的咱们写一下对应的消费者,一个用来消费aaa,一个消费bbb

public class DirectConsumer {

    private static final String exchangeName = "direct.exchange";

    public void msgConsumer(String queueName, String routingKey) {
        try {
            MsgConsumer.consumerMsg(exchangeName, queueName, routingKey);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        DirectConsumer consumer = new DirectConsumer();
        String[] routingKey = new String[]{"aaa", "bbb"};
        String[] queueNames = new String[]{"qa", "qb"};


        for (int i = 0; i < 2; i++) {
            consumer.msgConsumer(queueNames[i], routingKey[i]);
        }

        Thread.sleep(1000 * 60 * 10);
    }
}

执行上面的代码以后,就会多两个Queue,且增长了Exchange到Queue的绑定

binding

queue

当上面两个代码配合起来使用时,就能够看到对于消费者而言,qa一直消费的是偶数,qb一直消费的是奇数,一次输出以下:

[qa] Waiting for message. To exist press CTRL+C
[qb] Waiting for message. To exist press CTRL+C
 [qa] Received 'hello >>> 0
 [qb] Received 'hello >>> 1
 [qa] Received 'hello >>> 2
 [qb] Received 'hello >>> 3
 [qa] Received 'hello >>> 4
...

3. Fanout方式

有了上面的case以后,这个的实现和测试就比较简单了

a. Producer

public class FanoutProducer {
    private static final String EXCHANGE_NAME = "fanout.exchange";

    public void publishMsg(String routingKey, String msg) {
        try {
            MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        FanoutProducer directProducer = new FanoutProducer();
        String[] routingKey = new String[]{"aaa", "bbb"};
        String msg = "hello >>> ";


        for (int i = 0; i < 30; i++) {
            directProducer.publishMsg(routingKey[i % 2], msg + i);
        }
        System.out.println("----over-------");
    }
}

b. consumer

public class FanoutProducer {
    private static final String EXCHANGE_NAME = "fanout.exchange";

    public void publishMsg(String routingKey, String msg) {
        try {
            MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        FanoutProducer directProducer = new FanoutProducer();
        String[] routingKey = new String[]{"aaa", "bbb"};
        String msg = "hello >>> ";


        for (int i = 0; i < 30; i++) {
            directProducer.publishMsg(routingKey[i % 2], msg + i);
        }
        System.out.println("----over-------");
    }
}

这个的输出就比较有意思了,fa,fb两个队列均可以接收到发布的消息,并且单独的执行一次上面的投递数据以后,发现fa/fb两个队列的数据都是30条

30

而后消费的结果以下

[qa] Waiting for message. To exist press CTRL+C
[qb] Waiting for message. To exist press CTRL+C
 [qa] Received 'hello >>> 0
 [qb] Received 'hello >>> 0
 [qa] Received 'hello >>> 1
 [qb] Received 'hello >>> 1
 [qb] Received 'hello >>> 2
 [qa] Received 'hello >>> 2
 [qa] Received 'hello >>> 3
 [qb] Received 'hello >>> 3
 [qb] Received 'hello >>> 4
 [qa] Received 'hello >>> 4
 ...

4. Topic方式

代码和上面差很少,就不重复拷贝了,接下来卡另外几个问题

III. 基础进阶

在上面的基础使用中,会有几个疑问以下:

  • Exchange声明的问题(是否必须声明,若是不声明会怎样)
  • Exchange声明的几个参数(durable, autoDelete)有啥区别
  • 当没有队列和Exchange绑定时,直接往队列中塞数据,好像不会有数据增长(即先塞数据,而后建立queue,创建绑定,从控制台上看这个queue里面也不会有数据)
  • 消息消费的两种姿式(一个主动去拿数据,一个是rabbit推数据)对比
  • ack/nack怎么用,nack以后消息能够怎么处理

以上内容,留待下一篇进行讲解

IV. 其余

1. 相关博文

2. 一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的我的博客,记录全部学习和工做中的博文,欢迎你们前去逛逛

3. 声明

尽信书则不如,已上内容,纯属一家之言,因我的能力有限,不免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

4. 扫描关注

QrCode

相关文章
相关标签/搜索