今天开始RabbitMQ教程的第二讲,废话很少说,直接进入话题。 (使用.NET 客户端 进行事例演示)
在第一个教程中,咱们编写了一个从命名队列中发送和接收消息的程序。在本教程中,咱们将建立一个工做队列,这个队列将用于在多个工人之间分配耗时的任务。
工做队列【又名:任务队列】背后主要的思想是避免马上执行耗时的工做任务,而且一直要等到它结束为止。相反,咱们规划任务并晚些执行。咱们封装一个任务做为消息发送到一个命名的消息队列中,后台运行的工做线程将获取任务而且最终执行该任务。当你运行不少的任务的时候他们会 共享工做线程和队列。
这个概念在Web应用程序中是尤为有用的,异步执行能够在短期内处理一个复杂Http请求。
一、准备工做
在本系列教程的前一个教程中,咱们发送了一个包含“Hello World!”的消息,如今咱们发送一个表明复杂任务的字符串。咱们不会建立一个真实的任务,好比对图像文件进行处理或PDF文件的渲染,所以让咱们伪装咱们很忙-经过采用Thread.Sleep()功能来实现复杂和繁忙。咱们将根据字符串中的点的数量做为它的复杂性,每个点将占一秒钟的“工做”。例如,一个假的任务描述Hello…,有三个点,咱们就须要三秒。
咱们将稍微修改一下咱们之前的例子中Send 程序的代码,容许从命令行发送任意消息。这个程序将把任务发送到咱们的消息队列中,因此咱们叫它NewTask:
像教程一,咱们须要生成两个项目。html
dotnet new console --name NewTask mv NewTask/Program.cs NewTask/NewTask.cs dotnet new console --name Worker mv Worker/Program.cs Worker/Worker.cs cd NewTask dotnet add package RabbitMQ.Client dotnet restore cd ../Worker dotnet add package RabbitMQ.Client dotnet restore var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
信息数据咱们能够从命令行的参数得到:
shell
private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }
咱们的旧Receive.cs代码也须要一些修改:须要为消息体中每一个点都须要消耗一秒钟的工做,先要计算出消息体内有几个点号,而后在乘以1000,就是这个复杂消息所消耗的时间,同时表示这是一个复杂任务。RabbitMQ将处理和发送理消息,而且执行这个任务,让咱们拷贝如下代码黏贴到Worker的项目中,并进行相应的修改:
缓存
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);
咱们本身假设的任务的模拟执行时间就是:服务器
int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000);
二、轮询调度
咱们使用任务队列的好处之一就是使任务能够并行化,增长系统的并行处理能力。若是咱们正在创建一个积压的工做,咱们能够牢牢增长更多的Worker实例就能够完成大量工做的处理,修改和维护就很容易。
首先,让咱们同时运行两个Worker实例。他们都会从队列中获得消息,但具体如何?让我想一想。
你须要打开三个控制台的应用程序。两个控制台程序将运行Wroker程序。这些控制台程序将是咱们的两个消费者C1和C2。异步
# shell 1 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C # shell 2 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C
在第三个控制台应用程序中咱们将发布新的任务。只要你已经启动了消费者程序,你能够看到一些发布的信息:post
# shell 3 cd NewTask dotnet run "First message." dotnet run "Second message.." dotnet run "Third message..." dotnet run "Fourth message...." dotnet run "Fifth message....."
让咱们看看交付了什么东西在Workers:
fetch
# shell 1 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....' # shell 2 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
默认状况下,RabbitMQ将会发送每一条消息给序列中每个消费者。每一个消费者都会获得相同数量的信息。这种分发消息的方式叫作轮询。咱们尝试这三个或更多的Workers。
三、消息确认
处理一个任务可能须要几秒钟。若是有一个消费者开始了一个长期的任务,而且只作了一部分就发生了异常,你可能想知道到底发生了什么。咱们目前的代码,一旦RabbitMQ发送一个消息给客户当即从内存中移除。在这种状况下,若是你关掉了一个Worker,咱们将失去它正在处理的信息。咱们也将丢失发送给该特定员工但还没有处理的全部信息。
但咱们不想失去任何任务。若是一个Worker出现了问题,咱们但愿把这个任务交给另外一个Woker。
为了确保消息不会丢失,RabbitMQ支持消息确认机制。ACK(nowledgement)确认消息是从【消息使用者】发送回来告诉RabbitMQ结果的一种特殊消息,确认消息告诉RabbitMQ指定的接受者已经收到、处理,而且RabbitMQ你能够自由删除它。
若是一个【消费者Consumer】死亡(其通道关闭,链接被关闭,或TCP链接丢失)不会发送ACK,RabbitMQ将会知道这个消息并无彻底处理,将它从新排队。若是有其余用户同时在线,它就会快速地传递到另外一个【消费者】。这样你就能够确定,没有消息丢失,即便【Worker】偶尔死了或者出现问题。
在没有任何消息超时;当【消费者】死亡的时候RabbitMQ会从新发送消息。只要是正常的,即便处理消息须要很长很长的时间也会重发消息给【消费者】。
消息确认的机制默认是打开的。在之前的例子中,咱们明确地把它们关闭设置noAck(“没有手动确认”)参数为true。是时候删除这个标志了,而且从Worker发送一个适当确认消息,一旦咱们完成了工做任务。this
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
使用这个代码,咱们能够确定的是,即便你使用Ctrl + C关掉一个正在处理消息的Worker,也不会丢失任何东西。【Worker】被杀死后,未被确认的消息很快就会被退回。
四、忘记确认
忘记调用BasicAck这是一个常见的错误。虽然这是一个简单的错误,但后果是严重的。消息会被退回时,你的客户退出(这可能看起来像是随机的)可是RabbitMQ将会使用更多的内存保存这些任何延迟确认消息。
为了调试这种错误,你可使用rabbitmqctl打印messages_unacknowledged字段值:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
若是是在Window环境下,删除掉sudo字符就能够:rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
五、持久性的消息
咱们已经学会了如何确保即便【消费者】死亡,任务也不会丢失。可是若是RabbitMQ服务器中止了,咱们的任务仍然会丢失的。
当RabbitMQ退出或死机会清空队列和消息,除非你告诉它即便宕机也不能丢失任何东西。要确保消息不会丢失,有两件事情咱们是必须要作的:咱们须要将队列和消息都标记为持久的。
首先,咱们须要确保咱们RabbitMQ历来都不会损失咱们的的队列。为了作到这一点,咱们须要声明咱们的队列为持久化的:spa
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
虽然这个命令自己是正确的,它不会起做用在咱们目前的设置中。这是由于咱们已经定义了一个叫hello的队列,它不是持久化的。RabbitMQ不容许你使用不一样的参数从新定义一个已经存在的队列,在任何程序代码中,都试图返回一个错误。但有一个快速的解决方法-让咱们声明一个名称不一样的队列,例如task_queue:命令行
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
这行代码QueueDeclare表示队列的声明,建立并打开队列,这个段代码须要应用到【生产者】和【消费者】中。
在这一点上,咱们相信,task_queue队列不会丢失任何东西即便RabbitMQ重启了。如今咱们要经过设置IbasicProperties.SetPersistent属性值为true来标记咱们的消息持久化的。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
关于消息持久性的注意
将消息标记为持久性并不能彻底保证消息不会丢失。虽然该设置告诉RabbitMQ时时刻刻把保存消息到磁盘上,可是这个时间间隔仍是有的,当RabbitMQ已经接受信息但并无保存它,此时还有可能丢失。另外,RabbitMQ不会为每一个消息调用fsync(2)--它可能只是保存到缓存并无真正写入到磁盘。虽然他的持久性保证不强,但它咱们简单的任务队列已经足够用了。若是您须要更强的保证,那么您可使用Publisher Comfirms。
六、公平调度
你可能已经注意到,调度仍然没有像咱们指望的那样的工做。例如,在两个Workers的状况下,当全部的奇数消息是沉重的,甚至消息是轻的,一个Worker忙个不停,而另外一个Worker几乎没事可作。哎,RabbitMQ对上述状况一无所知,仍将消息均匀发送。
发生这种状况是由于当有消息进入队列的时候RabbitMQ才仅仅调度了消息。它根本不看【消费者】未确认消息的数量,它只是盲目的把第N个消息发送给第N个【消费者】。
为了不上述状况的发生,咱们可使用prefetchcount = 1的设置来调用BasicQos方法。这个方法告诉RabbitMQ在同一时间不要发送多余一个消息的数据给某个【Worker】。或者,换句话说,当某个消息处理完毕,而且已经收到了消息确认以后,才能够继续发送消息给那个【Worker】。相反,它将把消息分配给给下一个不忙的【Worker】。
channel.BasicQos(0, 1, false);
注意队列大小
若是全部的工人都很忙,你的队列能够填满。你要留意这一点,也许会增长更多的【Worker】,或者有其余的策略。
七、把全部的代码放在一块儿
NewTask.cs类最终的代码是:
1 using System; 2 using RabbitMQ.Client; 3 using System.Text; 4 5 class NewTask 6 { 7 public static void Main(string[] args) 8 { 9 var factory = new ConnectionFactory() { HostName = "localhost" }; 10 using(var connection = factory.CreateConnection()) 11 using(var channel = connection.CreateModel()) 12 { 13 channel.QueueDeclare(queue: "task_queue", 14 durable: true, 15 exclusive: false, 16 autoDelete: false, 17 arguments: null); 18 19 var message = GetMessage(args); 20 var body = Encoding.UTF8.GetBytes(message); 21 22 var properties = channel.CreateBasicProperties(); 23 properties.Persistent = true; 24 25 channel.BasicPublish(exchange: "", 26 routingKey: "task_queue", 27 basicProperties: properties, 28 body: body); 29 Console.WriteLine(" [x] Sent {0}", message); 30 } 31 32 Console.WriteLine(" Press [enter] to exit."); 33 Console.ReadLine(); 34 } 35 36 private static string GetMessage(string[] args) 37 { 38 return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); 39 } 40 }
Worker.cs完整源码以下:
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 using System.Threading; 6 7 class Worker 8 { 9 public static void Main() 10 { 11 var factory = new ConnectionFactory() { HostName = "localhost" }; 12 using(var connection = factory.CreateConnection()) 13 using(var channel = connection.CreateModel()) 14 { 15 channel.QueueDeclare(queue: "task_queue", 16 durable: true, 17 exclusive: false, 18 autoDelete: false, 19 arguments: null); 20 21 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 22 23 Console.WriteLine(" [*] Waiting for messages."); 24 25 var consumer = new EventingBasicConsumer(channel); 26 consumer.Received += (model, ea) => 27 { 28 var body = ea.Body; 29 var message = Encoding.UTF8.GetString(body); 30 Console.WriteLine(" [x] Received {0}", message); 31 32 int dots = message.Split('.').Length - 1; 33 Thread.Sleep(dots * 1000); 34 35 Console.WriteLine(" [x] Done"); 36 37 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 38 }; 39 channel.BasicConsume(queue: "task_queue", 40 noAck: false, 41 consumer: consumer); 42 43 Console.WriteLine(" Press [enter] to exit."); 44 Console.ReadLine(); 45 } 46 } 47 }
使用消息确认和BasicQos方法能够创建一个工做队列。持久化的选项可让咱们的任务队列保持存活即便RabbitMQ重启。
好了,写完了,翻译的很差,你们见谅。
原文地址以下:http://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
欢迎你们来探讨。