快速掌握RabbitMQ(四)——两种消费模式和QOS的C#实现

  本篇介绍一下RabbitMQ中的消费模式,在前边的全部栗子中咱们采用的消费者都是EventingBasicConsumer,其实RabbitMQ中还有其余两种消费模式:BasicGet和QueueBaicConsumer,下边介绍RabiitMQ的消费模式,及使用它们时须要注意的一些问题。工具

1 RabbitMQ的消费模式

0 准备工做

  使用Web管理工具添加exchange、queue并绑定,bindingKey为“mykey”,以下所示:fetch

  生产者代码以下:ui

       static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "wyy",//用户名
                Password = "123321"//密码
            };
            //建立链接connection
            using (var connection = factory.CreateConnection())
            {
                //建立通道channel
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine("生产者准备就绪....");
                    string message = "";
                    //在控制台输入消息,按enter键发送消息
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "myexchange",
                                             routingKey: "mykey",
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine($"【{message}】发送到Broke成功!");
                    }
                }
            }
            Console.ReadKey();
        }

1 EventingBasicConsumer介绍

  EventingBasicConsumer是发布/订阅模式的消费者,即只要订阅的queue中有了新消息,Broker就会当即把消息推送给消费者,这种模式能够保证消息及时地被消费者接收到。EventingBasicConsumer是长链接的:只须要建立一个Connection,而后在Connection的基础上建立通道channel,消息的发送都是经过channel来执行的,这样能够减小Connection的建立,比较节省资源。前边咱们已经使用了不少次EventingBaiscConsumer,这里简单展现一下使用的方式,注释比较详细,就很少介绍了。spa

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "wyy",//用户名
                Password = "123321"//密码
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    #region EventingBasicConsumer
                    //定义一个EventingBasicConsumer消费者                                    
                    var consumer = new EventingBasicConsumer(channel);
                    //接收到消息时触发的事件
                    consumer.Received += (model, ea) =>
                    {

                        Console.WriteLine(Encoding.UTF8.GetString(ea.Body));
                    };
                    Console.WriteLine("消费者准备就绪....");
                    //调用消费方法 queue指定消费的队列,autoAck指定是否自动确认,consumer就是消费者对象
                    channel.BasicConsume(queue: "myqueue",
                                           autoAck: true,
                                           consumer: consumer);
                    Console.ReadKey();
                    #endregion
                }
            }
        }

   执行程序,结果以下,只要咱们在生产者端发送一条消息到Broker,Broker就会当即推送消息到消费者。code

2 BasicGet方法介绍

   咱们知道使用EventingBasicConsumer可让消费者最及时地获取到消息,使用EventingBasicConsumer模式时消费者在被动的接收消息,即消息是推送过来的,Broker是主动的一方。那么能不能让消费者做为主动的一方,消费者何时想要消息了,就本身发送一个请求去找Broker要?答案使用Get方式。Get方式是短链接的,消费者每次想要消息的时候,首先创建一个Connection,发送一次请求,Broker接收到请求后,响应一条消息给消费者,而后断开链接。RabbitMQ中Get方式和HTTP的请求响应流程基本同样,Get方式的实时性比较差,也比较耗费资源。咱们看一个Get方式的栗子:server

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "wyy",//用户名
                Password = "123321"//密码
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    #region BasicGet
                    //经过BasicGet获取消息,开启自动确认
                    BasicGetResult result = channel.BasicGet(queue:"myqueue",autoAck:true);
                    Console.WriteLine($"接收到消息【{Encoding.UTF8.GetString(result.Body)}】");
                    //打印exchange和routingKey
                    Console.WriteLine($"exchange:{result.Exchange},routingKey:{result.RoutingKey}");
                    Console.ReadLine();
                    #endregion
                }
            }
        }

  执行生成者和消费者程序,生产者发送三条消息,而消费者只获取了一条消息,这是由于channel.BasicGet()一次只获取一条消息,获取到消息后就把链接断开了。对象

  补充:RabbitMQ还有一种消费者QueueBaicConsumer,用法和Get方式相似,QueueBaicConsumer在官方API中标记已过期,这里再也不介绍,有兴趣的小伙伴能够本身研究下。 blog

2 Qos介绍

  在介绍Qos(服务质量)前咱们先看一下使用EventingBasicConsumer的一个坑,使用代码演示一下,简单修改一下上边栗子的代码rabbitmq

  生产者代码以下,这里生产者发送了100条消费到Broker队列

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "wyy",//用户名
                Password = "123321"//密码
            };
            //建立链接connection
            using (var connection = factory.CreateConnection())
            {
                //建立通道channel
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine("生产者准备就绪....");
                    #region 添加100条数据
                    for (int i = 0; i < 100; i++)
                    {
                        channel.BasicPublish(exchange: "myexchange",
                                             routingKey: "mykey",
                                             basicProperties: null,
                                             body: Encoding.UTF8.GetBytes($"第{i}条消息"));
                    }
                    #endregion

                }
            }
            Console.ReadKey();
        }

  消费端代码以下,消费端采用的是自动确认(autoAck=true),即Broker把消息发送给消费者就会确认成功,不关心消息有没有处理完成,假设每条消息处理须要5s

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "wyy",//用户名
                Password = "123321"//密码
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    #region EventingBasicConsumer
                    //定义消费者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    //接收到消息时执行的任务
                    consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep(1000 * 5);
                        Console.WriteLine($"处理消息【{Encoding.UTF8.GetString(ea.Body)}】完成");
                    };
                    Console.WriteLine("消费者准备就绪....");
                    //处理消息
                    channel.BasicConsume(queue: "myqueue",
                                           autoAck: true,
                                           consumer: consumer);
                    Console.ReadKey();
                    #endregion
                }
            }
        }

  咱们先执行生产者程序,执行完成后发现queue中有了100条ready状态的消息,表示消息成功发送到了队列

  接着咱们执行消费者,消费者执行后,Broker会把消息一股脑发送过去,经过Web管理界面咱们看到queue中已经没有消息了,以下:

   咱们再看一下消费者的执行状况,发现消费者仅仅处理了4条消息,还有96条消息没有处理,这就是说消费者没有处理完消息,可是queue中的消息都已经删除了。若是这时消费者挂掉了,全部未处理的消息都会丢失,在某些场合中,丢失数据的后果是十分严重的。

 

  对于上边的问题,咱们可能会想到使用显示确认来保证消息不会丢失:将BasicConsume方法的autoAck设置为false,而后处理一条消息后手动确认一下,这样的话已处理的消息在接收到确认回执时被删除,未处理的消息以Unacked状态存放在queue中。若是消费者挂了,Unacked状态的消息会自动从新变成Ready状态,如此一来就不用担忧消息丢失了,修改消费者代码以下:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "wyy",//用户名
                Password = "123321"//密码
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    #region EventingBasicConsumer
                    //定义消费者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    //接收到消息时执行的任务
                    consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep(1000 * 5);
                        //处理完成,手动确认
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine($"处理消息【{Encoding.UTF8.GetString(ea.Body)}】完成");
                    };
                    Console.WriteLine("消费者准备就绪....");
                    //处理消息
                    channel.BasicConsume(queue: "myqueue",
                                          autoAck: false,
                                           consumer: consumer);
                    Console.ReadKey();
                    #endregion
                }
            }
        }

  从新执行生产者,而后执行消费者,Web管理其中看到结果以下:在执行消费者时,消息会一股脑的发送给消费者,而后状态都变成Unacked,消费者执行完一条数据手动确认后,这条消息从queue中删除。当消费者挂了(咱们能够直接把消费者关掉来模拟挂掉的状况),没有处理的消息会自动从Unacked状态变成Ready状态,不用担忧消息丢失了!打开Web管理界面看到状态以下:

  经过显式确认的方式能够解决消息丢失的问题,但这种方式也存在一些问题:①当消息有十万,百万条时,一股脑的把消息发送给消费者,可能会形成消费者内存爆满;②当消息处理比较慢的时,单一的消费者处理这些消息可能很长时间,咱们天然想到再添加一个消费者加快消息的处理速度,可是这些消息都被原来的消费者接收了,状态为Unacked,因此这些消息不会再发送给新添加的消费者。针对这些问题怎么去解决呢?

  RabbitMQ提供的Qos(服务质量)能够完美解决上边的问题,使用Qos时,Broker不会再把消息一股脑的发送给消费者,咱们能够设置每次传输给消费者的消息条数n,消费者把这n条消息处理完成后,再获取n条数据进行处理,这样就不用担忧消息丢失、服务端内存爆满的问题了,由于没有发送的消息状态都是Ready,因此当咱们新增一个消费者时,消息也能够当即发送给新增的消费者。注意Qos只有在消费端使用显示确认时才有效,使用Qos的方式十分简单,在消费端调用 channel.BasicQos() 方法便可,修改服务端代码以下:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在设备ip,这里就是本机
                HostName = "127.0.0.1",
                UserName = "wyy",//用户名
                Password = "123321"//密码
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false); #region EventingBasicConsumer

                    //定义消费者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    //接收到消息时执行的任务
                    consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep(1000 * 5);
                        //处理完成,手动确认
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine($"处理消息【{Encoding.UTF8.GetString(ea.Body)}】完成");
                    };
                    Console.WriteLine("消费者准备就绪....");
                    //处理消息
                    channel.BasicConsume(queue: "myqueue",
                                           autoAck: false,
                                           consumer: consumer);
                    Console.ReadKey();
                    #endregion
                }
            }
        }

   清空一下queue中的消息,从新启动生产者,而后启动消费者,打开Web管理界面,看到状态以下所示:

    channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false) 方法中参数prefetchSize为预取的长度,通常设置为0便可,表示长度不限;prefetchCount表示预取的条数,即发送的最大消息条数;global表示是否在Connection中全局设置,true表示Connetion下的全部channel都设置为这个配置。

3 小结

  本节演示了RabbitMQ的两种消费者:EventingBasicConsumer和BasicGet。EventingBasicConsumer是基于长链接,发布订阅模式的消费方式,节省资源且实时性好,这是开发中最经常使用的消费模式。在一些须要消费者主动获取消息的场合,咱们可使用Get方式,Get方式是基于短链接的,请求响应模式的消费方式。

  Qos能够设置消费者一次接收消息的最大条数,可以解决消息拥堵时形成的消费者内存爆满问题。Qos也比较适用于耗时任务队列,当任务队列中的任务不少时,使用Qos后咱们能够随时添加新的消费者来提升任务的处理效率。

相关文章
相关标签/搜索