C#队列学习笔记:RabbitMQ安装及使用

原文: C#队列学习笔记:RabbitMQ安装及使用

    1、环境搭建

    1.一、因为RabbitMQ是使用Erlang语言开发的,所以要安装Erlang运行时环境,下载地址:Erlang官网下载  CSDN分享下载html

    1.二、去RabbitMQ官网下载RabbitMQ Server服务端程序,选择合适的平台版本下载并安装。数组

    RabbitMQ安装时,会自动在Windows服务中建立RabbitMQ服务,并自动启动。浏览器

    1.三、开始->全部程序->RabbitMQ Server->RabbitMQ Command Prompt (sbin dir):缓存

    运行RabbitMQ Command Prompt与cmd下cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.3\sbin的效果是同样的。分布式

    1.3.一、sbin目录下的rabbitmqctl.bat,是用来查看和控制服务端状态的。运行rabbitmqctl status检查RabbitMQ状态:ide

    1.3.三、RabbitMQ Server上面也有用户概念,使用rabbitmqctl list_users命令,能够看到目前的用户:学习

    能够看到,如今只有一个名为gues角色为administratort的用户,这个是RabbitMQ默认为咱们建立的,它有RabbitMQ的全部权限。通常状况下,咱们须要新建一个本身的用户,并设置密码及授予权限,同时设置为管理员。操做方法以下:fetch

rabbitmqctl add_user hello world rabbitmqctl set_permissions hello ".*" ".*" ".*" rabbitmqctl set_user_tags hello administrator

    上面的第一命令添加了一个名为hello的用户并设置了密码world;第二条命令为用户hello分别授予对全部消息队列的配置、读和写的权限;第三条命令将用户hello设置为管理员。ui

    如今咱们能够将默认的guest用户删掉,使用下面的命令便可:atom

rabbitmqctl delete_user guest

    若是要修改密码,可使用下面的命令:

rabbitmqctl change_password {username} {newpassowrd}

   2、管理界面

    RabbitMQ还有一个管理界面,是以插件形式提供的,经过该界面能够查看RabbitMQ Server当前的状态。启用命令以下: 

rabbitmq-plugins enable rabbitmq_management

    如今,在浏览器中输入 http://server-name:15672/ 便可。

    注:server-name为计算机名或IP地址,若是是本地的,直接用localhost便可。登陆界面,使用咱们以前建立的hello用户登陆。

    3、开始使用

    在.NET中使用RabbitMQ须要下载RabbitMQ客户端程序集,下载解压后在bin下找到RabbitMQ.Client.dll,并添加引用到项目中。

    3.一、Hello World

    为了展现RabbitMQ的基本使用,咱们发送一个HelloWorld消息,而后接收并处理。

rabbitmq hello world

    3.1.一、建立一个名为Send的客户端控制台程序,用来将消息发送到RabbitMQ消息队列中,代码以下:

class Program { static void Main(string[] args) { #region Hello World
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.创建信道
                using (var channel = connection.CreateModel()) { //4.声明队列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5.构建byte消息数据包
                    string message = args.Length > 0 ? args[0] : "Hello World"; var body = Encoding.UTF8.GetBytes(message); //消息是以二进制数组的形式传输的 //6.发送数据包
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine($"Send {message}"); Console.Read(); } } #endregion } }
Send.cs

    3.1.二、建立一个名为Receive的服务端控制台程序,用来接收RabbitMQ消息队列中的消息,代码以下:

class Program { static void Main(string[] args) { #region Hello World
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.声明队列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel); //6.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"Received {message}"); }; //7.启动消费者
                    channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    3.1.三、先运行消息接收端,再运行消息发送端,结果以下:

    从上面的代码中能够看出,发送端和接收端的代码前4步都是同样的。主要的区别在于发送端调用channel.BasicPublish方法发送消息,而接收端须要实例化一个EventingBasicConsumer实例来进行消息处理。另一点须要注意的是:消息接收端和发送端的队列名称(queue)必须保持一致,这里指定的队列名称为hello。

    3.二、工做队列

    工做队列(work queues,又称Task Queues)的主要思想是:为了不当即执行一些实时性要求不高可是比较耗资源或时间的操做(如写日志),把任务看成消息发送到队列中,由一个运行在后台的工做者(worker)进程取出并处理。当有多个工做者(workers)运行时,任务会在它们之间共享。

    如今发送一些字符串来模拟耗时的任务,在字符串中加上点号(.)来表示任务的复杂程度。一个点号将会耗时1秒钟,好比"Hello World..."就会耗时3秒钟。

class Program { static void Main(string[] args) { #region Hello World
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.创建信道
                using (var channel = connection.CreateModel()) { //4.声明队列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5.构建byte消息数据包
                    string message = args.Length > 0 ? string.Join(" ", args) : "Hello World..."; var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//设置消息是否持久化 1:非持久化 2:持久化
                    var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine($"Send {message}"); Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region Hello World
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.声明队列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel); //6.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); }; //7.启动消费者
                    channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    3.3轮询分发

    使用工做队列的一个好处就是它可以并行的处理队列。若是堆积了不少任务,咱们只须要添加更多的工做者(workers)就能够了,扩展很简单。

    如今,咱们先启动两个接收端,等待接受消息,而后启动一个发送端开始发送消息(cmd->send.exe所在的目录)。

    上面发了10条信息,两个接收端各收到5条信息。

    默认状况下,RabbitMQ会将每一个消息按照顺序依次分发给下一个消费者,因此每一个消费者接收到的消息个数大体是平均的。 这种消息分发的方式称之为轮询(round-robin)。

    3.四、消息响应

    当处理一个比较耗时得任务的时候,也许想知道消费者(consumers)是否运行到一半就挂掉。在当前的代码中,当RabbitMQ将消息发送给消费者以后,立刻就会将该消息从队列中移除。此时,若是把处理这个消息的工做者(worker)停掉,正在处理的这条消息就会丢失。同时,全部发送到这个工做者的尚未处理的消息都会丢失。

    咱们不想丢失任何任务消息,若是一个工做者挂掉了,咱们但愿该消息可以从新发送给其它的工做者。

    为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)机制。消费者会经过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,而后RabbitMQ才会释放并删除这条消息。若是消费者挂掉了,没有发送响应,RabbitMQ就会认为消息没有被彻底处理,而后从新发送给其它消费者。这样,即便工做者偶尔的挂掉,也不会丢失消息。

    消息是没有超时这个概念的。当工做者与它断开连的时候,RabbitMQ会从新发送消息。这样在处理一个耗时很是长的消息任务的时候就不会出问题了。

    消息响应默认是开启的。在以前的例子中使用了no_Ack=true标识把它关闭。是时候移除这个标识了,当工做者完成了任务,就会发送一个响应。

    下面修改Receive.cs,主要改动的是:将 autoAck:true修改成autoAck:fasle,以及在消息处理完毕后手动调用BasicAck方法进行手动消息确认。

class Program { static void Main(string[] args) { #region Hello World
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.声明队列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel); //6.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //7.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //8.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    一个很常见的错误就是忘掉了BasicAck这个方法,这个错误很常见,可是后果很严重。当客户端退出时,待处理的消息就会被从新分发,可是RabitMQ会消耗愈来愈多的内存,由于这些没有被应答的消息不可以被释放。调试这种case,可使用rabbitmqctl打印messages_unacknowledged字段。

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.3\sbin>rabbitmqctl list_queues name messages_ready messages_unacknowledged Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages_ready messages_unacknowledged hello 1 0

    3.五、消息持久化

    消息确认确保了即便消费端异常,消息也不会丢失可以被从新分发处理。可是若是RabbitMQ服务端异常,消息依然会丢失。除非咱们指定durable:true,不然当RabbitMQ退出或崩溃时,消息将依然会丢失。经过指定durable:true(队列),并指定Persistent=true(消息),来告知RabbitMQ将消息持久化。一句话归纳:须要保证队列和消息都是持久化的。

class Program { static void Main(string[] args) { #region Hello World
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.创建信道
                using (var channel = connection.CreateModel()) { //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包
                    string message = args.Length > 0 ? args[0] : "Hello World"; var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定basicProperties)
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region Hello World
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel); //6.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //7.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //8.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    将消息标记为持久性不能彻底保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,可是当RabbitMQ接受消息而且尚未保存时​​,仍然有一个很短的时间窗口。RabbitMQ可能只是将消息保存到了缓存中,并无将其写入到磁盘上。持久化不是必定可以保证的,可是对于一个简单任务队列来讲已经足够。

    若是须要确保消息队列的持久化,可使用publisher confirms

    3.六、公平分发

    你可能会注意到,消息的分发可能并无如咱们想要的那样公平分配。好比,对于两个工做者。当奇数个消息的任务比较重可是偶数个消息任务比较轻时,奇数个工做者始终处于忙碌状态,而偶数个工做者始终处于空闲状态,可是RabbitMQ并不知道这些,它仍然会平均依次地分发消息。

    为了改变这一状态,咱们可使用basicQos方法,设置perfetchCount=1 。这样就告诉RabbitMQ 不要在同一时间给一个工做者发送多于1个的消息。换句话说,在一个工做者还在处理消息而且没有响应消息以前,不要给它分发新的消息,而是将这条新的消息发送给下一个不那么忙碌的工做者。

复制代码
//Receive.cs //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。) //channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare("hello", true, false, false, null); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,再也不分发消息。 //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
channel.BasicQos(0, 1, false);
复制代码

    3.7完整实例

class Program { static void Main(string[] args) { #region Hello World
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.创建信道
                using (var channel = connection.CreateModel()) { //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包
                    string message = args.Length > 0 ? args[0] : "Hello World"; var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定basicProperties)
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region Hello World
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,再也不分发消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //5.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel); //6.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //7.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //8.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    4、Exchange

    上面的示例,生产者和消费者直接是经过相同队列名称进行匹配衔接的。消费者订阅某个队列,生产者建立消息发布到队列中,队列再将消息转发到订阅的消费者。这样就会有一个局限性,即消费者一次只能发送消息到某一个队列。

    那消费者如何才能发送消息到多个消息队列呢?

    RabbitMQ提供了Exchange,它相似于路由器的功能,对消息进行路由,将消息发送到多个队列上。Exchange一方面从生产者接收消息,另外一方面将消息推送到队列。可是Exchange是如何知道将消息附加到哪一个队列或者直接忽略的呢?这些实际上是由Exchange Type来定义的。关于Exchange的图文介绍,请看上一篇《C#队列学习笔记:RabbitMQ基础知识》,此处仅提供示例代码。

    4.一、fanout

class Program { static void Main(string[] args) { #region fanout exchange type
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.创建信道
                using (var channel = connection.CreateModel()) { //4.使用fanout exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包
                    for (int i = 1; i <= 10; i++) { string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i); var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定exchange;fanout类型无需指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); } Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region fanout exchange type
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.使用fanout exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout"); //5.声明队列(随机生成队列名称)
                    var queueName = channel.QueueDeclare().QueueName; //绑定队列到指定fanout类型exchange,fanout类型无需指定routingKey。
                    channel.QueueBind(queue: queueName, exchange: "fanoutEC", routingKey: ""); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,再也不分发消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel); //7.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //8.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //9.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    4.二、direct

class Program { static void Main(string[] args) { #region direct exchange type
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.创建信道
                using (var channel = connection.CreateModel()) { //4.使用direct exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "directEC", type: "direct"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包
                    for (int i = 1; i <= 10; i++) { string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i); var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定exchange;direct类型必须指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); } Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region direct exchange type
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.使用direct exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "directEC", type: "direct"); //5.声明队列(随机生成队列名称)
                    var queueName = channel.QueueDeclare().QueueName; //绑定队列到指定direct类型exchange,direct类型必须指定routingKey。
                    channel.QueueBind(queue: queueName, exchange: "directEC", routingKey: "green"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,再也不分发消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel); //7.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //8.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //9.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    4.三、topic

class Program { static void Main(string[] args) { #region topic exchange type
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.创建信道
                using (var channel = connection.CreateModel()) { //4.使用topic exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包
                    for (int i = 1; i <= 10; i++) { string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i); var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定exchange;topic类型必须指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); } Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region topic exchange type
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.使用topic exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); //5.声明队列(随机生成队列名称)
                    var queueName = channel.QueueDeclare().QueueName; //绑定队列到指定topic类型exchange,topic类型必须指定routingKey。
                    channel.QueueBind(queue: queueName, exchange: "topicEC", routingKey: "#.*.fast"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,再也不分发消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel); //7.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //8.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //9.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    5、RPC

    RPC--Remote Procedure Call,远程过程调用。RabbitMQ是如何进行远程调用的呢?示意图以下:

    第一步:主要是进行远程调用的客户端须要指定接收远程回调的队列,并声明消费者监听此队列。

    第二步:远程调用的服务端除了要声明消费端接收远程调用请求外,还要将结果发送到客户端用来监听结果的队列中去。

class Program { static void Main(string[] args) { #region rpc
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.创建信道
                using (var channel = connection.CreateModel()) { //4.声明惟一guid用来标识这次发送的远程调用请求
                    var correlationId = Guid.NewGuid().ToString(); //5.声明须要监听的回调队列
                    var replyQueue = channel.QueueDeclare().QueueName; var properties = channel.CreateBasicProperties(); properties.Persistent = true;//将消息标记为持久性
                    properties.ReplyTo = replyQueue;//指定回调队列
                    properties.CorrelationId = correlationId;//指定消息惟一标识 //6.构建byte消息数据包
                    string number = args.Length > 0 ? args[0] : "30"; var body = Encoding.UTF8.GetBytes(number); //7.发送数据包
                    channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body); Console.WriteLine($"Request fib({number})"); //8.建立消费者用于处理消息回调(远程调用返回结果)
                    var callbackConsumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: replyQueue, noAck: false, consumer: callbackConsumer); callbackConsumer.Received += (model, ea) => { //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
                        if (ea.BasicProperties.CorrelationId == correlationId) { var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}"; Console.WriteLine($"{responseMsg}"); } }; Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region rpc
            //1.实例化链接工厂
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.创建链接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.声明队列接收远程调用请求
                    channel.QueueDeclare(queue: "rpc_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); Console.WriteLine("Waiting for message."); //5.请求处理逻辑
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int n = int.Parse(message); Console.WriteLine($"Receive request of Fib({n})"); int result = Fib(n); //6.从请求的参数中获取请求的惟一标识,在消息回传时一样绑定。
                        var properties = ea.BasicProperties; var replyProerties = channel.CreateBasicProperties(); replyProerties.CorrelationId = properties.CorrelationId; //7.将远程调用结果发送到客户端监听的队列上
                        channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo, basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString())); //8.手动发回消息确认
                        channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"Return result: Fib({n})= {result}"); }; channel.BasicConsume(queue: "rpc_queue", noAck: false, consumer: consumer); Console.Read(); } } int Fib (int n) { if (n <= 2) return 1; else
                    return Fib(n - 1) + Fib(n - 2); } #endregion } }
Receive.cs

    6、总结

    本文介绍了RabbitMQ消息代理在Windows上的安装以及在.NET中的使用。消息队列在构建分布式系统、提升系统的可扩展性及响应性方面,有着很重要的做用。

 

    参考自:

    https://www.cnblogs.com/yangecnu/p/Introduce-RabbitMQ.html#!comments

    http://www.javashuo.com/article/p-ntcqqvey-hs.html

相关文章
相关标签/搜索