最近在忙一个高考项目,看着系统顺利完成了此次高考,终于能够松口气了。看到那些即将参加高考的学生,也想起当年高三的本身。html
下面分享下RabbitMQ实战经验,但愿对你们有所帮助:git
关于RabbitMQ的基础使用,这里再也不介绍了,项目中使用的是Exchange中的topic模式。github
先上发消息的代码性能
private bool MarkErrorSend(string[] lstMsg) { try { var factory = new ConnectionFactory() { UserName = "guest",//用户名 Password = "guest",//密码 HostName = "localhost",//ConfigurationManager.AppSettings["sHostName"], }; //建立链接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); try { //定义一个Direct类型交换机 channel.ExchangeDeclare( exchange: "TestTopicChange", //exchange名称 type: ExchangeType.Topic, //Topic模式,采用路由匹配 durable: true,//exchange持久化 autoDelete: false,//是否自动删除,通常设成false arguments: null//一些结构化参数,好比:alternate-exchange ); //定义测试队列 channel.QueueDeclare( queue: "Test_Queue", //队列名称 durable: true, //队列磁盘持久化(要和消息持久化一块儿使用才有效) exclusive: false,//是否排他的,false。若是一个队列声明为排他队列,该队列首次声明它的链接可见,并在链接断开时自动删除 autoDelete: false,//是否自动删除,通常设成false arguments: null ); //将队列绑定到交换机 string routeKey = "TestRouteKey.*";//*匹配一个单词 channel.QueueBind( queue: "Test_Queue", exchange: "TestTopicChange", routingKey: routeKey, arguments: null ); //消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一块儿使用才有效) IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.ConfirmSelect();//发送确认机制 foreach (var itemMsg in lstMsg) { byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg); //发布消息 channel.BasicPublish( exchange: "TestTopicChange", routingKey: "TestRouteKey.one", basicProperties: properties, body: sendBytes ); } bool isAllPublished = channel.WaitForConfirms();//通道(channel)里全部消息均发送才返回true return isAllPublished; } catch (Exception ex) { //写错误日志 return false; } finally { channel.Close(); connection.Close(); } } catch { //RabbitMQ.Client.Exceptions.BrokerUnreachableException: //When the configured hostname was not reachable. return false; } }
发消息没啥特别的。关于消息持久化的介绍这里也再也不介绍,不懂的能够看上篇文章。发消息须要注意的地方是,能够选择多条消息一块儿发送,最后才肯定消息发送成功,这样效率比较高;此外,须要尽可能精简每条消息的长度(楼主在这里吃过亏),否则会因消息过长从而增长发送时间。在实际项目中一次发了4万多条数据没有出现问题。测试
接下来讲下消费消息的过程,我使用的是单个链接多个channel,每一个channel每次只取一条消息方法。有人会问单个TCP链接,多个channel会不会影响通讯效率。这个理论上确定会有影响的,看影响大不大而已。我开的channel数通常去到30左右,并无以为影响效率,有多是由于我每一个channel是拿一条消息的缘由。经过单个链接多个channel的方法,能够少开了不少链接。至于我为何选每一个channel每次只取一条消息,这是外界因素限制了,具体看本身需求。spa
接下接收消息的过程,首先定义一个RabbitMQHelper类,里面有个全局的conn链接变量,此外还有建立链接、关闭链接和验证链接是否打开等方法。程序运行一个定时器,当线程
检测到链接未打开的状况下,主动建立链接处理消息。日志
public class RabbitMQHelper { public IConnection conn = null; /// <summary> /// 建立RabbitMQ消息中间件链接 /// </summary> /// <returns>返回链接对象</returns> public IConnection RabbitConnection(string sHostName, ushort nChannelMax) { try { if (conn == null) { var factory = new ConnectionFactory() { UserName = "guest",//用户名 Password = "guest",//密码 HostName = sHostName,//ConfigurationManager.AppSettings["MQIP"], AutomaticRecoveryEnabled = false,//取消自动重连,改用定时器定时检测链接是否存在 RequestedConnectionTimeout = 10000,//请求超时时间设成10秒,默认的为30秒 RequestedChannelMax = nChannelMax//与开的线程数保持一致 }; //建立链接 conn = factory.CreateConnection(); Console.WriteLine("RabbitMQ链接已建立!"); } return conn; } catch { Console.WriteLine("建立链接失败,请检查RabbitMQ是否正常运行!"); return null; } } /// <summary> /// 关闭RabbitMQ链接 /// </summary> public void Close() { try { if (conn != null) { if (conn.IsOpen) conn.Close(); conn = null; Console.WriteLine("RabbitMQ链接已关闭!"); } } catch { } } /// <summary> /// 判断RabbitMQ链接是否打开 /// </summary> /// <returns></returns> public bool IsOpen() { try { if (conn != null) { if (conn.IsOpen) return true; } return false; } catch { return false; } } }
接下来咱们看具体如何接收消息。code
private static AutoResetEvent myEvent = new AutoResetEvent(false); private RabbitMQHelper rabbit = new RabbitMQHelper(); private ushort nChannel = 10;//一个链接的最大通道数和所开的线程数一致
首先初始化一个rabbit实例,而后经过RabbitConnection方法建立RabbitMQ链接。htm
当链接打开时候,用线程池运行接收消息的方法。注意了,这里开的线程必须和开的channel数量一致,否则会有问题(具体问题是,设了RabbitMQ链接超时时间为10秒,有时候无论用,缘由未查明。RabbitMQ建立链接默认超时时间为30秒,假如在这个时间内再去调用建立的话,就有可能获得两倍的channel;)
/// <summary> /// 单个RabbitMQ链接开多个线程,每一个线程开一个channel接受消息 /// </summary> private void CreateConnecttion() { try { rabbit.RabbitConnection("localhost", nChannel); if (rabbit.conn != null) { ThreadPool.SetMinThreads(1, 1); ThreadPool.SetMaxThreads(100, 100); for (int i = 1; i <= nChannel; i++) { ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveMsg), ""); } myEvent.WaitOne();//等待全部线程工做完成后,才能关闭链接 rabbit.Close(); } } catch (Exception ex) { rabbit.Close(); Console.WriteLine(ex.Message); } }
接着就是接收消息的方法,处理消息的过程省略了。
/// <summary> /// 接收并处理消息,在一个链接中建立多个通道(channel),避免建立多个链接 /// </summary> /// <param name="con">RabbitMQ链接</param> private void ReceiveMsg(object obj) { IModel channel = null; try { #region 建立通道,定义中转站和队列 channel = rabbit.conn.CreateModel(); channel.ExchangeDeclare( exchange: "TestTopicChange", //exchange名称 type: ExchangeType.Topic, //Topic模式,采用路由匹配 durable: true,//exchange持久化 autoDelete: false,//是否自动删除,通常设成false arguments: null//一些结构化参数,好比:alternate-exchange ); //定义阅卷队列 channel.QueueDeclare( queue: "Test_Queue", //队列名称 durable: true, //队列磁盘持久化(要和消息持久化一块儿使用才有效) exclusive: false,//是否排他的,false。若是一个队列声明为排他队列,该队列首次声明它的链接可见,并在链接断开时自动删除 autoDelete: false, arguments: null ); #endregion channel.BasicQos(0, 1, false);//每次只接收一条消息 channel.QueueBind(queue: "Test_Queue", exchange: "TestTopicChange", routingKey: "TestRouteKey.*"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; //处理消息方法 try { bool isMark = AutoMark(message); if (isMark) { //Function.writeMarkLog(message); //确认该消息已被消费,发消息给RabbitMQ队列 channel.BasicAck(ea.DeliveryTag, false); } else { if (MarkErrorSend(message))//把错误消息推到错误消息队列 channel.BasicReject(ea.DeliveryTag, false); else //消费消息失败,拒绝此消息,重回队列,让它能够继续发送到其余消费者 channel.BasicReject(ea.DeliveryTag, true); } } catch (Exception ex) { try { Console.WriteLine(ex.Message); if (channel != null && channel.IsOpen)//处理RabbitMQ中止重启而自动评阅崩溃的问题 { //消费消息失败,拒绝此消息,重回队列,让它能够继续发送到其余消费者 channel.BasicReject(ea.DeliveryTag, true); } } catch { } } }; //手动确认消息 channel.BasicConsume(queue: "Test_Queue", autoAck: false, consumer: consumer); } catch (Exception ex) { try { Console.WriteLine("接收消息方法出错:" + ex.Message); if (channel != null && channel.IsOpen)//关闭通道 channel.Close(); if (rabbit.conn != null)//处理RabbitMQ忽然中止的问题 rabbit.Close(); } catch { } } }
把处理失败的消息放到“错误队列”,而后把原队列的消息删除(这里主要解决问题是,存在多个处理失败或处理不了的消息时,若是把这些消息都放回原队列,它们会继续分发到其余线程的channel,但结果仍是处理不了,就会形成一个死循环,致使后面的消息没法处理)。把第一次处理不了的消息放到“错误队列”后,从新再开一个新的链接去处理“错误队列”的消息。
/// <summary> /// 把处理错误的消息发送到“错误消息队列” /// </summary> /// <param name="msg"></param> /// <returns></returns> private bool MarkErrorSend(string msg) { RabbitMQHelper MQ = new RabbitMQHelper(); MQ.RabbitConnection("localhost",1); //建立通道 var channel = MQ.conn.CreateModel(); try { //定义一个Direct类型交换机 channel.ExchangeDeclare( exchange: "ErrorTopicChange", //exchange名称 type: ExchangeType.Topic, //Topic模式,采用路由匹配 durable: true,//exchange持久化 autoDelete: false,//是否自动删除,通常设成false arguments: null//一些结构化参数,好比:alternate-exchange ); //定义阅卷队列 channel.QueueDeclare( queue: "Error_Queue", //队列名称 durable: true, //队列磁盘持久化(要和消息持久化一块儿使用才有效) exclusive: false,//是否排他的,false。若是一个队列声明为排他队列,该队列首次声明它的链接可见,并在链接断开时自动删除 autoDelete: false,//是否自动删除,通常设成false arguments: null ); //将队列绑定到交换机 string routeKey = "ErrorRouteKey.*";//*匹配一个单词 channel.QueueBind( queue: "Error_Queue", exchange: "ErrorTopicChange", routingKey: routeKey, arguments: null ); //消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一块儿使用才有效) IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.ConfirmSelect();//发送确认机制 byte[] sendBytes = Encoding.UTF8.GetBytes(msg); //发布消息 channel.BasicPublish( exchange: "ErrorTopicChange", routingKey: "ErrorRouteKey.one", basicProperties: properties, body: sendBytes ); bool isAllPublished = channel.WaitForConfirms();//通道(channel)里全部消息均发送才返回true return isAllPublished; } catch (Exception ex) { //写错误日志 return false; } finally { channel.Close(); MQ.conn.Close(); } }
总结:RabbitMQ自己已经很稳定了,并且性能也很好,全部不稳定的因素都在咱们处理消息的过程,因此能够放心使用。
Demo源码地址:https://github.com/Bingjian-Zhu/RabbitMQHelper