目录:html
轮询调度(Round-robin dispatching):即依次分配分配任务给worker。shell
消息答复(Message acknowledgement):在consumer处理完以后,进行消息答复。避免杀掉worker后,message消息。并发
消息持久化(Message durability):在RabbitMQ server中止后,确保message不会丢失。须要持久化queue和message函数
公平调度(Fair dispatch):为了使worker不会出现有的一直在busy,而有的一致很闲的状态。使用的是 channel.BasicQos(0, 1, false) ,确保worker确认完成上一个任务后,才会分配下一个。fetch
代码spa
在第一个教程中,咱们讲了在一个指定的queue中发送和接收message. 下面咱们讲一个用于在多个worker之间分配费时任务的工做队列(Work Queue).code
Work Queue的主要思想就是避免当即作一个资源集中型任务而且还必须等待它完成。server
咱们把任务封装成一个message,而且发送到队列。这里面的worker实际就是consumer,以后会由它们执行这些任务。htm
咱们会统计字符串中的 . 来使程序sleep。即便用Thread.Sleep()。例如Hello...会花费3秒。blog
在这里咱们的producer叫作NewTask。而咱们的consumer叫作worker。
它们能够在上一节的基础上作一些修改获得
发送消息代码修改
var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }
接收消息代码修改
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);
这个里面的消息依然是自动答复的
上面的修改是为了模拟真实耗时任务。
使用队列任务的一个好处就是能很容易的进行并发任务。
首先,咱们尝试同时运行两个worker。它们能够同时从队列中取到message。那么具体是怎样呢?
你须要打开三个控制台程序,两个运行worker程序。
# shell 1 cd Worker dotnet run# => [*] Waiting for messages. To exit press CTRL+C
# shell 2 cd Worker dotnet run# => [*] Waiting for messages. To exit press CTRL+C
第三个用来发布new tasks.
# shell 3 cd NewTask dotnet run "First message." dotnet run "Second message.." dotnet run "Third message..." dotnet run "Fourth message...." dotnet run "Fifth message....."
咱们看下两个worker中的结果:
默认状况下,RabbitMQ会轮询发送每一个message。因此,平均来讲,每一个consumer会获得相同数量的messages . 这种分发message的方式叫作轮询。
同时,注意到,queue中的message只能发到一个worker里,即两个worker里的task不会重复,即这是一种点对点的方式。
你想下,若是一个consumer正在进行一个长任务(long task),而且就完成了一部分就死掉了。那么会发生什么呢?在咱们当前的代码里,一旦RabbitMQ发送了一个message到一个consumer,那么RabbitMQ里的message马上就会被标记为删除(deletion)。 在这种状况下,若是你杀死一个worker,咱们将会丢失这个worker正在处理的message,咱们也会丢失全部已经分配到这个worker但还没处理的messages。
注意:默认状况下,并非会等每一个task在consumer中执行完才会分发下一个message,也有多是一下分发好多条。具体能够经过设置。
可是,咱们不想丢失任务tasks。若是一个worker死掉了,咱们想要task会被发送到另外一个worker。
为了message再也不丢失,RabbitMQ引入了message acknowledge。一个ack 会在一个message被接收,处理后被consumer发送回来,而且RabbitMQ把它标记为删除。
若是一个consumer还没发送一个ack就死掉了。RabbitMQ会认为它没被彻底处理,而且re-queue 它。若是线上同时还有其余的consumer,那么RabbitMQ会很快的把它发送到另外一个consumer。这样即便worker忽然死了,也没有message会丢失了。
消息过期是不存在的。RabbitMQ将重发这个message,当consumer死掉时。即便在处理message时花费很长时间,也没有关系(由于不存在过期)
Manual message acknowledgment(手动消息答复) 默认是开启的。在前一个例子中,咱们经过设置autoAck为true把它关闭了。
如今,咱们把手动消息答复打开(即autoAck设置为false),而且,一旦咱们作完了一个task,咱们就发送一个确认(a acknowledgment).
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //消息答复 }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);//设置手动消息答复开启
这样,即便咱们杀死了一个worker,咱们的message也不会丢失了。
Acknowledgement必须和接收message的通道是同一个。不然会报 channel-level protocol exception。
那么,若是咱们忘记发acknowledgement会怎么样呢?
忘记BasicAck是一个常发生的错误,可是后果却很严重。当你的client退出后,messages也会被重发。可是RabbitMQ会吃掉(消耗)愈来愈多的内存,随着它没法释听任何unacked messages.
你能够经过message_unacknowledged打印出没确认的message
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Windows上
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
咱们已经学了当consumer被杀死时,使task不丢失。可是若是咱们的RabbitMQ server中止了,咱们的task依然会丢失。
想要server中止时,messages不丢失,须要标记queue和message是持久化的(durable)。
首先,咱们标记queue是durable
channel.QueueDeclare(queue: "hello", durable: true, //标记queue为durable exclusive: false, autoDelete: false, arguments: null);
虽然上面的代码自己是正确的,可是在目前却不会生效。由于咱们以前已经定义过了一个hello的queue,它是not durable。RabbitMQ不会容许你使用不一样的参数从新定义一个已经存在的queue,而且会报错。
这里,咱们先直接声明一个不一样名称的queue。以下
channel.QueueDeclare(queue: "task_queue", durable: true, //标记queue为durable exclusive: false, autoDelete: false, arguments: null);
其中,QueueDeclare须要被应用到producer和consumer的代码里。
如今,咱们标记messages为persistent(永恒的)。经过设置IBasicProperties.SetPersistent为true.
var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置message是persistent
你可能已经注意到,上面的调度仍然不能按咱们想要的工做。它可能出现两个worker一个一直很忙,一个一直很闲(任务执行时间不同)。
这是由于RabbitMQ会被分发,当message输入一个queue。它不会看一个consumer未完成的queue , 它仅仅盲目的分发第几个到第几个consumer.
为了改变行为,咱们可使用BasicQos,而且prefetchCount=1。这个会告诉RabbityMQ每次给只会给worker一个message。或者说,RabbitRQ在worker处理而且确认以前不会分发一个新的message。也能够说,RabbitMQ会分发给下一个不忙的worker。
channel.BasicQos(0, 1, false);
注意queue的大小
若是你的全部worker都是busy的,说明你的queue已经满了。你应该对此保持关注,而且或者你能够增长更多的worker或者有一些其余策略。
NewTask.cs
using System;using RabbitMQ.Client;using System.Text; class NewTask { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) //建立链接 using(var channel = connection.CreateModel()) //建立channel { channel.QueueDeclare(queue: "task_queue", //声明一个durable的queue durable: true, exclusive: false, autoDelete: false, arguments: null); var message = GetMessage(args); //取得message var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); //设置message是persistent properties.Persistent = true; channel.BasicPublish(exchange: "", //发送 routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } }
Worker.cs
using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;using System.Threading; class Worker { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) //创建链接 using(var channel = connection.CreateModel()) //创建通道(channel) { channel.QueueDeclare(queue: "task_queue", //声明queue是durable durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //设置公平的调度策略(fair dispatch) Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); //回调函数 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; //模拟真实业务花费一些时间 Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //消息答复 }; channel.BasicConsume(queue: "task_queue", //发送而且设置手动消息答复开启 autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
参考网址:RabbitMQ