RabbitMQ是如何运转的?

前言

  以前已经介绍了RabbitMQ交换机模型的相关简单概念,都是做为此篇的基础铺垫,若是对此篇不懂的能够先看我上一篇的介绍认识RabbitMQ交换机模型,或者联系评论,分享《RabbitMQ实战指南》电子书给你们,里面虽然有些许错误,但整体仍是很棒的一本书!html

  本文主要介绍RabbitMQ的消息是怎么产生和经过它是怎么接收消息的(RabbitMQ如何运转)、Connection和Channel概念、RabbitMQ的简单部署、Java代码简单实践三个部分java

  

 


 

 

1、RabbitMQ的运转流程

  一、生产者流程  

      1) 生产者链接到RabbitMQ Broker,创建Connection,开启信道Channel(Connection与Channel概念下面会介绍)node

      2) 生产者声明一个交换器,设置相关属性。shell

      3) 生产者声明一个队列并设置相关属性json

      4) 生产者经过路由键将交换器和队列绑定起来网络

      5) 生产者发送消息到RabbitMQ Broker,包括路由键、交换器信息等app

      6) 相应的交换器根据路由键查找匹配的队列async

      7) 若是找到则消息存入相应队列中maven

      8) 若是没找到则根据配置的属性丢弃或者回退给生产者ide

      9) 关闭信道

      10)关闭链接

  二、消费者流程

      1) 消费者链接到RabbitMQ Broker,创建Connection,开启Channel

      2) 消费者向RabbitMQ Broker请求消费相应队列中消息,可能会设置相应的回调函数。

      3) 等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接收消息。

      4) 消费者确认ack接收到的消息。

      5) RabbitMQ从队列中删除相应已经被确认的消息。

      6) 关闭信道。

      7) 关闭链接

 

    其实,最主要最很差理解的也就是Connection与Channel这两个概念,若是只是光看这些流程会至关不理解,为何先创建Connection再创建Channel,这两个又是什么区别?因此再往下就是介绍Connection与Channel了!

 

2、Connection与Channel概念

  

  一、 Connection:实际就是一条TCP链接,TCP一旦创建起来,客户端紧接着能够建立AMQP信道。

  二、 Channel:每一个Channel都有惟一的ID,都是创建在Connection上的虚拟链接,RabbitMQ处理每条AMQP指令都是经过信道完成的

       

                                (结合两张图,更好理解Connection与Channel两个概念)

  

  三、单TCP复用链接与多信道的优点

      1)为何TCP链接只有一条,而每一个生产者都会建立一条惟一的信道呢?想象下,实际状况,会有不少的生产者生产消息,多个消费者消费消息,那么就不得不建立多个线程,创建多个TCP链接。多个TCP链接的创建必然会对操做系统性能消耗较高,也不方便管理。从而选择一种相似于NIO(非阻塞I/O, Non-blocking I/O)技术是颇有必要的,多信道的在TCP基础上的创建就是这么实现的。

      2)每一个线程都有本身的一个信道,复用了Connection的TCP链接,信道之间相互独立,相互保持神秘,节约TCP链接资源,固然自己信道的流量很大的话,也能够建立多个适当的Connection的TCP链接,须要根据具体业务状况制定

   

3、RabbitMQ部署

  主要以Linux CentOS 7举例部署,

一、准备Erlang环境

安装运行RabbitMQ以前,先安装Erlang环境,由于RabbitMQ是relang语言写的。下载http://www.erlang.org/downloads获得otp_src_21.2.tar.gz包

 1)解压到/opt/erlang目录下,./configure配置生成make make install

[root@hidden]# tar xvf otp_src_21.2.tar.gz 
[root@hidden]# cd otp_src_21.2 
[root@hidden otp_src_21.2]#./configure --prefix=/opt/er1ang

 若是安装过程出现"No curses library functions found",则须要安装ncurses

[root@hidden otp_src_21.2]# yum install ncurses-devel

 2)编译安装make & make install

[root@hidden otp_src_21.2]# make & make install

 3)修改/etc/profile文件,增长以下语句

ERLANG_HOME=/opt/erlang
export PATH=$PATH:$ERLANG_HOME/bin
export ERLANG_HOME

 4)执行/etc/profile配置文件

[root@hidden otp_src_21.2]# source /etc/profile

 5)测试是否安装成功

[root@hidden otp_src_21.2]#erl

若是出现以下语句,则说明安装成功

Erlang/OTP 21 [erts-10.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]

Eshell V10.2  (abort with ^G)
1>     

 二、安装RabbiMQ

 RabbitMQ安装比Erlang安装简单不少,下载generic压缩包:rabbitmq-server-generic-unix-3.7.11.tar.xz

 1)解压压缩到到Erlang同目录/opt下

[root@hidden]# tar zvxf rabbitmq-server-generic-unix-3.7.11.tar.xz
[root@hidden]# cd /opt
[root@hidden]# mv rabbitmq-server-generic-unix-3.7.11.tar.xz rabbitmq

 2)修改/etc/profile文件,增长以下语句

export PATH=$PATH:/opt/rabbitmq/sbin
export RABBITMQ_HOME=/opt/rabbitmq

 3)执行profile文件,使其生效

[root@hidden otp_src_21.2]# source /etc/profile

 4)修改运行为守护进程模式

[root@hidden otp_src_21.2]# rabbitmq-server -detached

 5)测试是否安装成功,出现Status of node rabbit@.........以下语句则说明安装成功

[root@hidden rabbitmq]# rabbitmqctl status

Status of node rabbit@iz2ze49fh77zgs1rzxo0l7z ...
[{pid,11462},
{running_applications,
[{rabbit,"RabbitMQ","3.7.11"},
{mnesia,"MNESIA CXC 138 12","4.15.5"},
{os_mon,"CPO CXC 138 46","2.4.7"},
{sysmon_handler,"Rate-limiting system_monitor event handler","1.1.0"},
{rabbit_common,
"Modules shared by rabbitmq-server and rabbitmq-erlang-client",
"3.7.11"},
{ranch,"Socket acceptor pool for TCP protocols.","1.7.1"},
{ssl,"Erlang/OTP SSL application","9.1"},
{public_key,"Public key infrastructure","1.6.4"},
{asn1,"The Erlang ASN1 compiler version 5.0.8","5.0.8"},
{inets,"INETS CXC 138 49","7.0.3"},
{recon,"Diagnostic tools for production use","2.3.6"},
{xmerl,"XML parser","1.3.18"},
{jsx,"a streaming, evented json parsing toolkit","2.9.0"},

..........

 三、新增用户与受权

RabbitMQ默认状况下用户和密码都为“guest”,但只能经过默认的本地网络localhost访问,网络访问受限,因此须要再单独新增用户授予权限

 1)新增root用户

 新增用户名为root,密码为root

[root@hidden rabbitmq]# rabbitmqctl add_user root root

 2)受权root用户到默认vhost可配置、可读、可写权限

[root@hidden rabbitmq]# rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

 3)设置root为管理员角色

[root@hidden rabbitmq]# rabbitmqctl set_user_tags root administrator

4、Java代码实践

首先maven下载jar包:

<!-- rabbitmq-->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.6.0</version>
    </dependency>

一、生产者类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author jian
 * @date 2019/2/14
 * @description RabbitMQ测试: 消息服务端
 *
 */
public class RabbitProducer {
    // 路由键
    private static final String ROUTING_KEY = "routingkey_demo";
    // 交换机名称
    private static final String EXCHANGE_NAME = "exchange_demo";
    // 队列名称
    private static final String QUEUE_NAME = "queue_demo";
    // RabbitMQ地址
    private static final String IP_ADDRESS = "xxx.xxx.xxx.xxx";
    // RabbitMQ默认端口5672
    private static final int PORT = 5672;

    public static void publicMeesage () {
        // 1)经过链接工厂创建复用TCP链接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(IP_ADDRESS);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        try {
            Connection connection = connectionFactory.newConnection();
            // 2)创建多信道
            Channel channel = connection.createChannel();
            // 3)声明交换器:建立一个direct、持久化、非自动删除的交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            // 4)声明队列:建立一个持久化、非排他的、非自动删除的队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 5)将交换器与队列经过路由键绑定
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            // 6) 发送持久化消息
            String message = "hello world!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("producer published message: " + message);
            // 7)关闭信道
            channel.close();
            // 8)关闭链接
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

 

二、消费者类 

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author jian
 * @date 2019/2/14
 * @description RabbitMQ测试:消费者消费消息
 */
public class RabbitConsumer {

    // 队列名称
    private static final String QUEUE_NAME = "queue_demo";
    // RabbitMQ地址
    private static final String IP_ADDRESS = "xxx.xxx.xxx.xxx";
    // RabbitMQ默认端口5672
    private static final int PORT = 5672;

    public static void recevieMessage() {
        Address[] addresses = new Address[]{
                new Address(IP_ADDRESS, PORT)
        };
        // 1)经过链接工厂创建复用TCP链接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        try {
            // 2)创建链接:此处与生产者创建创建链接是不一样的
            Connection connection = connectionFactory.newConnection(addresses);
            // 3) 建立channel信道
            Channel channel = connection.createChannel();
            // 设置客户端最多接收未被ack消息的个数
            channel.basicQos(64);
            // 4)消费者向RabbitMQ Broker请求消费相应队列中消息: 有消息就会执行回调函数handleDelivery
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("consumer received message: " + new String(body, "UTF-8"));
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            // 5)消费者确认ack接收 到的消息:自动回复队列应答
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
            // 等待回调函数执行完毕
            TimeUnit.SECONDS.sleep(5);
            // 6) 关闭信道
            channel.close();
            // 7) 关闭链接
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

三、测试类

public class RabbitMQTest {

    public static void main(String[] args) {
        RabbitProducer.publicMeesage();
        RabbitConsumer.recevieMessage();
    }
}

四、测试结果

producer published message: hello world!
consumer received message: hello world!
相关文章
相关标签/搜索