(1)RabbitMQ简介与安装

1.RabbitMQ简介

由于RabbitMQ是基于开源的AMQP协议来实现的,因此在了解MQ时候,首先咱们来了解下AMQP协议。AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端或者中间件不一样产品、不一样的开发语言等条件的限制,也就是说消息生产者无需知道消费者如何处理消息结果,反之亦然,解耦了组件跟组件依赖。RabbitMQ服务器端用Erlang语言编写,同时也支持多种客户端来开发跨语言消息传递,例如:Java,.NET,PHP,Python,JavaScript,Ruby,Go等。RabbitMQ还支持多种消息传递协议、消息排队、传递确认、队列的灵活路由、多种交换类型。还支持分布式集群以实现高可用性和吞吐量。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。还能够经过HTTP-API命令行工具和用于管理和监视RabbitMQ的UI。html

2.RabbitMQ在CentOS 7安装

由于我对Linux运维知识面比较薄弱,因此在Linux上部署RabbitMQ这块暂时不想耗太多时间在这上面去(后续有时间再深刻了解),这里我彻底是跟着园区Net大神晓晨大佬这篇文章(https://www.cnblogs.com/stulzq/p/7551819.html)去部署的。网上也有不少RabbitMQ在Linux部署文章参考,你们也能够自行度娘。算法

//安装服务端erlang语言
rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
//安装socat
yum install socat
//安装服务端RabbitMQ
rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

3.RabbitMQ服务端经常使用命令

//启用Web管理平台
rabbitmq-plugins enable rabbitmq_management
//开启服务
systemctl start rabbitmq-server.service
//中止服务
systemctl stop rabbitmq-server.service
//查看服务状态
systemctl status rabbitmq-server.service
//查看RabbitMQ状态
rabbitmqctl status
//添加用户赋予管理员权限
rabbitmqctl  add_user  tom  12345
rabbitmqctl  set_user_tags  tom  administrator
//查看用户列表
rabbitmqctl list_users
//删除用户
rabbitmqctl delete_user username
//修改用户密码
rabbitmqctl oldPassword Username newPassword

4.访问RabbitMQ Web管理平台

当启用RabbitMQ Web管理平台,咱们根据部署CentOS 7系统的IP在浏览器上打开http://IP:15672,若是新增了用户,必定要设置新增用户的VirtualHost的权限,否则客户端调用RabbitMQ时候会报错!具体处理方法以下截图:
//未设置权限时


点解设置权限便可。
若是访问显示404,则是防火墙把通信给过滤掉了,请执行命令把防火墙关闭掉再打开,如下我列出全部CentOS 7关于防火墙命令:centos

//查看防火状态
systemctl status firewalld
//暂时关闭防火墙
systemctl stop firewalld
//永久关闭防火墙
systemctl disable firewalld
//重启防火墙
systemctl enable firewalld
//永久关闭后重启
chkconfig iptables on 

关闭防火墙以后,在浏览器上就会看到下面管理平台界面:
浏览器

5.NET Core使用RabbitMQ

经过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/服务器

5.1定义生产者

class Program
{
    static void Main(string[] args)
    {
        string queueName = "DirectExchangeQueueName";
        string routeKey = "DirectExchangeQueueName";
        //建立链接工厂
        var factory = new ConnectionFactory
        {
            UserName = "dengwu",//用户名
            Password = "123456",//密码
            HostName = "192.168.112.133",//rabbitmq ip
        };

        //建立链接
        var connection = factory.CreateConnection();
        //建立通道
        var channel = connection.CreateModel();
        //声明一个队列
        channel.QueueDeclare(queueName, false, false, false, null);

        Console.WriteLine("\nRabbitMQ链接成功,请输入消息,输入exit退出!");

        string input;
        do
        {
            input = Console.ReadLine();

            var sendBytes = Encoding.UTF8.GetBytes(input);
            //发布消息
            channel.BasicPublish("", routeKey, null, sendBytes);

        } while (input.Trim().ToLower() != "exit");
        channel.Close();
        connection.Close();
    }
}

5.2定义消费者

class Program
{
    static void Main(string[] args)
    {
        string queueName = "DirectExchangeQueueName";
        //建立链接工厂
        var factory = new ConnectionFactory
        {
            UserName = "dengwu",//用户名
            Password = "123456",//密码
            HostName = "192.168.112.133",//rabbitmq ip
        };

        //建立链接
        var connection = factory.CreateConnection();
        //建立通道
        var channel = connection.CreateModel();
        //事件基本消费者
        var consumer = new EventingBasicConsumer(channel);

        //接收到消息事件
        consumer.Received += (ch, ea) =>
        {
            var boby = ea.Body;
            var message = Encoding.UTF8.GetString(boby.ToArray());

            Console.WriteLine($"收到消息: {message}");
            //确认该消息已被消费
            channel.BasicAck(ea.DeliveryTag, false);
            //Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
            //Thread.Sleep(10000);
            //Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
        };
        //启动消费者 设置为手动应答消息
        channel.BasicConsume(queueName, false, consumer);
        Console.WriteLine("消费者已启动");
        Console.ReadKey();
        channel.Dispose();
        connection.Close();
    }
}

运行:

经过启动一个生产者,一个消费者,咱们能够看到,生产者经过RabbitMQ决定投递消息给对应消费者。运维

5.3RabbitMQ消费失败的处理

RabbitMQ采用消息应答机制,即消费者收到一个消息以后,须要发送一个应答,而后RabbitMQ才会将这个消息从队列中删除,若是消费者在消费过程当中出现异常,断开链接没有发送应答,那么RabbitMQ会将这个消息从新投递,下面咱们将消费者接收到消息事件代码修改以下:异步

//接收到消息事件
consumer.Received += (ch, ea) =>
{
    var boby = ea.Body;
    var message = Encoding.UTF8.GetString(boby.ToArray());

    Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
    Thread.Sleep(10000);
    Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
};

先在生产者里面预先传递三个消息:


若是咱们设置了消息应答延迟10s,若是在这10s中,该消费者断开了链接,那么消息会被RabbitMQ从新投递的。具体你们能够自行测试。分布式

6.总结

该章节主要简单介绍RabbitMQ概念在Linux上简单部署,接下来章节,我会陆续介绍AMQP Messaging中的基本概念跟Exchange(交换机)。

参考文献:
RabbitMQ官网
.NET Core 使用RabbitMQ工具

相关文章
相关标签/搜索