RabbitMQ——Rabbit Message Queue的简写,但不能仅仅理解其为消息队列,消息代理更合适。消息队列主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议)的开源实现,其内部结构以下:html
RabbitMQ做为一个消息代理,主要和消息打交道,负责接收并转发消息。RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。算法
下面咱们就来学习下RabbitMQ。windows
本文主要基于Windows下使用Vs Code 基于.net core进行demo演示。开始以前咱们须要准备好如下环境。缓存
RabbitMQ Comman Prompt
,以管理员身份运行。依次执行如下命令启动RabbitMQ服务服务器
rabbitmq-service install rabbitmq-service enable rabbitmq-service start
执行rabbitmqlctl status
检查RabbitMQ状态网络
安装管理平台插件
执行rabbitmq-plugins enable rabbitmq_management
便可成功安装,使用默认帐号密码(guest/guest)登陆http://localhost:15672/便可。架构
在开始以前咱们先来了解下消息模型:
消费者(consumer)订阅某个队列。生产者(producer)建立消息,而后发布到队列(queue)中,队列再将消息发送到监听的消费者。异步
下面咱们咱们经过demo来了解RabbitMQ的基本用法。分布式
建立RabbitMQ文件夹,打开命令提示符,分别建立两个控制台项目Send、Receive。性能
dotnet new console --name Send //建立发送端控制台应用 cd Send //进入Send目录 dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包 dotnet restore //恢复包 dotnet new console --name Receive //建立接收端控制台应用 cd Receive //进入Receive目录 dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包 dotnet restore //恢复包
咱们先来添加消息发送端逻辑:
//Send.cs public static void Main(string[] args) { //1.1.实例化链接工厂 var factory = new ConnectionFactory() { HostName = "localhost" }; //2. 创建链接 using (var connection = factory.CreateConnection()) { //3. 建立信道 using (var channel = connection.CreateModel()) { //4. 申明队列 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5. 构建byte消息数据包 string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!"; var body = Encoding.UTF8.GetBytes(message); //6. 发送数据包 channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } } }
再来完善消息接收端逻辑:
//Receive.cs 省略部分代码 public static void Main() { //1.实例化链接工厂 var factory = new ConnectionFactory() { HostName = "localhost" }; //2. 创建链接 using (var connection = factory.CreateConnection()) { //3. 建立信道 using (var channel = connection.CreateModel()) { //4. 申明队列 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5. 构造消费者实例 var consumer = new EventingBasicConsumer(channel); //6. 绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(" [x] Received {0}", message); Thread.Sleep(6000);//模拟耗时 Console.WriteLine (" [x] Done"); }; //7. 启动消费者 channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
先运行消息接收端,再运行消息发送端,结果以下图。
从上面的代码中能够看出,发送端和消费端的代码前4步都是同样的。主要的区别在于发送端调用channel.BasicPublish
方法发送消息;而接收端须要实例化一个EventingBasicConsumer
实例来进行消息处理逻辑。另一点须要注意的是:消息接收端和发送端的队列名称(queue)必须保持一致,这里指定的队列名称为hello。
使用工做队列的好处就是它可以并行的处理队列。若是堆积了不少任务,咱们只须要添加更多的工做者(workers)就能够了。咱们先启动两个接收端,等待消息接收,再启动一个发送端进行消息发送。
咱们增长运行一个消费端后的运行结果:
从图中可知,咱们循环发送4条信息,两个消息接收端按顺序被循环分配。
默认状况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均每一个消费者将得到相同数量的消息。这种分发消息的方式叫作循环(round-robin)。
按照咱们上面的demo,一旦RabbitMQ将消息发送到消费端,消息就会当即从内存中移出,不管消费端是否处理完成。在这种状况下,消息就会丢失。
为了确保一个消息永远不会丢失,RabbitMQ支持消息确认(message acknowledgments)。当消费端接收消息而且处理完成后,会发送一个ack(消息确认)信号到RabbitMQ,RabbitMQ接收到这个信号后,就能够删除掉这条已经处理的消息任务。但若是消费端挂掉了(好比,通道关闭、链接丢失等)没有发送ack信号。RabbitMQ就会明白某个消息没有正常处理,RabbitMQ将会从新将消息入队,若是有另一个消费端在线,就会快速的从新发送到另一个消费端。
RabbitMQ中没有消息超时的概念,只有当消费端关闭或奔溃时,RabbitMQ才会从新分发消息。
微调下Receive中的代码逻辑:
//5. 构造消费者实例 var consumer = new EventingBasicConsumer(channel); //6. 绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(" [x] Received {0}", message); Thread.Sleep(6000);//模拟耗时 Console.WriteLine(" [x] Done"); // 7. 发送消息确认信号(手动消息确认) channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //8. 启动消费者 //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,无论消息是否正确处理完毕 //autoAck:false;关闭自动消息确认,经过调用BasicAck方法手动进行消息确认 channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
主要改动的是将 autoAck:true
修改成autoAck:fasle
,以及在消息处理完毕后手动调用BasicAck
方法进行手动消息确认。
从图中可知,消息发送端连续发送4条消息,其中消费端1先被分配处理第一条消息,消费端2被循环分配第二条消息,第三条消息因为没有空闲消费者仍然在队列中。
在消费端2未处理完第一条消息以前,手动中断(ctrl+c)。咱们能够发现RabbitMQ在下一次分发时,会优先将被中断的消息分发给消费端1处理。
消息确认确保了即便消费端异常,消息也不会丢失可以被从新分发处理。可是若是RabbitMQ服务端异常,消息依然会丢失。除非咱们指定durable:true
,不然当RabbitMQ退出或奔溃时,消息将依然会丢失。经过指定durable:true
,并指定Persistent=true
,来告知RabbitMQ将消息持久化。
//send.cs //4. 申明队列(指定durable:true,告知rabbitmq对消息进行持久化) channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5. 构建byte消息数据包 string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!"; var body = Encoding.UTF8.GetBytes(message); //6. 发送数据包(指定basicProperties) channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);
将消息标记为持久性不能彻底保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,可是当RabbitMQ接受消息而且尚未保存时,仍然有一个很短的时间窗口。RabbitMQ 可能只是将消息保存到了缓存中,并无将其写入到磁盘上。持久化是不可以必定保证的,可是对于一个简单任务队列来讲已经足够。若是须要确保消息队列的持久化,可使用publisher confirms.
RabbitMQ的消息分发默认按照消费端的数量,按顺序循环分发。这样仅是确保了消费端被平均分发消息的数量,但却忽略了消费端的闲忙状况。这就可能出现某个消费端一直处理耗时任务处于阻塞状态,某个消费端一直处理通常任务处于空置状态,而只是它们分配的任务数量同样。
但咱们能够经过channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1
来告知RabbitMQ,在未收到消费端的消息确认时,再也不分发消息,也就确保了当消费端处于忙碌状态时,再也不分配任务。
//Receive.cs //4. 申明队列 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,再也不分发消息,也就确保了当消费端处于忙碌状态时 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
这时你须要注意的是若是全部的消费端都处于忙碌状态,你的队列可能会被塞满。你须要注意这一点,要么添加更多的消费端,要么采起其余策略。
细心的你也许发现上面的demo,生产者和消费者直接是经过相同队列名称进行匹配衔接的。消费者订阅某个队列,生产者建立消息发布到队列中,队列再将消息转发到订阅的消费者。这样就会有一个局限性,即消费者一次只能发送消息到某一个队列。
那消费者如何才能发送消息到多个消息队列呢?
RabbitMQ提供了Exchange,它相似于路由器的功能,它用于对消息进行路由,将消息发送到多个队列上。Exchange一方面从生产者接收消息,另外一方面将消息推送到队列。但exchange必须知道如何处理接收到的消息,是将其附加到特定队列仍是附加到多个队列,仍是直接忽略。而这些规则由exchange type定义,exchange的原理以下图所示。
常见的exchange type 有如下几种:
下面咱们就来一一这介绍它们的用法。
本着先易后难的思想,咱们先来了解下fanout的广播路由机制。fanout的路由机制以下图,即发送到 fanout 类型exchange的消息都会分发到全部绑定该exchange的队列上去。
生产者示例代码:
// 生成随机队列名称 var queueName = channel.QueueDeclare().QueueName; //使用fanout exchange type,指定exchange名称 channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message); //发布到指定exchange,fanout类型无需指定routingKey channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);
消费者示例代码:
//申明fanout类型exchange channel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout"); //申明随机队列名称 var queuename = channel.QueueDeclare ().QueueName; //绑定队列到指定fanout类型exchange,无需指定路由键 channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");
4.2. direct
direct相对于fanout就属于彻底匹配、单播的模式,路由机制以下图,即队列名称和消息发送时指定的路由彻底匹配时,消息才会发送到指定队列上。
生产者示例代码:
// 生成随机队列名称 var queueName = channel.QueueDeclare().QueueName; //使用direct exchange type,指定exchange名称 channel.ExchangeDeclare(exchange: "directEC", type: "direct"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message); //发布到direct类型exchange,必须指定routingKey channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);
消费者示例代码:
//申明direct类型exchange channel.ExchangeDeclare (exchange: "directEC", type: "direct"); //绑定队列到direct类型exchange,需指定路由键routingKey channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");
4.3. topic
topic是direct的升级版,是一种模式匹配的路由机制。它支持使用两种通配符来进行模式匹配:符号#
和符号*
。其中*
匹配一个单词, #
则表示匹配0个或多个单词,单词之间用.
分割。以下图所示。
生产者示例代码:
// 生成随机队列名称 var queueName = channel.QueueDeclare().QueueName; //使用topic exchange type,指定exchange名称 channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message); //发布到topic类型exchange,必须指定routingKey channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);
消费者示例代码:
//申明topic类型exchange channel.ExchangeDeclare (exchange: "topicEC", type: "topic"); //申明随机队列名称 var queuename = channel.QueueDeclare ().QueueName; //绑定队列到topic类型exchange,需指定路由键routingKey channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");
5. RPC
RPC——Remote Procedure Call,远程过程调用。
那RabbitMQ如何进行远程调用呢?示意图以下:
第一步,主要是进行远程调用的客户端须要指定接收远程回调的队列,并申明消费者监听此队列。
第二步,远程调用的服务端除了要申明消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的队列中去。
远程调用客户端:
//申明惟一guid用来标识这次发送的远程调用请求 var correlationId = Guid.NewGuid().ToString(); //申明须要监听的回调队列 var replyQueue = channel.QueueDeclare().QueueName; var properties = channel.CreateBasicProperties(); properties.ReplyTo = replyQueue;//指定回调队列 properties.CorrelationId = correlationId;//指定消息惟一标识 string number = args.Length > 0 ? args[0] : "30"; var body = Encoding.UTF8.GetBytes(number); //发布消息 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body); Console.WriteLine($"[*] Request fib({number})"); // //建立消费者用于处理消息回调(远程调用返回结果) var callbackConsumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer); callbackConsumer.Received += (model, ea) => { //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。 if (ea.BasicProperties.CorrelationId == correlationId) { var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}"; Console.WriteLine($"[x]: {responseMsg}"); } };
远程调用服务端:
//申明队列接收远程调用请求 channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); Console.WriteLine("[*] Waiting for message."); //请求处理逻辑 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int n = int.Parse(message); Console.WriteLine($"Receive request of Fib({n})"); int result = Fib(n); //从请求的参数中获取请求的惟一标识,在消息回传时一样绑定 var properties = ea.BasicProperties; var replyProerties = channel.CreateBasicProperties(); replyProerties.CorrelationId = properties.CorrelationId; //将远程调用结果发送到客户端监听的队列上 channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo, basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString())); //手动发回消息确认 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"Return result: Fib({n})= {result}"); }; channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
6. 总结
基于上面的demo和对几种不一样exchange路由机制的学习,咱们发现RabbitMQ主要是涉及到如下几个核心概念:
文章来源:https://www.cnblogs.com/sheng-jie/p/7192690.html