RabbitMQ-工做原理

使用场景

在咱们秒杀抢购商品的时候,系统会提醒咱们稍等排队中,而不是像几年前同样页面卡死或报错给用户。css

像这种排队结算就用到了消息队列机制,放入通道里面一个一个结算处理,而不是某个时间断忽然涌入大批量的查询新增把数据库给搞宕机,因此RabbitMQ本质上起到的做用就是削峰填谷,为业务保驾护航。html

为何选择RabbitMQ

如今的市面上有不少MQ能够选择,好比ActiveMQ、ZeroMQ、Appche Qpid,那问题来了为何要选择RabbitMQ?java

  1. 除了Qpid,RabbitMQ是惟一一个实现了AMQP标准的消息服务器;
  2. 可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;
  3. 高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性;
  4. 集群部署简单,正是应为Erlang使得RabbitMQ集群部署变的超级简单;
  5. 社区活跃度高,根据网上资料来看,RabbitMQ也是首选;

工做机制

生产者、消费者和代理算法

在了解消息通信以前首先要了解3个概念:生产者、消费者和代理。数据库

生产者:消息的建立者,负责建立和推送数据到消息服务器;安全

消费者:消息的接收方,用于处理数据和确认消息;服务器

代理:就是RabbitMQ自己,用于扮演“快递”的角色,自己不生产消息,只是扮演“快递”的角色。网络

消息发送原理并发

首先你必须链接到Rabbit才能发布和消费消息,那怎么链接和发送消息的呢?app

你的应用程序和Rabbit Server之间会建立一个TCP链接,一旦TCP打开,并经过了认证,认证就是你试图链接Rabbit以前发送的Rabbit服务器链接信息和用户名和密码,有点像程序链接数据库,使用Java有两种链接认证的方式,后面代码会详细介绍,一旦认证经过你的应用程序和Rabbit就建立了一条AMQP信道(Channel)。

信道是建立在“真实”TCP上的虚拟链接,AMQP命令都是经过信道发送出去的,每一个信道都会有一个惟一的ID,不管是发布消息,订阅队列或者介绍消息都是经过信道完成的。

为何不经过TCP直接发送命令?

对于操做系统来讲建立和销毁TCP会话是很是昂贵的开销,假设高峰期每秒有成千上万条链接,每一个链接都要建立一条TCP会话,这就形成了TCP链接的巨大浪费,并且操做系统每秒能建立的TCP也是有限的,所以很快就会遇到系统瓶颈。

若是咱们每一个请求都使用一条TCP链接,既知足了性能的须要,又能确保每一个链接的私密性,这就是引入信道概念的缘由。

你必须知道的Rabbit

想要真正的了解Rabbit有些名词是你必须知道的。

包括:ConnectionFactory(链接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键)。

ConnectionFactory(链接管理器):应用程序与Rabbit之间创建链接的管理器,程序代码中使用;

Channel(信道):消息推送使用的通道;

Exchange(交换器):用于接受、分配消息;

Queue(队列):用于存储生产者的消息;

RoutingKey(路由键):用于把生成者的数据分配到交换器上;

BindingKey(绑定键):用于把交换器的消息绑定到队列上;

看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥做用的,请看下图:

关于更多交换器的信息,咱们在后面再讲。

消息持久化

Rabbit队列和交换器有一个不可告人的秘密,就是默认状况下重启服务器会致使消息丢失,那么怎么保证Rabbit在重启的时候不丢失呢?答案就是消息持久化。

当你把消息发送到Rabbit服务器的时候,你须要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须知足3个条件:

  1. 投递消息的时候durable设置为true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数2设置为true持久化;
  2. 设置投递模式deliveryMode设置为2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),参数3设置为存储纯文本到磁盘;
  3. 消息已经到达持久化交换器上;
  4. 消息已经到达持久化的队列;

持久化工做原理

Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费以后,Rabbit会把这条消息标识为等待垃圾回收。

持久化的缺点

消息持久化的优势显而易见,但缺点也很明显,那就是性能,由于要写入硬盘要比写入内存性能较低不少,从而下降了服务器的吞吐量,尽管使用SSD硬盘可使事情获得缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。

因此使用者要根据本身的状况,选择适合本身的方式。

虚拟主机

每一个Rabbit都能建立不少vhost,咱们称之为虚拟主机,每一个虚拟主机其实都是mini版的RabbitMQ,拥有本身的队列,交换器和绑定,拥有本身的权限机制。

vhost特性

  1. RabbitMQ默认的vhost是“/”开箱即用;

  2. 多个vhost是隔离的,多个vhost没法通信,而且不用担忧命名冲突(队列和交换器和绑定),实现了多层分离;

  3. 建立用户的时候必须指定vhost;

vhost操做

能够经过rabbitmqctl工具命令建立:

rabbitmqctl add_vhost[vhost_name]

删除vhost:

rabbitmqctl delete_vhost[vhost_name]

查看全部的vhost:

rabbitmqctl list_vhosts

vhosts(broker)
一个RabbitMQ的实体上能够有多个vhosts,用户与权限设置就是依附于vhosts。
在rabbitmq server上能够建立多个虚拟的message broker,又叫作virtual hosts (vhosts)。每个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost至关于物理的server,能够为不一样app提供边界隔离,使得应用安全的运行在不一样的vhost实例上,相互之间不会干扰。producer和consumer链接rabbit server须要指定一个vhost。

connection 与 channel(链接与信道)
connection是指物理的链接,一个client与一个server之间有一个链接;一个链接上能够创建多个channel,能够理解为逻辑上的链接。通常应用的状况下,有一个channel就够用了,不须要建立更多的channel。

exchange 与  routingkey(交换机与路由键)
Exchange相似于数据通讯网络中的交换机,提供消息路由策略。rabbitmq中,producer不是经过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange能够和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue同样,Exchange也可设置为持久化,临时或者自动删除。

环境搭建

前文咱们已经介绍了Ubuntu搭建RabbitMQ的步骤:RabbitMQ在Ubuntu上的环境搭建

若是你是在Windows10上去安装那就更简单了,先放下载地址:

Erlang/Rabbit Server百度网盘连接:https://pan.baidu.com/s/1TnKDV-ZuXLiIgyK8c8f9dg 密码:wct9

固然也可去Erlang和Rabbit官网去下,就是速度比较慢。个人百度云Rabbit最新版本:3.7.6,Erlang版本:20.2,注意:不要下载最新的Erlang,在Windows10上打开扩展插件有问题,打不开。

  1. 安装Erlang;

  2. 安装Rabbit Server;

  3. 进入安装目录\sbin下,使用命令“rabbitmq-plugins enable rabbitmq_management”启动网页管理插件;

  4. 重启Rabbit服务;

使用:http://localhost:15672进行测试,默认的登录帐号为:guest,密码为:guest

重复安装Rabbit Server的坑

若是不是第一次在Windows上安装Rabbit Server必定要把Rabbit和Erlang卸载干净以后,找到注册表:HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv 删除其下的全部项。

否则会出现Rabbit安装以后启动不了的状况,理论上卸载的顺序也是先Rabbit在Erlang。

代码实现

java版实现,使用maven项目,建立能够查看:MyEclipse2017破解设置与maven项目搭建

项目建立成功以后,添加Rabbit Client jar包,只须要在pom.xml里面配置,以下信息:

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.2.0</version> </dependency>

java实现代码分为两个类,第一个是建立Rabbit链接,第二是应用类使用最简单的方式发布和消费消息。

Rabbit的链接,两种方式:

方式一:

public static Connection GetRabbitConnection() { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(Config.UserName); factory.setPassword(Config.Password); factory.setVirtualHost(Config.VHost); factory.setHost(Config.Host); factory.setPort(Config.Port); Connection conn = null; try { conn = factory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return conn; }

方式二:

public static Connection GetRabbitConnection2() { ConnectionFactory factory = new ConnectionFactory(); // 链接格式:amqp://userName:password@hostName:portNumber/virtualHost String uri = String.format("amqp://%s:%s@%s:%d%s", Config.UserName, Config.Password, Config.Host, Config.Port, Config.VHost); Connection conn = null; try { factory.setUri(uri); factory.setVirtualHost(Config.VHost); conn = factory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return conn; }

第二部分:应用类,使用最简单的方式发布和消费消息

public static void main(String[] args) { Publisher(); // 推送消息 Consumer(); // 消费消息 } /** * 推送消息 */ public static void Publisher() { // 建立一个链接 Connection conn = ConnectionFactoryUtil.GetRabbitConnection(); if (conn != null) { try { // 建立通道 Channel channel = conn.createChannel(); // 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开链接时是否删除队列;参数五:消息其余参数】 channel.queueDeclare(Config.QueueName, false, false, false, null); String content = String.format("当前时间:%s", new Date().getTime()); // 发送内容【参数说明:参数一:交换机名称;参数二:队列名称,参数三:消息的其余属性-routing headers,此属性为MessageProperties.PERSISTENT_TEXT_PLAIN用于设置纯文本消息存储到硬盘;参数四:消息主体】 channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8")); System.out.println("已发送消息:" + content); // 关闭链接 channel.close(); conn.close(); } catch (Exception e) { e.printStackTrace(); } } } /** * 消费消息 */ public static void Consumer() { // 建立一个链接 Connection conn = ConnectionFactoryUtil.GetRabbitConnection(); if (conn != null) { try { // 建立通道 Channel channel = conn.createChannel(); // 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开链接时是否删除队列;参数五:消息其余参数】 channel.queueDeclare(Config.QueueName, false, false, false, null); // 建立订阅器,并接受消息 channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); // 队列名称 String contentType = properties.getContentType(); // 内容类型 String content = new String(body, "utf-8"); // 消息正文 System.out.println("消息正文:" + content); channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息【参数说明:参数一:该消息的index;参数二:是否批量应答,true批量确认小于index的消息】 } }); } catch (Exception e) { e.printStackTrace(); } } }

代码里面已经写了很详细的注释,在这里也不过多的介绍了。

执行效果,如图:

相关文章
相关标签/搜索