C#使用RabbitMQ(转)

 

 

1. 说明 

  在企业应用系统领域,会面对不一样系统之间的通讯、集成与整合,尤为当面临异构系统时,这种分布式的调用与通讯变得愈加重要。其次,系统中通常会有不少对实时性要求不高的可是执行起来比较较耗时的地方,好比发送短信,邮件提醒,更新文章阅读计数,记录用户操做日志等等,若是实时处理的话,在用户访问量比较大的状况下,对系统压力比较大。html

面对这些问题,咱们通常会将这些请求,放在消息队列MQ中处理;异构系统之间使用消息进行通信。node

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通讯方法。应用程序经过读写出入队列的消息(针对应用程序的数据)来通讯,而无需专用链接来连接它们。消息传递指的是程序之间经过在消息中发送数据进行通讯,而不是经过直接调用彼此来通讯,直接调用一般是用于诸如远程过程调用的技术。排队指的是应用程序经过 队列来通讯。队列的使用除去了接收和发送应用程序同时执行的要求。web

  MQ是消费-生产者模型的一个典型的表明,一端往消息队列中不断写入消息,而另外一端则能够读取或者订阅队列中的消息。数据库

   RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。 windows

  消息传递相较文件传递与远程过程调用(RPC)而言,彷佛更胜一筹,由于它具备更好的平台无关性,并可以很好地支持并发与异步调用。因此若是系统中出现了以下状况:数组

  • 对操做的实时性要求不高,而须要执行的任务极为耗时;
  • 存在异构系统间的整合;

  通常的能够考虑引入消息队列。对于第一种状况,经常会选择消息队列来处理执行时间较长的任务。引入的消息队列就成了消息处理的缓冲区。消息队列引入的异步通讯机制,使得发送方和接收方都不用等待对方返回成功消息,就能够继续执行下面的代码,从而提升了数据处理的能力。尤为是当访问量和数据流量较大的状况下,就能够结合消息队列与后台任务,经过避开高峰期对大数据进行处理,就能够有效下降数据库处理数据的负荷。 浏览器

  本文简单介绍在RabbitMQ这一消息代理工具,以及在.NET中如何使用RabbitMQ.缓存

2. 搭建环境

  2.1 安装Erlang语言运行环境服务器

  因为RabbitMQ使用Erlang语言编写,因此先安装Erlang语言运行环境。具体移步博客:windows配置Erlang环境cookie

  2.2 安装RabbitMQ服务端

  地址 http://www.rabbitmq.com/

  下载安装。

  使RabbitMQ以Windows Service的方式在后台运行:打开cmd切换到sbin目录下执行

rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start

   如今RabbitMQ的服务端已经启动起来了。

  要查看和控制RabbitMQ服务端的状态,能够用rabbitmqctl这个脚本。

  好比查看状态:

rabbitmqctl status

  

  假如显示node没有链接上,须要到C:\Windows目录下,将.erlang.cookie文件,拷贝到用户目录下 C:\Users\{用户名},这是Erlang的Cookie文件,容许与Erlang进行交互。

   使用命令查看用户:

rabbitmqctl list_users

  RabbitMQ会为咱们建立默认的用户名guest和密码guest,guest默认拥有RabbitMQ的全部权限。

  通常的,咱们须要新建一个咱们本身的用户,设置密码,并授予权限,并将其设置为管理员,可使用下面的命令来执行这一操做:

rabbitmqctl  add_user  JC JayChou   //建立用户JC密码为JayChou
rabbitmqctl  set_permissions  JC ".*"  ".*"  ".*"    //赋予JC读写全部消息队列的权限
rabbitmqctl  set_user_tags JC administrator    //分配用户组

  修改JC密码为123:

rabbitmqctl change_password JC  123

  删除用户JC:

rabbitmqctl delete_user  JC

  也能够开启rabbitmq_management插件,在web界面查看和管理RabbitMQ服务

rabbitmq-plugins enable rabbitmq_management  

 

  2.3下载RabbitMQ的Client端dll

  下载地址:http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/

  本人下载了这个 rabbitmq-dotnet-client-3.6.6-dotnet-4.5.zip

   解压,咱们须要的是这个文件,之后会引用到vs的项目中:

3.使用

  3.1在使用RabitMQ以前,先对几个概念作一下说明

  

  RabbitMQ是一个消息代理。他从消息生产者(producers)那里接收消息,而后把消息送给消息消费者(consumer)在发送和接受之间,他可以根据设置的规则进行路由,缓存和持久化。

  通常提到RabbitMQ和消息,都用到一些专有名词。

  • 生产(Producing)意思就是发送。发送消息的程序就是一个生产者(producer)。咱们通常用"P"来表示:

       producer

  • 队列(queue)就是邮箱的名称。消息经过你的应用程序和RabbitMQ进行传输,它们只能存储在队列(queue)中。 队列(queue)容量没有限制,你要存储多少消息均可以——基本上是一个无限的缓冲区。多个生产者(producers)可以把消息发送给同一个队列,一样,多个消费者(consumers)也能从同一个队列(queue)中获取数据。队列能够画成这样(图上是队列的名称):

     queue

  • 消费(Consuming)和获取消息是同样的意思。一个消费者(consumer)就是一个等待获取消息的程序。咱们把它画做"C":

     consumer

  一般,消息生产者,消息消费者和消息代理不在同一台机器上。

3.2 Hello Word

  下面来展现简单的RabbitMQ的使用:


      rabbitmq hello world

 3.2.1 首先建立名为ProjectSend的控制台项目,须要引用RabbitMQ.Client.dll。这个程序做为Producer生产者,用来发送数据:

 
static void Main(string[] args)
    {
        var factory = new ConnectionFactory();
        factory.HostName = "localhost";//RabbitMQ服务在本地运行
        factory.UserName = "guest";//用户名
        factory.Password = "guest";//密码

        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("hello", false, false, false, null);//建立一个名称为hello的消息队列
                string message = "Hello World"; //传递的消息内容
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("", "hello", null, body); //开始传递
                Console.WriteLine("已发送: {0}", message);
          Console.ReadLine(); } } }
 

  

  首先,须要建立一个ConnectionFactory,设置目标,因为是在本机,因此设置为localhost,若是RabbitMQ不在本机,只须要设置目标机器的IP地址或者机器名称便可,而后设置前面建立的用户名和密码。

  紧接着要建立一个Channel,若是要发送消息,须要建立一个队列,而后将消息发布到这个队列中。在建立队列的时候,只有RabbitMQ上该队列不存在,才会去建立。消息是以二进制数组的形式传输的,因此若是消息是实体对象的话,须要序列化和而后转化为二进制数组。

  如今客户端发送代码已经写好了,运行以后,消息会发布到RabbitMQ的消息队列中,如今须要编写服务端的代码链接到RabbitMQ上去获取这些消息。

3.2.2建立名为ProjectReceive的控制台项目,引用RabbitMQ.Client.dll。做为Consumer消费者,用来接收数据:

 
static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "guest";
            factory.Password = "guest";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("hello", false, false, false, null);

                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicConsume("hello", false, consumer);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body); 
                        Console.WriteLine("已接收: {0}", message);   
                    };
                    Console.ReadLine(); 
                }
            }
        }
 

   和发送同样,首先须要定义链接,而后声明消息队列。要接收消息,须要定义一个Consume,而后在接收消息的事件中处理数据。

 3.2.3 如今发送和接收的客户端都写好了,让咱们编译执行起来

  发送消息:

  如今,名为hello的消息队列中,发送了一条消息。这条消息存储到了RabbitMQ的服务器上了。使用rabbitmqctl 的list_queues能够查看全部的消息队列,以及里面的消息个数,能够看到,目前Rabbitmq上只有一个消息队列,里面只有一条消息:

  也能够在web管理界面查看此queue的相关信息:

 

 

  接收消息:

   既然消息已经被接收了,那咱们再来看queue的内容:

  可见,消息中的内容在接收以后已被删除了。

3.3 工做队列

  前面的例子展现了如何在指定的消息队列发送和接收消息。

  如今咱们建立一个工做队列(work queue)来将一些耗时的任务分发给多个工做者(workers):

   rabbitmq-work queue

  工做队列(work queues, 又称任务队列Task Queues)的主要思想是为了不当即执行并等待一些占用大量资源、时间的操做完成。而是把任务(Task)看成消息发送到队列中,稍后处理。一个运行在后台的工做者(worker)进程就会取出任务而后处理。当运行多个工做者(workers)时,任务会在它们之间共享。

  这个在网络应用中很是有用,它能够在短暂的HTTP请求中处理一些复杂的任务。在一些实时性要求不过高的地方,咱们能够处理完主要操做以后,以消息的方式来处理其余的不紧要的操做,好比写日志等等。

准备

  在第一部分,发送了一个包含“Hello World!”的字符串消息。如今发送一些字符串,把这些字符串看成复杂的任务。这里使用time.sleep()函数来模拟耗时的任务。在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时1秒钟。好比"Hello..."就会耗时3秒钟。

对以前示例的send.cs作些简单的调整,以即可以发送随意的消息。这个程序会按照计划发送任务到咱们的工做队列中。

 
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);
            string message = GetMessage(args);
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 2;

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", "hello", properties, body);
            Console.WriteLine(" set {0}", message);
        }
    }

    Console.ReadKey();
}

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
 

 

接着咱们修改接收端,让他根据消息中的逗点的个数来Sleep对应的秒数:

 
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume("hello", true, consumer);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);
                        
                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");
            }
        }
    }
}
 

 

轮询分发

  使用工做队列的一个好处就是它可以并行的处理队列。若是堆积了不少任务,咱们只须要添加更多的工做者(workers)就能够了,扩展很简单。

如今,咱们先启动两个接收端,等待接受消息,而后启动一个发送端开始发送消息。

Send message queue 

  在cmd条件下,发送了5条消息,每条消息后面的逗点表示该消息须要执行的时长,来模拟耗时的操做。

  而后能够看到,两个接收端依次接收到了发出的消息:

receive message queue 

默认,RabbitMQ会将每一个消息按照顺序依次分发给下一个消费者。因此每一个消费者接收到的消息个数大体是平均的。 这种消息分发的方式称之为轮询(round-robin)。

3.4 消息响应

当处理一个比较耗时得任务的时候,也许想知道消费者(consumers)是否运行到一半就挂掉。在当前的代码中,当RabbitMQ将消息发送给消费者(consumers)以后,立刻就会将该消息从队列中移除。此时,若是把处理这个消息的工做者(worker)停掉,正在处理的这条消息就会丢失。同时,全部发送到这个工做者的尚未处理的消息都会丢失。

咱们不想丢失任何任务消息。若是一个工做者(worker)挂掉了,咱们但愿该消息会从新发送给其余的工做者(worker)。

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)机制。消费者会经过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,而后RabbitMQ才会释放并删除这条消息。

若是消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被彻底处理,而后从新发送给其余消费者(consumer)。这样,即便工做者(workers)偶尔的挂掉,也不会丢失消息。

消息是没有超时这个概念的;当工做者与它断开连的时候,RabbitMQ会从新发送消息。这样在处理一个耗时很是长的消息任务的时候就不会出问题了。

消息响应默认是开启的。在以前的例子中使用了no_ack=True标识把它关闭。是时候移除这个标识了,当工做者(worker)完成了任务,就发送一个响应。

 
channel.BasicConsume("hello", false, consumer);

while (true)
{
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);

    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);

    Console.WriteLine("Received {0}", message);
    Console.WriteLine("Done");

    channel.BasicAck(ea.DeliveryTag, false);
}
 

 

如今,能够保证,即便正在处理消息的工做者被停掉,这些消息也不会丢失,全部没有被应答的消息会被从新发送给其余工做者.

一个很常见的错误就是忘掉了BasicAck这个方法,这个错误很常见,可是后果很严重. 当客户端退出时,待处理的消息就会被从新分发,可是RabitMQ会消耗愈来愈多的内存,由于这些没有被应答的消息不可以被释放。调试这种case,可使用rabbitmqct打印messages_unacknoledged字段。

rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

 

3.5 消息持久化

前面已经搞定了即便消费者down掉,任务也不会丢失,可是,若是RabbitMQ Server停掉了,那么这些消息仍是会丢失。

当RabbitMQ Server 关闭或者崩溃,那么里面存储的队列和消息默认是不会保存下来的。若是要让RabbitMQ保存住消息,须要在两个地方同时设置:须要保证队列和消息都是持久化的。

首先,要保证RabbitMQ不会丢失队列,因此要作以下设置:

bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);

 

虽然在语法上是正确的,可是在目前阶段是不正确的,由于咱们以前已经定义了一个非持久化的hello队列。RabbitMQ不容许咱们使用不一样的参数从新定义一个已经存在的同名队列,若是这样作就会报错。如今,定义另一个不一样名称的队列:

bool durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

 

queueDeclare 这个改动须要在发送端和接收端同时设置。

如今保证了task_queue这个消息队列即便在RabbitMQ Server重启以后,队列也不会丢失。 而后须要保证消息也是持久化的, 这能够经过设置IBasicProperties.SetPersistent 为true来实现:

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

 

须要注意的是,将消息设置为持久化并不能彻底保证消息不丢失。虽然他告诉RabbitMQ将消息保存到磁盘上,可是在RabbitMQ接收到消息和将其保存到磁盘上这之间仍然有一个小的时间窗口。 RabbitMQ 可能只是将消息保存到了缓存中,并无将其写入到磁盘上。持久化是不可以必定保证的,可是对于一个简单任务队列来讲已经足够。若是须要消息队列持久化的强保证,可使用publisher confirms

3.6 公平分发

你可能会注意到,消息的分发可能并无如咱们想要的那样公平分配。好比,对于两个工做者。当奇数个消息的任务比较重,可是偶数个消息任务比较轻时,奇数个工做者始终处理忙碌状态,而偶数个工做者始终处理空闲状态。可是RabbitMQ并不知道这些,他仍然会平均依次的分发消息。

为了改变这一状态,咱们可使用basicQos方法,设置perfetchCount=1 。这样就告诉RabbitMQ 不要在同一时间给一个工做者发送多于1个的消息,或者换句话说。在一个工做者还在处理消息,而且没有响应消息以前,不要给他分发新的消息。相反,将这条新的消息发送给下一个不那么忙碌的工做者。

channel.BasicQos(0, 1, false); 

 

3.7 完整实例

如今将全部这些放在一块儿:

发送端代码以下:

 
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
                   
            bool durable = true;
            channel.QueueDeclare("task_queue", durable, false, false, null);
                    
            string message = GetMessage(args);
            var properties = channel.CreateBasicProperties();
            properties.SetPersistent(true);
                  

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", "task_queue", properties, body);
            Console.WriteLine(" set {0}", message);
        }
    }

    Console.ReadKey();
}

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
 

 

接收端代码以下:

 
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            bool durable = true;
            channel.QueueDeclare("task_queue", durable, false, false, null);
            channel.BasicQos(0, 1, false);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume("task_queue", false, consumer);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);

                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");

                channel.BasicAck(ea.DeliveryTag, false);
            }
        }
    }
}
 

 

4 管理界面

RabbitMQ管理界面,经过该界面能够查看RabbitMQ Server 当前的状态,该界面是以插件形式提供的,而且在安装RabbitMQ的时候已经自带了该插件。须要作的是在RabbitMQ控制台界面中启用该插件,命令以下:

rabbitmq-plugins enable rabbitmq_management

rabbitmq management

如今,在浏览器中输入 http://server-name:15672/ server-name换成机器地址或者域名,若是是本地的,直接用localhost(RabbitMQ 3.0以前版本端口号为55672)在输入以后,弹出登陆界面,使用咱们以前建立的用户登陆。

RabbitMQ Web management .

在该界面上能够看到当前RabbitMQServer的全部状态。

5 总结

本文简单介绍了消息队列的相关概念,并介绍了RabbitMQ消息代理的基本原理以及在Windows 上如何安装RabbitMQ和在.NET中如何使用RabbitMQ。消息队列在构建分布式系统和提升系统的可扩展性和响应性方面有着很重要的做用,但愿本文对您了解消息队列以及如何使用RabbitMQ有所帮助。

相关文章
相关标签/搜索