RabbitMQ教程C#版 - 工做队列

先决条件
本教程假定 RabbitMQ 已经安装,并运行在localhost标准端口(5672)。若是你使用不一样的主机、端口或证书,则须要调整链接设置。javascript

从哪里得到帮助
若是您在阅读本教程时遇到困难,能够经过邮件列表 联系咱们css

工做队列#

(使用 .NET Client)html

在 教程[1] 中,咱们编写了两个程序,用于从一个指定的队列发送和接收消息。在本文中,咱们将建立一个工做队列,用于在多个工做线程间分发耗时的任务。java

工做队列(又名:任务队列)背后的主要想法是避免当即执行资源密集型、且必须等待其完成的任务。相反的,咱们把这些任务安排在稍后完成。咱们能够将任务封装为消息并把它发送到队列中,在后台运行的工做进程将从队列中取出任务并最终执行。当您运行多个工做线程,这些任务将在这些工做线程之间共享。nginx

这个概念在Web应用程序中特别有用,由于在一个 HTTP 请求窗口中没法处理复杂的任务。git

准备#

咱们将略微修改上一个示例中的Send程序,以其能够在命令行发送任意消息。
这个程序将调度任务到咱们的工做队列中,因此让咱们把它命名为NewTaskgithub

像 教程[1]同样,咱们须要生成两个项目:sql

Copy
dotnet new console --name NewTask mv NewTask/Program.cs NewTask/NewTask.cs dotnet new console --name Worker mv Worker/Program.cs Worker/Worker.cs cd NewTask dotnet add package RabbitMQ.Client dotnet restore cd ../Worker dotnet add package RabbitMQ.Client dotnet restore
Copy
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);

从命令行参数获取消息的帮助方法:docker

Copy
private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }

咱们旧的Receive.cs脚本也须要进行一些更改:它须要为消息体中的每一个点模拟一秒种的时间消耗。它将处理由 RabbitMQ 发布的消息,并执行任务,所以咱们把它复制到Worker项目并修改:shell

Copy
// 构建消费者实例。 var consumer = new EventingBasicConsumer(channel); // 绑定消息接收事件。 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); // 模拟耗时操做。 int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);

模拟虚拟任务的执行时间:

Copy
int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000);

循环调度#

使用任务队列的优势之一是可以轻松地并行工做。若是咱们正在积累积压的工做,咱们仅要增长更多的工做者,并以此方式能够轻松扩展。

首先,咱们尝试同时运行两个Worker实例。他们都会从队列中获取消息,但究竟如何?让咱们来看看。

您须要打开三个控制台,两个运行Worker程序,这些控制台做为咱们的两个消费者 - C1和C2。

Copy
# shell 1 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C
Copy
# shell 2 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,咱们将发布一些新的任务。一旦你已经运行了消费者,你能够尝试发布几条消息:

Copy
# shell 3 cd NewTask dotnet run "First message." dotnet run "Second message.." dotnet run "Third message..." dotnet run "Fourth message...." dotnet run "Fifth message....."

让咱们看看有什么发送到了咱们的Worker程序:

Copy
# shell 1 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
Copy
# shell 2 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'

默认状况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。消费者数量平均的状况下,每一个消费者将会得到相同数量的消息。这种分配消息的方式称为循环(Round-Robin)。请尝试开启三个或更多的Worker程序来验证。

消息确认#

处理一项任务可能会须要几秒钟的时间。若是其中一个消费者开启了一项长期的任务而且只完成了部分就挂掉了,您可能想知道会发生什么?在咱们当前的代码中,一旦 RabbitMQ 把消息分发给了消费者,它会当即将这条消息标记为删除。在这种状况下,若是您停掉某一个 Worker,咱们将会丢失这条正在处理的消息,也将丢失全部分发到该 Worker 但还没有处理的消息。

可是咱们不想丢失任何一个任务。若是一个 Worker 挂掉了,咱们但愿这个任务能被从新分发给其余 Worker。

为了确保消息永远不会丢失,RabbitMQ 支持 消息确认 机制。消费者回发一个确认信号 Ack(nowledgement) 给 RabbitMQ,告诉它某个消息已经被接收、处理而且能够自由删除它。

若是一个消费者在尚未回发确认信号以前就挂了(其通道关闭,链接关闭或者 TCP 链接丢失),RabbitMQ 会认为该消息未被彻底处理,并将其从新排队。若是有其余消费者同时在线,该消息将会被会迅速从新分发给其余消费者。这样,即使 Worker 意外挂掉,也能够确保消息不会丢失。

没有任何消息会超时;当消费者死亡时,RabbitMQ 将会从新分发消息。即便处理消息须要很是很是长的时间也不要紧。

默认状况下,手动消息确认 模式是开启的。在前面的例子中,咱们经过将autoAck(“自动确认模式”)参数设置为true来明确地关闭手动消息确认模式。一旦完成任务,是时候删除这个标志而且从 Worker 手动发送一个恰当的确认信号给RabbitMQ。

Copy
// 构建消费者实例。 var consumer = new EventingBasicConsumer(channel); // 绑定消息接收事件。 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); // 模拟耗时操做。 int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); // 手动发送消息确认信号。 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; // autoAck:false - 关闭自动消息确认,调用`BasicAck`方法进行手动消息确认。 // autoAck:true - 开启自动消息确认,当消费者接收到消息后就自动发送 ack 信号,不管消息是否正确处理完毕。 channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

使用上面这段代码,咱们能够肯定的是,即便一个 Worker 在处理消息时,咱们经过使用CTRL + C来终止它,也不会丢失任何消息。Worker 挂掉不久,全部未确认的消息将会被从新分发。

忘记确认
遗漏BasicAck是一个常见的错误。这是一个很简单的错误,但致使的后果倒是严重的。当客户端退出时(看起来像是随机分发的),消息将会被从新分发,可是RabbitMQ会吃掉愈来愈多的内存,由于它不能释放未确认的消息。
为了调试这种错误,您可使用rabbitmqctl来打印messages_unacknowledged字段:

Copy
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上,删除sudo

Copy
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化#

咱们已经学习了如何确保即便消费者挂掉,任务也不会丢失。可是若是 RabbitMQ 服务器中止,咱们的任务仍是会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记已存在的队列和消息,除非告诉它不要这样作。为了确保消息不会丢失,有两件事是必须的:咱们须要将队列和消息标记为持久

首先,咱们须要确保 RabbitMQ 永远不会丢失咱们的队列。为了作到这一点,咱们须要把队列声明是持久的(Durable)

Copy
// 声明队列,经过指定 durable 参数为 true,对消息进行持久化处理。 channel.QueueDeclare(queue: "hello",  durable: true,  exclusive: false,  autoDelete: false,  arguments: null);

虽然这个命令自己是正确的,可是它在当前设置中不会起做用。那是由于咱们已经定义过一个名为hello的队列,而且这个队列不是持久化的。RabbitMQ 不容许使用不一样的参数从新定义已经存在的队列,并会向尝试执行该操做的程序返回一个错误。但有一个快速的解决办法 - 让咱们用不一样的名称声明一个队列,例如task_queue

Copy
channel.QueueDeclare(queue: "task_queue",  durable: true,  exclusive: false,  autoDelete: false,  arguments: null);

注意,该声明队列QueueDeclare方法的更改须要同时应用于生产者和消费者代码。

此时,咱们能够肯定的是,即便 RabbitMQ 从新启动,task_queue队列也不会丢失。如今咱们须要将咱们的消息标记为持久的(Persistent) - 经过将IBasicProperties.Persistent设置为true

Copy
// 将消息标记为持久性。 var properties = channel.CreateBasicProperties(); properties.Persistent = true;

关于消息持久性的说明
将消息标记为Persistent并不能彻底保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但当 RabbitMQ 接收到消息而且还没有保存消息时仍有一段时间间隔。此外,RabbitMQ 不会为每条消息执行fsync(2) - 它可能只是保存到缓存中,并无真正写入磁盘。消息的持久化保证并不健壮,但对于简单的任务队列来讲已经足够了。若是您须要一个更加健壮的保证,可使用 发布者确认

公平调度#

您可能已经注意到调度仍然没法彻底按照咱们指望的方式工做。例如,在有两个 Worker 的状况下,假设全部奇数消息都很庞大、偶数消息都很轻量,那么一个 Worker 将会一直忙碌,而另外一个 Worker 几乎不作任何工做。是的,RabbitMQ 并不知道存在这种状况,它仍然会平均地分发消息。

发生这种状况是由于 RabbitMQ 只是在消息进入队列后就将其分发。它不会去检查每一个消费者所拥有的未确认消息的数量。它只是盲目地将第 n 条消息分发给第 n 位消费者。

为了改变上述这种行为,咱们可使用参数设置prefetchCount = 1basicQos方法。

这就告诉 RabbitMQ 同一时间不要给一个 Worker 发送多条消息。或者换句话说,不要向一个 Worker 发送新的消息,直到它处理并确认了前一个消息。
相反,它会这个消息调度给下一个不忙碌的 Worker。

Copy
channel.BasicQos(0, 1, false);

关于队列大小的说明
若是全部的 Worker 都很忙,您的队列可能会被填满。请留意这一点,能够尝试添加更多的 Worker,或者使用其余策略。

组合在一块儿#

咱们NewTask.cs类的最终代码:

Copy
using System; using RabbitMQ.Client; using System.Text; class NewTask { public static void Main(string[] args) { // 实例化链接工厂。 var factory = new ConnectionFactory() { HostName = "localhost" }; // 建立链接、信道。 using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { // 声明队列,标记为持久性。 channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); // 获取发送消息。 var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); // 将消息标记为持久性。 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送数据包。 channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } }

(NewTask.cs 源码)

还有咱们的Worker.cs

Copy
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Threading; class Worker { public static void Main() { // 实例化链接工厂。 var factory = new ConnectionFactory() { HostName = "localhost" }; // 建立链接、信道。 using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { // 声明队列,标记为持久性。 channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); // 告知 RabbitMQ,在未收到当前 Worker 的消息确认信号时,再也不分发给消息,确保公平调度。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); // 构建消费者实例。 var consumer = new EventingBasicConsumer(channel); // 绑定消息接收事件。 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); // 模拟耗时操做。 int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); // 手动发送消息确认信号。 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }

(Worker.cs 源码)

使用消息确认机制和BasicQ您能够建立一个工做队列。即便 RabbitMQ 从新启动,经过持久性选项也可以让任务继续存在。

有关IModel方法和IBasicProperties的更多信息,您能够在线浏览 RabbitMQ .NET客户端API参考

如今,咱们能够继续阅读 教程[3],学习如何向多个消费者发送相同的消息。

写在最后#

本文翻译自 RabbitMQ 官方教程 C# 版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的很差请见谅,若有翻译错误还请指正。

做者:Esofar

出处:https://www.cnblogs.com/esofar/p/rabbitmq-work-queues.html

本站使用「CC BY 4.0」创做共享协议,转载请在文章明显位置注明做者及出处。

相关文章
相关标签/搜索