RabbitMQ实例C#

驱动组件.NET版本

官网推荐驱动:RabbitMQ.Clienthtml

https://www.rabbitmq.com/devtools.html#dotnet-devjava

Connection和Channel

Connection是一个TCP链接,通常服务器这种资源都是很宝贵的,因此提供了Channel,完成消息的发布消费。这样Connection就能够作成单例模式的。服务器

一、事件负载均衡

Connection和Channel里面包含了几个事件。分别在不一样的状况下触发异步

 

 其余时间执行发生异常,就会执行这个ide

Connection.CallbackException性能

恢复链接成功fetch

Connection.RecoverySucceededui

链接恢复异常时会触发这个事件spa

Connection.ConnectionRecoveryError

RabbitMQ出于自身保护策略,经过阻塞方式限制写入,致使了生产者应用“假死”,不对外服务。比若说CPU  IO RAM降低,队列堆积,致使堵塞。  就会触发这个事件

Connection.ConnectionBlocked

阻塞解除会触发这个事件

Connection.ConnectionUnblocked

connection断开链接时候

Connection.ConnectionShutdown

-------------------------------------------------------------------------------------------------------------------------------------------------------

.NET RabbitMQ.Client中Channel叫作Model

 

channel断开链接时候触发

Channel.ModelShutdown

其余时间执行发生异常,就会执行这个

Channel.CallbackException

broker 发现当前消息没法被路由到指定的 queues 中(若是设置了 mandatory 属性,则 broker 会先发送 basic.return)

Channel.BasicReturn

Channel.BasicRecoverOk

Signalled when a Basic.Nack command arrives from the broker.

Channel.BasicNacks

Signalled when a Basic.Ack command arrives from the broker.

Channel.BasicAcks

Channel.FlowControl

二、属性

最大channel数量

connetion.ChannelMax

服务上这个链接的对象属性

connetion.ClientProperties

服务器上这个链接的名字

connetion.ClientProvidedName

关闭缘由

connetion.CloseReason

端口

connetion.Endpoint

和客户端通讯时所容许的最大的frame size

connetion.FrameMax

链接的心跳包

connetion.Heartbeat

是否打开

connetion.IsOpen

获取vhost

connetion.KnownHosts

本地端口

connetion.LocalPort

链接串使用的协议

connetion.Protocol

远程端口,服务器

connetion.RemotePort

服务器属性

connetion.ServerProperties

关停信息

connetion.ShutdownReport

----------------------------------------------------------------------------------------------------------------------------------------

channel编号

channel.ChannelNumber

关闭缘由

channel.CloseReason

链接超时时间

channel.ContinuationTimeout

channel.IsClosed

channel.IsOpen

下一个消息编号

channel.NextPublishSeqNo

三、方法

终止链接以及他们的channel,能够指定时间长度。

connetion.Abort()

关闭链接以及他的channel

connetion.Close()

建立channel

connetion.CreateModel()

connetion.HandleConnectionBlocked()

connetion.HandleConnectionUnblocked()

 

发送消息Confirm模式

目的确认消息是否到达消息队列中 

 

一、mandatory

 broker 发现当前消息没法被路由到指定的 queues 中(若是设置了 mandatory 属性,则 broker 会先发送 basic.return)

channel.BasicReturn += Channel_BasicReturn;
channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);

 

private static void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
{
    Console.WriteLine("Channel_BasicReturn");
}

 

二、普通Confirm模式

channel.ConfirmSelect();

channel.BasicPublish("amq.direct", routingKey: "MyRoutKey1",mandatory:true, basicProperties: null, body: body);

if (channel.WaitForConfirms())
{
    Console.WriteLine("普通发送方确认模式");
}

 

三、批量Confirm模式

channel.ConfirmSelect();

channel.BasicPublish("amq.direct", routingKey: "MyRoutKey",mandatory:true, basicProperties: null, body: body);
channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);

channel.WaitForConfirmsOrDie();
Console.WriteLine("普通发送方确认模式");

 

 

四、异步Confirm模式

java版本组件有

五、事物

 

try
{
    //声明事物
    channel.TxSelect();
    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
    //提交事物
    channel.TxCommit();
}
catch (Exception)
{
    //回滚
    channel.TxRollback();

}

上面说的是生产者发布消息确认,那么消费者消费如何确认呢,你们都知道消费者有ack机制,可是用到事物的时候,是怎样的呢

1.autoAck=false手动应对的时候是支持事务的,也就是说即便你已经手动确认了消息已经收到了,但在确认消息会等事务的返回解决以后,在作决定是确认消息仍是从新放回队列,若是你手动确认如今以后,又回滚了事务,那么已事务回滚为主,此条消息会从新放回队列;
2.autoAck=true若是自定确认为true的状况是不支持事务的,也就是说你即便在收到消息以后在回滚事务也是于事无补的,队列已经把消息移除了;

 

事物比较耗性能

 

简单消息发送
static void Main(string[] args)
        {


            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.140.161";
            factory.Port = 5672;
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "TestVHost";
            //建立connetion
            using (var connetion = factory.CreateConnection())
            {
                connetion.CallbackException += Connetion_CallbackException;
                connetion.RecoverySucceeded += Connetion_RecoverySucceeded;
                connetion.ConnectionRecoveryError += Connetion_ConnectionRecoveryError;
                connetion.ConnectionBlocked += Connetion_ConnectionBlocked;
                connetion.ConnectionUnblocked += Connetion_ConnectionUnblocked;
                //链接关闭的时候
                connetion.ConnectionShutdown += Connetion_ConnectionShutdown;



                //建立channel
                using (var channel = connetion.CreateModel())
                {

                    //消息会在什么时候被 confirm?
                    //The broker will confirm messages once:
                    //broker 将在下面的状况中对消息进行 confirm :
                    //it decides a message will not be routed to queues
                    //(if the mandatory flag is set then the basic.return is sent first) or
                    //broker 发现当前消息没法被路由到指定的 queues 中(若是设置了 mandatory 属性,则 broker 会先发送 basic.return)
                    //a transient message has reached all its queues(and mirrors) or
                    //非持久属性的消息到达了其所应该到达的全部 queue 中(和镜像 queue 中)
                    //a persistent message has reached all its queues(and mirrors) and been persisted to disk(and fsynced) or
                    //持久消息到达了其所应该到达的全部 queue 中(和镜像 queue 中),并被持久化到了磁盘(被 fsync)
                    //a persistent message has been consumed(and if necessary acknowledged) from all its queues
                    //持久消息从其所在的全部 queue 中被 consume 了(若是必要则会被 acknowledge)


                    //broker 发现当前消息没法被路由到指定的 queues 中(若是设置了 mandatory 属性,则 broker 会先发送 basic.return)
                    channel.BasicReturn += Channel_BasicReturn;

                    //(能够不声明)若是不声明交换机 ,那么就使用默认的交换机  (每个vhost都会有一个默认交换机)
                    //channel.ExchangeDeclare("amq.direct", ExchangeType.Direct,true);

                    //建立一个队列  bool durable(持久化), bool exclusive(专有的), bool autoDelete(自动删除)
                    //channel.QueueDeclare("TestQueue", true, false, false, null);
                    //不作绑定的话,使用默认的交换机。
                    //channel.QueueBind("TestQueue", "amq.direct", "MyRoutKey", null);

                    //发布消息
                    var body = Encoding.UTF8.GetBytes("西伯利亚的狼");


                    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
                }

                Console.WriteLine("Hello World!");
                Console.ReadKey();
            }
        }

private static void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
        {
            Console.WriteLine("Channel_BasicReturn");
        }

private static void Connetion_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionShutdown");
        }

private static void Connetion_ConnectionUnblocked(object sender, EventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionUnblocked");
        }

private static void Connetion_ConnectionBlocked(object sender, RabbitMQ.Client.Events.ConnectionBlockedEventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionBlocked");
        }

private static void Connetion_ConnectionRecoveryError(object sender, RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionRecoveryError");
        }

private static void Connetion_RecoverySucceeded(object sender, EventArgs e)
{
    Console.WriteLine("Connetion_RecoverySucceeded");
}

private static void Connetion_CallbackException(object sender, RabbitMQ.Client.Events.CallbackExceptionEventArgs e)
{
    Console.WriteLine("Connetion_CallbackException");
}

 

 

场景分析

 

 

消息持久化

Broker持久化、交换机持久化、队列持久化  。目的是维持重启后  这些东西的存在。

消息持久化,才是把消息持久化到硬盘中,由于消息在队列中,因此须要队列持久化。

设置消息持久化,须要设置basicProperties的DeliveryMode=2 (Non-persistent (1) or persistent (2)).  默认的就是持久化。

//发布消息
var body = Encoding.UTF8.GetBytes("西伯利亚的狼");
BasicProperties pro = new BasicProperties();
pro.DeliveryMode = 2;

channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: pro, body: body);

 

消费者消费消息

为了确保一个消息永远不会丢失,RabbitMQ支持消息确认(message acknowledgments)。当消费端接收消息而且处理完成后,会发送一个ack(消息确认)信号到RabbitMQ,RabbitMQ接收到这个信号后,就能够删除掉这条已经处理的消息任务。但若是消费端挂掉了(好比,通道关闭、链接丢失等)没有发送ack信号。RabbitMQ就会明白某个消息没有正常处理,RabbitMQ将会从新将消息入队,若是有另一个消费端在线,就会快速的从新发送到另一个消费端。

RabbitMQ中没有消息超时的概念,只有当消费端关闭或奔溃时,RabbitMQ才会从新分发消息。

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "192.168.140.161";
factory.Port = 5672;
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "TestVHost";
//建立connetion
using (var connetion = factory.CreateConnection())
{

    using (var channel = connetion.CreateModel())
    {
        //构造消费者实例
        var consumer = new EventingBasicConsumer(channel);
        //绑定消息接收后的事件委托
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine(" [x] Received {0}", message);
            Thread.Sleep(6000);//模拟耗时
            Console.WriteLine(" [x] Done");
            // 主要改动的是将 autoAck:true修改成autoAck:fasle,以及在消息处理完毕后手动调用BasicAck方法进行手动消息确认。
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        //启动消费者
        channel.BasicConsume(queue: "TestQueue", autoAck: false, consumer: consumer);
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();

    }
}

using (var connetion = factory.CreateConnection())
{

    using (var channel = connetion.CreateModel())
    {
        //构造消费者实例
        var consumer = new EventingBasicConsumer(channel);
        //绑定消息接收后的事件委托
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine(" [x] Received {0}", message);
            Thread.Sleep(6000);//模拟耗时
            Console.WriteLine(" [x] Done");
        };
        //启动消费者
        channel.BasicConsume(queue: "TestQueue", autoAck: true, consumer: consumer);
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();

    }
}

 

 

消费负载均衡

一、当一个队列有多个消费者时,队列会以循环(round-robin)的方式发送给消费者。每条消息只会给一个订阅的消费者。

二、默认状况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均每一个消费者将得到相同数量的消息。这种分发消息的方式叫作循环(round-robin)。

三、RabbitMQ的消息分发默认按照消费端的数量,按顺序循环分发。这样仅是确保了消费端被平均分发消息的数量,但却忽略了消费端的闲忙状况。这就可能出现某个消费端一直处理耗时任务处于阻塞状态,某个消费端一直处理通常任务处于空置状态,而只是它们分配的任务数量同样。

但咱们能够经过channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,再也不分发消息,也就确保了当消费端处于忙碌状态时,再也不分配任务。

分库分表模式

好比说客户积分同步。

通常电商中这种数据量比较大,及时性比较高。

ID 编号 1-10000的用户积分表更放在队列1,10001-20000放在队列2,不一样的消费者消费不一样队列。以此类推...

RPC

第一步,主要是进行远程调用的客户端须要指定接收远程回调的队列,并申明消费者监听此队列。
第二步,远程调用的服务端除了要申明消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的队列中去。

远程调用客户端:

//申明惟一guid用来标识这次发送的远程调用请求
 var correlationId = Guid.NewGuid().ToString();
 //申明须要监听的回调队列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回调队列
 properties.CorrelationId = correlationId;//指定消息惟一标识
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //发布消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //建立消费者用于处理消息回调(远程调用返回结果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };

远程调用服务端:

//申明队列接收远程调用请求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//请求处理逻辑
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //从请求的参数中获取请求的惟一标识,在消息回传时一样绑定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //将远程调用结果发送到客户端监听的队列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手动发回消息确认
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
相关文章
相关标签/搜索