【RabbitMq】是一个【AMQP】协议的实现。服务端使用的是Erlang语言进行编写,那也就是说,咱们要运行它,就要安装相关Erlang环境。前面说了AMQP最初是为了解决金融行业的可用性问题,因此Rabbit在高可用方面表现不俗,而且在我看来他是这几种中间件中最容易上手的一个。并且它在并发方面表现十分出色,能够实现大概10w的吞吐量。他的特色是:【可靠性、消息集群、高可用、插件机制(可让它支持别的协议)、支持多语言客户端、管理页面 so on】本篇主要聊聊如何安装、使用、以及关于他的一些名词方面的阐述。run。。html
- 个人环境是CentOS7
- http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq须要安装erlang对应的版本,前面是Rabbit的版本,后面是Erlang的对它支持的版本。这里先后要对应下载,版本必须符合他的要求,我这里使用的就是第一个。
- https://github.com/rabbitmq/erlang-rpm/releases 中复制对应的版本erlang下载地址
- https://github.com/rabbitmq/rabbitmq-server/tags 中复制对应的版本rabbitmq的下载地址
- 下载Erlang
- 【wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v23.3.4.3/erlang-23.3.4.3-1.el7.x86_64.rpm】
- 安装Erlang
- 【sudo rpm -Uvh /home/download/erlang-23.3.4.3-1.el7.x86_64.rpm】
- 安装socat
- 【sudo yum install -y socat】
- 下载RabbitMQ
- 【wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.15/rabbitmq-server-3.8.15-1.el7.noarch.rpm】
- 安装RabbitMQ
- 【sudo rpm -Uvh /home/download/rabbitmq-server-3.8.15-1.el7.noarch.rpm】
到目前为止咱们的准备工做完毕,如下是一些启动和关闭命令。git
【中止服务】:sudo systemctl stop rabbitmq-server 【查询状态】:sudo systemctl status rabbitmq-server 【启动】:sudo systemctl start rabbitmq-server 【设置开启自启】:sudo systemctl enable rabbitmq-server github
使用启动命令启动后,咱们查询状态发现状态为 dead,这是由于咱们要启动他的插件 使用【rabbitmq-plugins list】能够查询全部他支持的插件,咱们这里须要启动服务器
【rabbitmq-plugins enable rabbitmq_management】并发
执行完成后使用【 cat /etc/rabbitmq/enabled_plugins】就能够知道是否启动插件成功,而后再次启动发现启动状态就为running,使用【netstat -nplt | grep 15672 】发现他的专用端口已经开启,至此,安装启动完毕。这个时候就能够对它进行访问了(你的ip:15672),出现下面的图,就证实搭建成功。这里注意开放一下端口,不然别的机器没法访问:tcp
- sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
- sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
- sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
- sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
然而咱们用他本身的gust是没法login in 进去的,由于这个支持在搭建的服务器自己上访问,那咱们就要建立本身的用户,而且赋予相应的权限。ide
- 【添加一个admin用户】:rabbitmqctl add_user admin admin
- 【分配操做权限】:rabbitmqctl set_user_tags admin administrator
- 【分配资源权限】:rabbitmqctl set_permissions -p / admin ".*" ".*" ".*
使用admin进行登陆,至此,能够rabbitmq能够正常使用ui
添加相关依赖spa
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.1</version> </dependency>生产一个消息插件
View Codepublic class Producer { public static void main(String[] args) { // 一、建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 二、设置链接属性 factory.setHost("你的ip"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 三、从链接工厂获取链接 connection = factory.newConnection(); // 四、从连接中建立通道 channel = connection.createChannel(); /** * 五、声明(建立)队列 * 若是队列不存在,才会建立 * RabbitMQ 不容许声明两个队列名相同,属性不一样的队列,不然会报错 * * queueDeclare参数说明: * @param queue 队列名称 * @param durable 队列是否持久化 * @param exclusive 是否排他,便是否为私有的,若是为true,会对当前队列加锁,其它通道不能访问,而且在链接关闭时会自动删除,不受持久化和自动删除的属性控制 * @param autoDelete 是否自动删除,当最后一个消费者断开链接以后是否自动删除 * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中全部消息的生命周期等等 */ channel.queueDeclare("queue1", false, false, false, null); // 消息内容 String message = "Hello World!"; // 六、发送消息 channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println("消息已发送!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 七、关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 八、关闭链接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
生产后你去管理页面查询,会发现一个消息还未读取。
消费一个消息(消费后再次查询,发现ready中没有东西了)
View Codepublic class Consumer { public static void main(String[] args) { // 一、建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 二、设置链接属性 factory.setHost("你的ip"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 三、从链接工厂获取链接 connection = factory.newConnection("消费者"); // 四、从连接中建立通道 channel = connection.createChannel(); channel.queueDeclare("queue1", false, false, false, null); // 六、定义收到消息后的回调 DeliverCallback callback = new DeliverCallback() { public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到消息:" + new String(message.getBody(), "UTF-8")); } }; // 七、监听队列 channel.basicConsume("queue1", true, callback, new CancelCallback() { public void handle(String consumerTag) throws IOException { } }); System.out.println("开始接收消息"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 八、关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 九、关闭链接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }至此,简单使用结束。
使用客户端:这里首先建立两个队列,而后在交换机上模拟发送消息,以topic类型的交换机为例,他会进行routing key的匹配,在发送消息的时候,把你的routing key 携带,便可匹配。
一个topic类型的交换机的例子
View Code/** * Topic--生产者 * * 生产者将消息发送到topic类型的交换器上,和routing的用法相似,都是经过routingKey路由,但topic类型交换器的routingKey支持通配符 */ public class Producer { public static void main(String[] args) { // 一、建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 二、设置链接属性 factory.setHost("你的ip"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 三、从链接工厂获取链接 connection = factory.newConnection("生产者"); // 四、从连接中建立通道 channel = connection.createChannel(); // 路由关系以下:com.# --> queue-1 *.order.* ---> queue-2 // 消息内容 String message = "Hello A"; // 发送消息到topic_test交换器上 channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes()); System.out.println("消息 " + message + " 已发送!"); // 消息内容 message = "Hello B"; // 发送消息到topic_test交换器上 channel.basicPublish("topic-exchange", "com.sms.create", null, message.getBytes()); System.out.println("消息 " + message + " 已发送!"); // 消息内容 message = "Hello C"; // 发送消息到topic_test交换器上 channel.basicPublish("topic-exchange", "cn.order.create", null, message.getBytes()); System.out.println("消息 " + message + " 已发送!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 七、关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 八、关闭链接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } } /** * 路由--消费者 * * 消费者经过一个临时队列和交换器绑定,接收发送到交换器上的消息 */ public class Consumer { private static Runnable receive = () -> { // 一、建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 二、设置链接属性 factory.setHost("你的ip"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; final String queueName = Thread.currentThread().getName(); try { // 三、从链接工厂获取链接 connection = factory.newConnection("消费者"); // 四、从连接中建立通道 channel = connection.createChannel(); // 定义消息接收回调对象 DeliverCallback callback = new DeliverCallback() { public void handle(String consumerTag, Delivery message) throws IOException { System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8")); } }; // 监听队列 channel.basicConsume(queueName, true, callback, new CancelCallback() { public void handle(String consumerTag) throws IOException { } }); System.out.println(queueName + " 开始接收消息"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 八、关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 九、关闭链接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }; public static void main(String[] args) { new Thread(receive, "queue1").start(); new Thread(receive, "queue-2").start(); } }
Rabbit名词介绍
Blocker:一个rabbit服务器就是一个Blocker
虚拟主机(virtual host):一个Blocker中能够有多个虚拟机,每一个虚拟机相似于一个工做空间,每一个虚拟主机中的消息和其余虚拟主机的消息不相互影响
connection:消费者和rabbit中间的链接,有了这个链接,双方才能通讯。
RoutingKey:消息被发给交换机的时候,会携带它,这个是用来指定消息的路由规则(能够为空)
channel(信道):是在connection上创建的管道,一个connection上能够创建多个channel,消息经过他们进行传递。
BindingKey:Exchange和Queue绑定的关系,Exchange接收到的消息会带有RoutingKey这个字段。
交换机(exchanger):当rabbit接收到消息后,交换机对这些消息进行转换,他的类型决定哪一个队列中应该拥有这些消息,
交换机类型:
- 【direct】:当发送消息的时候,咱们会在消息体上携带一个路由键【routekey】,若是消息体上你的路由键和队列匹配则发送给对应的队列。
- 【fanout 】:发送到交换机的消息都会被转发到与该交换机绑定的全部队列上。
- 【headers】:根据发送的消息内容中的【headers】属性进行匹配,当消息发送到RabbitMQ时会取到该消息的【headers】与Exchange绑定时指定的键值对进行匹配,若是匹配到,则对应队列能够接受到消息。
- 【topic】:将路由键和某模式进行匹配。此时队列须要绑定要一个模式上。符号“#”会匹配一个或多个词,好比【ok.#】--》【ok.1.1 or ok.1.1.2 so on】,只要队列能够匹配到,就能够接受消息
队列(queue):rabbit接收到的信息存储在这里,消费者也是从这里获取的消息。
binder: 队列和交换机之间的绑定
AMQP(advanced message queuing protocol):
他是应用层协议的一个开放标准,为面向消息的中间件协议。他分为三层:
【底层协议层】:主要传输二进制数据流,
【中间层】:将客户端的命令转发给服务器,而后将服务器的回复转给客户端。【将最高层的用户层传递的信息转化为二进制,传递给底层。把底层的信息转化为客户端能够知道的语言。】
【最高层】:提供用户调用的命令。
流转流程
生产者:创建链接->开启通道->发送消息->关闭资源
消费者:创建链接->开启通道->接受消息->发送确认消息(告诉rabbit,rabbit修改消息状态为已经读 and so on)->释放资源