消息中间件-RabbitMq(搭建&消息分发)

消息中间件-RabbitMq(搭建&消息分发)

RabbitMq】是一个【AMQP】协议的实现。服务端使用的是Erlang语言进行编写,那也就是说,咱们要运行它,就要安装相关Erlang环境。前面说了AMQP最初是为了解决金融行业的可用性问题,因此Rabbit在高可用方面表现不俗,而且在我看来他是这几种中间件中最容易上手的一个。并且它在并发方面表现十分出色,能够实现大概10w的吞吐量。他的特色是:【可靠性、消息集群、高可用、插件机制(可让它支持别的协议)、支持多语言客户端、管理页面 so on本篇主要聊聊如何安装、使用、以及关于他的一些名词方面的阐述。run。。html

安装运行

  • 个人环境是CentOS7
  • http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq须要安装erlang对应的版本,前面是Rabbit的版本,后面是Erlang的对它支持的版本。这里先后要对应下载,版本必须符合他的要求,我这里使用的就是第一个。
  • https://github.com/rabbitmq/erlang-rpm/releases 中复制对应的版本erlang下载地址
  • https://github.com/rabbitmq/rabbitmq-server/tags 中复制对应的版本rabbitmq的下载地址

  • 下载Erlang
    • wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v23.3.4.3/erlang-23.3.4.3-1.el7.x86_64.rpm
  • 安装Erlang
    • sudo rpm -Uvh /home/download/erlang-23.3.4.3-1.el7.x86_64.rpm
  • 安装socat
    • sudo yum install -y socat
  • 下载RabbitMQ
    •  wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.15/rabbitmq-server-3.8.15-1.el7.noarch.rpm
  • 安装RabbitMQ
    • 【sudo rpm -Uvh /home/download/rabbitmq-server-3.8.15-1.el7.noarch.rpm】

到目前为止咱们的准备工做完毕,如下是一些启动和关闭命令。git

中止服务】:sudo systemctl stop rabbitmq-server 【查询状态】:sudo systemctl status rabbitmq-server 【启动】:sudo systemctl start rabbitmq-server 【设置开启自启】:sudo systemctl enable rabbitmq-server github

使用启动命令启动后,咱们查询状态发现状态为 dead,这是由于咱们要启动他的插件 使用【rabbitmq-plugins list】能够查询全部他支持的插件,咱们这里须要启动服务器

【rabbitmq-plugins  enable rabbitmq_management】并发

 执行完成后使用【 cat /etc/rabbitmq/enabled_plugins】就能够知道是否启动插件成功,而后再次启动发现启动状态就为running,使用【netstat -nplt | grep 15672 】发现他的专用端口已经开启,至此,安装启动完毕。这个时候就能够对它进行访问了(你的ip:15672),出现下面的图,就证实搭建成功。这里注意开放一下端口,不然别的机器没法访问:tcp

  • sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
  • sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
  • sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
  • sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent 

 然而咱们用他本身的gust是没法login in 进去的,由于这个支持在搭建的服务器自己上访问,那咱们就要建立本身的用户,而且赋予相应的权限。ide

  • 【添加一个admin用户】:rabbitmqctl add_user admin admin 
  • 【分配操做权限】:rabbitmqctl set_user_tags admin administrator
  • 【分配资源权限】:rabbitmqctl set_permissions -p / admin ".*" ".*" ".*

使用admin进行登陆,至此,能够rabbitmq能够正常使用ui

使用

添加相关依赖spa

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>5.5.1</version>
 </dependency>

生产一个消息插件

public class Producer {

    public static void main(String[] args) {
        // 一、建立链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 二、设置链接属性
        factory.setHost("你的ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 三、从链接工厂获取链接
            connection = factory.newConnection();

            // 四、从连接中建立通道
            channel = connection.createChannel();

            /**
             * 五、声明(建立)队列
             * 若是队列不存在,才会建立
             * RabbitMQ 不容许声明两个队列名相同,属性不一样的队列,不然会报错
             *
             * queueDeclare参数说明:
             * @param queue 队列名称
             * @param durable 队列是否持久化
             * @param exclusive 是否排他,便是否为私有的,若是为true,会对当前队列加锁,其它通道不能访问,而且在链接关闭时会自动删除,不受持久化和自动删除的属性控制
             * @param autoDelete 是否自动删除,当最后一个消费者断开链接以后是否自动删除
             * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中全部消息的生命周期等等
             */
            channel.queueDeclare("queue1", false, false, false, null);
            // 消息内容
            String message = "Hello World!";
            // 六、发送消息
            channel.basicPublish("", "queue1", null, message.getBytes());
            System.out.println("消息已发送!");

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 七、关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 八、关闭链接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

 

生产后你去管理页面查询,会发现一个消息还未读取。

消费一个消息(消费后再次查询,发现ready中没有东西了)

public class Consumer {

    public static void main(String[] args) {
        // 一、建立链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 二、设置链接属性
        factory.setHost("你的ip");
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 三、从链接工厂获取链接
            connection = factory.newConnection("消费者");

            // 四、从连接中建立通道
            channel = connection.createChannel();

            channel.queueDeclare("queue1", false, false, false, null);

            // 六、定义收到消息后的回调
            DeliverCallback callback = new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
                }
            };
            // 七、监听队列
            channel.basicConsume("queue1", true, callback, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                }
            });

            System.out.println("开始接收消息");
            System.in.read();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 八、关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 九、关闭链接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

至此,简单使用结束。

使用客户端:这里首先建立两个队列,而后在交换机上模拟发送消息,以topic类型的交换机为例,他会进行routing key的匹配,在发送消息的时候,把你的routing key 携带,便可匹配。

 

 

 

 一个topic类型的交换机的例子

/**
 * Topic--生产者
 *
 * 生产者将消息发送到topic类型的交换器上,和routing的用法相似,都是经过routingKey路由,但topic类型交换器的routingKey支持通配符
 */
public class Producer {

    public static void main(String[] args) {
        // 一、建立链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 二、设置链接属性
        factory.setHost("你的ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 三、从链接工厂获取链接
            connection = factory.newConnection("生产者");

            // 四、从连接中建立通道
            channel = connection.createChannel();

            // 路由关系以下:com.# --> queue-1     *.order.* ---> queue-2
            // 消息内容
            String message = "Hello A";
            // 发送消息到topic_test交换器上
            channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes());
            System.out.println("消息 " + message + " 已发送!");

            // 消息内容
            message = "Hello B";
            // 发送消息到topic_test交换器上
            channel.basicPublish("topic-exchange", "com.sms.create", null, message.getBytes());
            System.out.println("消息 " + message + " 已发送!");

            // 消息内容
            message = "Hello C";
            // 发送消息到topic_test交换器上
            channel.basicPublish("topic-exchange", "cn.order.create", null, message.getBytes());
            System.out.println("消息 " + message + " 已发送!");

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 七、关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 八、关闭链接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
/**
 * 路由--消费者
 *
 * 消费者经过一个临时队列和交换器绑定,接收发送到交换器上的消息
 */
public class Consumer {

    private static Runnable receive = () -> {
        // 一、建立链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 二、设置链接属性
        factory.setHost("你的ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;
        final String queueName = Thread.currentThread().getName();

        try {
            // 三、从链接工厂获取链接
            connection = factory.newConnection("消费者");

            // 四、从连接中建立通道
            channel = connection.createChannel();
            // 定义消息接收回调对象
            DeliverCallback callback = new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
                }
            };
            // 监听队列
            channel.basicConsume(queueName, true, callback, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                }
            });

            System.out.println(queueName + " 开始接收消息");
            System.in.read();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 八、关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 九、关闭链接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(receive, "queue1").start();
        new Thread(receive, "queue-2").start();
    }

}
View Code

 

Rabbit名词介绍

Blocker:一个rabbit服务器就是一个Blocker

虚拟主机(virtual host一个Blocker中能够有多个虚拟机,每一个虚拟机相似于一个工做空间,每一个虚拟主机中的消息和其余虚拟主机的消息不相互影响

connection:消费者和rabbit中间的链接,有了这个链接,双方才能通讯。

RoutingKey:消息被发给交换机的时候,会携带它,这个是用来指定消息的路由规则(能够为空)

channel(信道):是在connection上创建的管道,一个connection上能够创建多个channel,消息经过他们进行传递。

BindingKey:Exchange和Queue绑定的关系,Exchange接收到的消息会带有RoutingKey这个字段。

交换机(exchanger):当rabbit接收到消息后,交换机对这些消息进行转换,他的类型决定哪一个队列中应该拥有这些消息,

交换机类型:

  • 【direct】:当发送消息的时候,咱们会在消息体上携带一个路由键【routekey】,若是消息体上你的路由键和队列匹配则发送给对应的队列。
  • 【fanout 】:发送到交换机的消息都会被转发到与该交换机绑定的全部队列上。
  • 【headers】:根据发送的消息内容中的【headers】属性进行匹配,当消息发送到RabbitMQ时会取到该消息的【headers】与Exchange绑定时指定的键值对进行匹配,若是匹配到,则对应队列能够接受到消息。
  • 【topic】:将路由键和某模式进行匹配。此时队列须要绑定要一个模式上。符号“#”会匹配一个或多个词,好比【ok.#】--》【ok.1.1 or ok.1.1.2 so on】,只要队列能够匹配到,就能够接受消息

队列(queue):rabbit接收到的信息存储在这里,消费者也是从这里获取的消息。

binder: 队列和交换机之间的绑定

AMQP(advanced message queuing protocol):

他是应用层协议的一个开放标准,为面向消息的中间件协议。他分为三层:

【底层协议层】:主要传输二进制数据流,

【中间层】:将客户端的命令转发给服务器,而后将服务器的回复转给客户端。【将最高层的用户层传递的信息转化为二进制,传递给底层。把底层的信息转化为客户端能够知道的语言。】

【最高层】:提供用户调用的命令。

流转流程

生产者:创建链接->开启通道->发送消息->关闭资源

消费者:创建链接->开启通道->接受消息->发送确认消息(告诉rabbit,rabbit修改消息状态为已经读 and so on)->释放资源

相关文章
相关标签/搜索