.NET Core 使用RabbitMQ

RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。算法

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。centos

RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。安全

RabbitMQ安装

RabbitMQ安装,网上已经有许多教程了,这里简单介绍一下在CentOS下安装RabbitMQ。使用的版本为3.6.12最新版。服务器

1.首先安装erlang异步

rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm分布式

2.而后安装socat学习

yum install socatui

3.最后安装RabbitMQ.net

rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm设计

RabbitMQ经常使用命令

启用Web控制台

rabbitmq-plugins enable rabbitmq_management

开启服务

systemctl start rabbitmq-server.service

中止服务

systemctl stop rabbitmq-server.service

查看服务状态

systemctl status rabbitmq-server.service

查看RabbitMQ状态

rabbitmqctl status

添加用户赋予管理员权限

rabbitmqctl  add_user  username  password
rabbitmqctl  set_user_tags  username  administrator

查看用户列表

rabbitmqctl list_users

删除用户

rabbitmqctl delete_user username

修改用户密码

rabbitmqctl oldPassword Username newPassword

访问Web控制台

http://服务器ip:15672/ 注意配置防火墙,默认用户名密码都是guest,若新建用户必定要记得配置权限。

.NET Core 使用RabbitMQ

经过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/

定义生产者
//建立链接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "admin",//用户名
    Password = "admin",//密码
    HostName = "192.168.157.130"//rabbitmq ip
};

//建立链接
var connection = factory.CreateConnection();
//建立通道
var channel = connection.CreateModel();
//声明一个队列
channel.QueueDeclare("hello", false, false, false, null);

Console.WriteLine("\nRabbitMQ链接成功,请输入消息,输入exit退出!");

string input;
do
{
    input = Console.ReadLine();

    var sendBytes = Encoding.UTF8.GetBytes(input);
    //发布消息
    channel.BasicPublish("", "hello", null, sendBytes);

} while (input.Trim().ToLower()!="exit");
channel.Close();
connection.Close();
定义消费者
//建立链接工厂
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = "admin",//用户名
                Password = "admin",//密码
                HostName = "192.168.157.130"//rabbitmq ip
            };

            //建立链接
            var connection = factory.CreateConnection();
            //建立通道
            var channel = connection.CreateModel();

            //事件基本消费者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //接收到消息事件
            consumer.Received += (ch, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine($"收到消息: {message}");
                //确认该消息已被消费
                channel.BasicAck(ea.DeliveryTag, false);
            };
            //启动消费者 设置为手动应答消息
            channel.BasicConsume("hello", false, consumer);
            Console.WriteLine("消费者已启动");
            Console.ReadKey();
            channel.Dispose();
            connection.Close();
运行

启动了一个生产者,两个消费者,能够看见两个消费者都能收到消息,消息投递到哪一个消费者是由RabbitMQ决定的。

RabbitMQ消费失败的处理

RabbitMQ采用消息应答机制,即消费者收到一个消息以后,须要发送一个应答,而后RabbitMQ才会将这个消息从队列中删除,若是消费者在消费过程当中出现异常,断开链接切没有发送应答,那么RabbitMQ会将这个消息从新投递。

修改一下消费者的代码:

//接收到消息事件
consumer.Received += (ch, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);

    Console.WriteLine($"收到消息: {message}");

    Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
    Thread.Sleep(10000);
    //确认该消息已被消费
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
};

演示:

从图中能够看出,设置了消息应答延迟10s,若是在这10s中,该消费者断开了链接,那么消息会被RabbitMQ从新投递。

使用RabbitMQ的Exchange

前面咱们能够看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的状况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者一般不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,而后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就相似于一个交换机,转发各个消息分发到相应的队列中。

RabbitMQ提供了四种Exchange模式:direct,fanout,topic,header 。可是 header模式在实际使用中较少,因此这里只介绍前三种模式。

Exchange不是消费者关心的,因此消费者的代码彻底不用变,用上面的消费者就好了。
因为避免文章过长,影响阅读,因此只贴了部分代码,可是demo里面是完整可运行的,详细代码请查看demo。

Direct Exchange

全部发送到Direct Exchange的消息被转发到具备指定RouteKey的Queue。

Direct模式,可使用rabbitMQ自带的Exchange:default Exchange 。因此不须要将Exchange进行任何绑定(binding)操做 。消息传递时,RouteKey必须彻底匹配,才会被队列接收,不然该消息会被抛弃。

//建立链接
var connection = factory.CreateConnection();
//建立通道
var channel = connection.CreateModel();

//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);

//定义一个队列
channel.QueueDeclare(queueName, false, false, false, null);

//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null);

运行:

Fanout Exchange

全部发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的全部Queue上。

Fanout Exchange 不须要处理RouteKey 。只须要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的全部队列上。相似子网广播,每台子网内的主机都得到了一份复制的消息。

因此,Fanout Exchange 转发消息是最快的。

为了演示效果,定义了两个队列,分别为hello1,hello2,每一个队列都拥有一个消费者。

static void Main(string[] args)
{
    string exchangeName = "TestFanoutChange";
    string queueName1 = "hello1";
    string queueName2 = "hello2";
    string routeKey = "";

    //建立链接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "admin",//用户名
        Password = "admin",//密码
        HostName = "192.168.157.130"//rabbitmq ip
    };

    //建立链接
    var connection = factory.CreateConnection();
    //建立通道
    var channel = connection.CreateModel();

    //定义一个Direct类型交换机
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);

    //定义队列1
    channel.QueueDeclare(queueName1, false, false, false, null);
    //定义队列2
    channel.QueueDeclare(queueName2, false, false, false, null);

    //将队列绑定到交换机
    channel.QueueBind(queueName1, exchangeName, routeKey, null);
    channel.QueueBind(queueName2, exchangeName, routeKey, null);

    //生成两个队列的消费者
    ConsumerGenerator(queueName1);
    ConsumerGenerator(queueName2);


    Console.WriteLine($"\nRabbitMQ链接成功,\n\n请输入消息,输入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //发布消息
        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();
}

/// <summary>
/// 根据队列名称生成消费者
/// </summary>
/// <param name="queueName"></param>
static void ConsumerGenerator(string queueName)
{
    //建立链接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "admin",//用户名
        Password = "admin",//密码
        HostName = "192.168.157.130"//rabbitmq ip
    };

    //建立链接
    var connection = factory.CreateConnection();
    //建立通道
    var channel = connection.CreateModel();

    //事件基本消费者
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

    //接收到消息事件
    consumer.Received += (ch, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body);

        Console.WriteLine($"Queue:{queueName}收到消息: {message}");
        //确认该消息已被消费
        channel.BasicAck(ea.DeliveryTag, false);
    };
    //启动消费者 设置为手动应答消息
    channel.BasicConsume(queueName, false, consumer);
    Console.WriteLine($"Queue:{queueName},消费者已启动");
}

运行:

Topic Exchange

全部发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上,

Exchange 将路由进行模糊匹配。可使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配很少很多一个词。所以“XiaoChen.#”可以匹配到“XiaoChen.pets.cat”,可是“XiaoChen.*” 只会匹配到“XiaoChen.money”。

因此,Topic Exchange 使用很是灵活。

string exchangeName = "TestTopicChange";
string queueName = "hello";
string routeKey = "TestRouteKey.*";

//建立链接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "admin",//用户名
    Password = "admin",//密码
    HostName = "192.168.157.130"//rabbitmq ip
};

//建立链接
var connection = factory.CreateConnection();
//建立通道
var channel = connection.CreateModel();

//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);

//定义队列1
channel.QueueDeclare(queueName, false, false, false, null);

//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null);



Console.WriteLine($"\nRabbitMQ链接成功,\n\n请输入消息,输入exit退出!");

string input;
do
{
    input = Console.ReadLine();

    var sendBytes = Encoding.UTF8.GetBytes(input);
    //发布消息
    channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);

} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();

运行

Demo下载:DotNetCore.RabbitMQ

最后:欢迎加入 .net core 交流群一块儿学习,群号:4656606 加入QQ群

相关文章
相关标签/搜索