RabbitMQ实战经验分享

前言

最近在忙一个高考项目,看着系统顺利完成了此次高考,终于能够松口气了。看到那些即将参加高考的学生,也想起当年高三的本身。html

下面分享下RabbitMQ实战经验,但愿对你们有所帮助:git


 

 

1、生产消息

关于RabbitMQ的基础使用,这里再也不介绍了,项目中使用的是Exchange中的topic模式。github

先上发消息的代码性能

private bool MarkErrorSend(string[] lstMsg)
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    UserName = "guest",//用户名
                    Password = "guest",//密码
                    HostName = "localhost",//ConfigurationManager.AppSettings["sHostName"],
                };
                //建立链接
                var connection = factory.CreateConnection();
                //建立通道
                var channel = connection.CreateModel();
                try
                {
                    //定义一个Direct类型交换机
                    channel.ExchangeDeclare(
                        exchange: "TestTopicChange", //exchange名称
                        type: ExchangeType.Topic, //Topic模式,采用路由匹配
                        durable: true,//exchange持久化
                        autoDelete: false,//是否自动删除,通常设成false
                        arguments: null//一些结构化参数,好比:alternate-exchange
                        );

                    //定义测试队列
                    channel.QueueDeclare(
                        queue: "Test_Queue", //队列名称
                        durable: true, //队列磁盘持久化(要和消息持久化一块儿使用才有效)
                        exclusive: false,//是否排他的,false。若是一个队列声明为排他队列,该队列首次声明它的链接可见,并在链接断开时自动删除
                        autoDelete: false,//是否自动删除,通常设成false
                        arguments: null
                        );

                    //将队列绑定到交换机
                    string routeKey = "TestRouteKey.*";//*匹配一个单词
                    channel.QueueBind(
                        queue: "Test_Queue",
                        exchange: "TestTopicChange",
                        routingKey: routeKey,
                        arguments: null
                        );

                    //消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一块儿使用才有效)
                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;

                    channel.ConfirmSelect();//发送确认机制
                    foreach (var itemMsg in lstMsg)
                    {
                        byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg);
                        //发布消息
                        channel.BasicPublish(
                            exchange: "TestTopicChange",
                            routingKey: "TestRouteKey.one",
                            basicProperties: properties,
                            body: sendBytes
                            );
                    }
                    bool isAllPublished = channel.WaitForConfirms();//通道(channel)里全部消息均发送才返回true
                    return isAllPublished;
                }
                catch (Exception ex)
                {
                    //写错误日志
                    return false;
                }
                finally
                {
                    channel.Close();
                    connection.Close();
                }
            }
            catch
            {
                //RabbitMQ.Client.Exceptions.BrokerUnreachableException:
                //When the configured hostname was not reachable.
                return false;
            }
        }

        发消息没啥特别的。关于消息持久化的介绍这里也再也不介绍,不懂的能够看上篇文章。发消息须要注意的地方是,能够选择多条消息一块儿发送,最后才肯定消息发送成功,这样效率比较高;此外,须要尽可能精简每条消息的长度(楼主在这里吃过亏),否则会因消息过长从而增长发送时间。在实际项目中一次发了4万多条数据没有出现问题。测试

 


 

2、接收消息

       接下来讲下消费消息的过程,我使用的是单个链接多个channel,每一个channel每次只取一条消息方法。有人会问单个TCP链接,多个channel会不会影响通讯效率。这个理论上确定会有影响的,看影响大不大而已。我开的channel数通常去到30左右,并无以为影响效率,有多是由于我每一个channel是拿一条消息的缘由。经过单个链接多个channel的方法,能够少开了不少链接。至于我为何选每一个channel每次只取一条消息,这是外界因素限制了,具体看本身需求。spa

       接下接收消息的过程,首先定义一个RabbitMQHelper类,里面有个全局的conn链接变量,此外还有建立链接、关闭链接和验证链接是否打开等方法。程序运行一个定时器,当线程

检测到链接未打开的状况下,主动建立链接处理消息。日志

 public class RabbitMQHelper
    {
        public IConnection conn = null;

        /// <summary>
        /// 建立RabbitMQ消息中间件链接
        /// </summary>
        /// <returns>返回链接对象</returns>
        public IConnection RabbitConnection(string sHostName, ushort nChannelMax)
        {
            try
            {
                if (conn == null)
                {
                    var factory = new ConnectionFactory()
                    {
                        UserName = "guest",//用户名
                        Password = "guest",//密码
                        HostName = sHostName,//ConfigurationManager.AppSettings["MQIP"],
                        AutomaticRecoveryEnabled = false,//取消自动重连,改用定时器定时检测链接是否存在
                        RequestedConnectionTimeout = 10000,//请求超时时间设成10秒,默认的为30秒
                        RequestedChannelMax = nChannelMax//与开的线程数保持一致
                    };
                    //建立链接
                    conn = factory.CreateConnection();
                    Console.WriteLine("RabbitMQ链接已建立!");
                }

                return conn;
            }
            catch
            {
                Console.WriteLine("建立链接失败,请检查RabbitMQ是否正常运行!");
                return null;
            }
        }

        /// <summary>
        /// 关闭RabbitMQ链接
        /// </summary>
        public void Close()
        {
            try
            {
                if (conn != null)
                {
                    if (conn.IsOpen)
                        conn.Close();
                    conn = null;
                    Console.WriteLine("RabbitMQ链接已关闭!");
                }
            }
            catch { }
        }

        /// <summary>
        /// 判断RabbitMQ链接是否打开
        /// </summary>
        /// <returns></returns>
        public bool IsOpen()
        {
            try
            {
                if (conn != null)
                {
                    if (conn.IsOpen)
                        return true;
                }
                return false;
            }
            catch
            {
                return false;
            }
        }
    }

 

       接下来咱们看具体如何接收消息。code

private static AutoResetEvent myEvent = new AutoResetEvent(false);
private RabbitMQHelper rabbit = new RabbitMQHelper();
private ushort nChannel = 10;//一个链接的最大通道数和所开的线程数一致

       首先初始化一个rabbit实例,而后经过RabbitConnection方法建立RabbitMQ链接。htm

       当链接打开时候,用线程池运行接收消息的方法。注意了,这里开的线程必须和开的channel数量一致,否则会有问题(具体问题是,设了RabbitMQ链接超时时间为10秒,有时候无论用,缘由未查明。RabbitMQ建立链接默认超时时间为30秒,假如在这个时间内再去调用建立的话,就有可能获得两倍的channel;)

/// <summary>
        /// 单个RabbitMQ链接开多个线程,每一个线程开一个channel接受消息
        /// </summary>
        private void CreateConnecttion()
        {
            try
            {
                rabbit.RabbitConnection("localhost", nChannel);
                if (rabbit.conn != null)
                {
                    ThreadPool.SetMinThreads(1, 1);
                    ThreadPool.SetMaxThreads(100, 100);
                    for (int i = 1; i <= nChannel; i++)
                    {
                        ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveMsg), "");
                    }
                    myEvent.WaitOne();//等待全部线程工做完成后,才能关闭链接
                    rabbit.Close();
                }
            }
            catch (Exception ex)
            {
                rabbit.Close();
                Console.WriteLine(ex.Message);
            }
        }

 

       接着就是接收消息的方法,处理消息的过程省略了。

  /// <summary>
        /// 接收并处理消息,在一个链接中建立多个通道(channel),避免建立多个链接
        /// </summary>
        /// <param name="con">RabbitMQ链接</param>
        private void ReceiveMsg(object obj)
        {
            IModel channel = null;
            try
            {
                #region 建立通道,定义中转站和队列
                channel = rabbit.conn.CreateModel();
                channel.ExchangeDeclare(
                    exchange: "TestTopicChange", //exchange名称
                    type: ExchangeType.Topic, //Topic模式,采用路由匹配
                    durable: true,//exchange持久化
                    autoDelete: false,//是否自动删除,通常设成false
                    arguments: null//一些结构化参数,好比:alternate-exchange
                    );

                //定义阅卷队列
                channel.QueueDeclare(
                    queue: "Test_Queue", //队列名称
                    durable: true, //队列磁盘持久化(要和消息持久化一块儿使用才有效)
                    exclusive: false,//是否排他的,false。若是一个队列声明为排他队列,该队列首次声明它的链接可见,并在链接断开时自动删除
                    autoDelete: false,
                    arguments: null
                    );
                #endregion
                channel.BasicQos(0, 1, false);//每次只接收一条消息

                channel.QueueBind(queue: "Test_Queue",
                                      exchange: "TestTopicChange",
                                      routingKey: "TestRouteKey.*");
                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    //处理消息方法
                    try
                    {
                        bool isMark = AutoMark(message);
                        if (isMark)
                        {
                            //Function.writeMarkLog(message);
                            //确认该消息已被消费,发消息给RabbitMQ队列
                            channel.BasicAck(ea.DeliveryTag, false);
                        }
                        else
                        {
                            if (MarkErrorSend(message))//把错误消息推到错误消息队列
                                channel.BasicReject(ea.DeliveryTag, false);
                            else
                                //消费消息失败,拒绝此消息,重回队列,让它能够继续发送到其余消费者 
                                channel.BasicReject(ea.DeliveryTag, true);
                        }
                    }
                    catch (Exception ex)
                    {
                        try
                        {
                            Console.WriteLine(ex.Message);
                            if (channel != null && channel.IsOpen)//处理RabbitMQ中止重启而自动评阅崩溃的问题
                            {
                                //消费消息失败,拒绝此消息,重回队列,让它能够继续发送到其余消费者 
                                channel.BasicReject(ea.DeliveryTag, true);
                            }
                        }
                        catch { }
                    }
                };
                //手动确认消息
                channel.BasicConsume(queue: "Test_Queue",
                                     autoAck: false,
                                     consumer: consumer);
            }
            catch (Exception ex)
            {
                try
                {
                    Console.WriteLine("接收消息方法出错:" + ex.Message);
                    if (channel != null && channel.IsOpen)//关闭通道
                        channel.Close();
                    if (rabbit.conn != null)//处理RabbitMQ忽然中止的问题
                        rabbit.Close();
                }
                catch { }
            }
        }

 

 

3、处理错误消息

       把处理失败的消息放到“错误队列”,而后把原队列的消息删除(这里主要解决问题是,存在多个处理失败或处理不了的消息时,若是把这些消息都放回原队列,它们会继续分发到其余线程的channel,但结果仍是处理不了,就会形成一个死循环,致使后面的消息没法处理)。把第一次处理不了的消息放到“错误队列”后,从新再开一个新的链接去处理“错误队列”的消息。

/// <summary>
        /// 把处理错误的消息发送到“错误消息队列”
        /// </summary>
        /// <param name="msg"></param>
        /// <returns></returns>
        private bool MarkErrorSend(string msg)
        {
            RabbitMQHelper MQ = new RabbitMQHelper();
            MQ.RabbitConnection("localhost",1);
            //建立通道
            var channel = MQ.conn.CreateModel();
            try
            {
                //定义一个Direct类型交换机
                channel.ExchangeDeclare(
                    exchange: "ErrorTopicChange", //exchange名称
                    type: ExchangeType.Topic, //Topic模式,采用路由匹配
                    durable: true,//exchange持久化
                    autoDelete: false,//是否自动删除,通常设成false
                    arguments: null//一些结构化参数,好比:alternate-exchange
                    );

                //定义阅卷队列
                channel.QueueDeclare(
                    queue: "Error_Queue", //队列名称
                    durable: true, //队列磁盘持久化(要和消息持久化一块儿使用才有效)
                    exclusive: false,//是否排他的,false。若是一个队列声明为排他队列,该队列首次声明它的链接可见,并在链接断开时自动删除
                    autoDelete: false,//是否自动删除,通常设成false
                    arguments: null
                    );

                //将队列绑定到交换机
                string routeKey = "ErrorRouteKey.*";//*匹配一个单词
                channel.QueueBind(
                    queue: "Error_Queue",
                    exchange: "ErrorTopicChange",
                    routingKey: routeKey,
                    arguments: null
                    );

                //消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一块儿使用才有效)
                IBasicProperties properties = channel.CreateBasicProperties();
                properties.DeliveryMode = 2;

                channel.ConfirmSelect();//发送确认机制
                byte[] sendBytes = Encoding.UTF8.GetBytes(msg);
                //发布消息
                channel.BasicPublish(
                    exchange: "ErrorTopicChange",
                    routingKey: "ErrorRouteKey.one",
                    basicProperties: properties,
                    body: sendBytes
                    );

                bool isAllPublished = channel.WaitForConfirms();//通道(channel)里全部消息均发送才返回true
                return isAllPublished;
            }
            catch (Exception ex)
            {
                //写错误日志
                return false;
            }
            finally
            {
                channel.Close();
                MQ.conn.Close();
            }
        }

 

总结:RabbitMQ自己已经很稳定了,并且性能也很好,全部不稳定的因素都在咱们处理消息的过程,因此能够放心使用。

Demo源码地址:https://github.com/Bingjian-Zhu/RabbitMQHelper

相关文章
相关标签/搜索