1、本文产生起因: html
以前文章《总结消息队列RabbitMQ的基本用法》已对RabbitMQ的安装、用法都作了详细说明,而本文主要是针对在高并发且单次从RabbitMQ中消费消息时,出现了链接数不足、链接响应较慢、RabbitMQ服务器崩溃等各类性能问题的解方案,之因此会出现我列举的这些问题,究基根源,实际上是TCP链接建立与断开太过频繁所致,这与咱们使用ADO.NET来访问常规的关系型DB(如:SQL SERVER、MYSQL)有所不一样,在访问DB时,咱们通常都建议你们使用using包裹,目的是每次建立完DB链接,使用完成后自动释放链接,避免没必要要的链接数及资源占用。可能有人会问,为什么访问DB,能够每次建立再断开链接,都没有问题,而一样访问MQ(本文所指的MQ均是RabbitMQ),每次建立再断开链接,若是在高并发且建立与断开频率高的时候,会出现性能问题呢?其实若是了解了DB的链接建立与断开以及MQ的链接建立与断开原理就知道其中的区别了。这里我简要说明一下,DB链接与MQ链接 其实底层都是基于TCP链接,建立TCP链接确定是有资源消耗的,是很是昂贵的,原则上尽量少的去建立与断开TCP链接,DB建立链接、MQ建立链接能够说是同样的,但在断开销毁链接上就有很大的不一样,DB建立链接再断开时,默认状况下是把该链接回收到链接池中,下次若是再有DB链接建立请求,则先判断DB链接池中是否有空闲的链接,如有则直接复用,若没有才建立链接,这样就达到了TCP链接的复用,而MQ建立链接都是新建立的TCP链接,断开时则直接断开TCP链接,简单粗暴,看似资源清理更完全,但若在高并发高频率每次都从新建立与断开MQ链接,则性能只会愈来愈差(上面说过TCP链接是很是昂贵的),我在公司项目中就出现了该问题,后面在技术总监的指导下,对MQ的链接建立与断开做了优化,实现了相似DB链接池的概念。java
链接池,故名思义,链接的池子,全部的链接做为一种资源集中存放在池中,须要使用时就能够到池中获取空闲链接资源,用完后再放回池中,以此达到链接资源的有效重用,同时也控制了资源的过分消耗与浪费(资源多少取决于池子的容量)缓存
2、源代码奉献(可直接复制应用到你们的项目中) 安全
下面就先贴出实现MQHelper(含链接池)的源代码:服务器
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Util; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Web.Caching; using System.Web; using System.Configuration; using System.IO; using System.Collections.Concurrent; using System.Threading; using System.Runtime.CompilerServices; namespace Zuowj.Core { public class MQHelper { private const string CacheKey_MQConnectionSetting = "MQConnectionSetting"; private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount"; private readonly static ConcurrentQueue<IConnection> FreeConnectionQueue;//空闲链接对象队列 private readonly static ConcurrentDictionary<IConnection, bool> BusyConnectionDic;//使用中(忙)链接对象集合 private readonly static ConcurrentDictionary<IConnection, int> MQConnectionPoolUsingDicNew;//链接池使用率 private readonly static Semaphore MQConnectionPoolSemaphore; private readonly static object freeConnLock = new object(), addConnLock = new object(); private static int connCount = 0; public const int DefaultMaxConnectionCount = 30;//默认最大保持可用链接数 public const int DefaultMaxConnectionUsingCount = 10000;//默认最大链接可访问次数 private static int MaxConnectionCount { get { if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null) { return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]); } else { int mqMaxConnectionCount = 0; string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount]; if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0) { mqMaxConnectionCount = DefaultMaxConnectionCount; } string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config"); HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath)); return mqMaxConnectionCount; } } } /// <summary> /// 创建链接 /// </summary> /// <param name="hostName">服务器地址</param> /// <param name="userName">登陆帐号</param> /// <param name="passWord">登陆密码</param> /// <returns></returns> private static ConnectionFactory CrateFactory() { var mqConnectionSetting = GetMQConnectionSetting(); var connectionfactory = new ConnectionFactory(); connectionfactory.HostName = mqConnectionSetting[0]; connectionfactory.UserName = mqConnectionSetting[1]; connectionfactory.Password = mqConnectionSetting[2]; if (mqConnectionSetting.Length > 3) //增长端口号 { connectionfactory.Port = Convert.ToInt32(mqConnectionSetting[3]); } return connectionfactory; } private static string[] GetMQConnectionSetting() { string[] mqConnectionSetting = null; if (HttpRuntime.Cache[CacheKey_MQConnectionSetting] == null) { //MQConnectionSetting=Host IP|;userid;|;password string mqConnSettingStr = ConfigurationManager.AppSettings[CacheKey_MQConnectionSetting]; if (!string.IsNullOrWhiteSpace(mqConnSettingStr)) { mqConnSettingStr = EncryptUtility.Decrypt(mqConnSettingStr);//解密MQ链接字符串,若项目中无此需求可移除,EncryptUtility是一个AES的加解密工具类,你们网上可自行查找 if (mqConnSettingStr.Contains(";|;")) { mqConnectionSetting = mqConnSettingStr.Split(new[] { ";|;" }, StringSplitOptions.RemoveEmptyEntries); } } if (mqConnectionSetting == null || mqConnectionSetting.Length < 3) { throw new Exception("MQConnectionSetting未配置或配置不正确"); } string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config"); HttpRuntime.Cache.Insert(CacheKey_MQConnectionSetting, mqConnectionSetting, new CacheDependency(appConfigPath)); } else { mqConnectionSetting = HttpRuntime.Cache[CacheKey_MQConnectionSetting] as string[]; } return mqConnectionSetting; } public static IConnection CreateMQConnection() { var factory = CrateFactory(); factory.AutomaticRecoveryEnabled = true;//自动重连 var connection = factory.CreateConnection(); connection.AutoClose = false; return connection; } static MQHelper() { FreeConnectionQueue = new ConcurrentQueue<IConnection>(); BusyConnectionDic = new ConcurrentDictionary<IConnection, bool>(); MQConnectionPoolUsingDicNew = new ConcurrentDictionary<IConnection, int>();//链接池使用率 MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore");//信号量,控制同时并发可用线程数 } public static IConnection CreateMQConnectionInPoolNew() { SelectMQConnectionLine: MQConnectionPoolSemaphore.WaitOne();//当<MaxConnectionCount时,会直接进入,不然会等待直到空闲链接出现 IConnection mqConnection = null; if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//若是已有链接数小于最大可用链接数,则直接建立新链接 { lock (addConnLock) { if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount) { mqConnection = CreateMQConnection(); BusyConnectionDic[mqConnection] = true;//加入到忙链接集合中 MQConnectionPoolUsingDicNew[mqConnection] = 1; // BaseUtil.Logger.DebugFormat("Create a MQConnection:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); return mqConnection; } } } if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //若是没有可用空闲链接,则从新进入等待排队 { // BaseUtil.Logger.DebugFormat("no FreeConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); goto SelectMQConnectionLine; } else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //若是取到空闲链接,判断是否使用次数是否超过最大限制,超过则释放链接并从新建立 { mqConnection.Close(); mqConnection.Dispose(); // BaseUtil.Logger.DebugFormat("close > DefaultMaxConnectionUsingCount mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); mqConnection = CreateMQConnection(); MQConnectionPoolUsingDicNew[mqConnection] = 0; // BaseUtil.Logger.DebugFormat("create new mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); } BusyConnectionDic[mqConnection] = true;//加入到忙链接集合中 MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次数加1 // BaseUtil.Logger.DebugFormat("set BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); return mqConnection; } private static void ResetMQConnectionToFree(IConnection connection) { lock (freeConnLock) { bool result = false; if (BusyConnectionDic.TryRemove(connection, out result)) //从忙队列中取出 { // BaseUtil.Logger.DebugFormat("set FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); } else { // BaseUtil.Logger.DebugFormat("failed TryRemove BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); } if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//若是由于高并发出现极少几率的>MaxConnectionCount,则直接释放该链接 { connection.Close(); connection.Dispose(); } else { FreeConnectionQueue.Enqueue(connection);//加入到空闲队列,以便持续提供链接服务 } MQConnectionPoolSemaphore.Release();//释放一个空闲链接信号 //Interlocked.Decrement(ref connCount); //BaseUtil.Logger.DebugFormat("Enqueue FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2},thread count:{3}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count,connCount); } } /// <summary> /// 发送消息 /// </summary> /// <param name="connection">消息队列链接对象</param> /// <typeparam name="T">消息类型</typeparam> /// <param name="queueName">队列名称</param> /// <param name="durable">是否持久化</param> /// <param name="msg">消息</param> /// <returns></returns> public static string SendMsg(IConnection connection, string queueName, string msg, bool durable = true) { try { using (var channel = connection.CreateModel())//创建通信信道 { // 参数从前面开始分别意思为:队列名称,是否持久化,独占的队列,不使用时是否自动删除,其余参数 channel.QueueDeclare(queueName, durable, false, false, null); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//1表示不持久,2.表示持久化 if (!durable) properties = null; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish("", queueName, properties, body); } return string.Empty; } catch (Exception ex) { return ex.ToString(); } finally { ResetMQConnectionToFree(connection); } } /// <summary> /// 消费消息 /// </summary> /// <param name="connection">消息队列链接对象</param> /// <param name="queueName">队列名称</param> /// <param name="durable">是否持久化</param> /// <param name="dealMessage">消息处理函数</param> /// <param name="saveLog">保存日志方法,可选</param> public static void ConsumeMsg(IConnection connection, string queueName, bool durable, Func<string, ConsumeAction> dealMessage, Action<string, Exception> saveLog = null) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queueName, durable, false, false, null); //获取队列 channel.BasicQos(0, 1, false); //分发机制为触发式 var consumer = new QueueingBasicConsumer(channel); //创建消费者 // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者 channel.BasicConsume(queueName, false, consumer); while (true) //若是队列中有消息 { ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息 string message = null; try { var body = ea.Body; message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { if (saveLog != null) { saveLog(message, ex); } } if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //消息从队列中删除 } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //消息重回队列 } else { channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丢弃 } } } } catch (Exception ex) { if (saveLog != null) { saveLog("QueueName:" + queueName, ex); } throw ex; } finally { ResetMQConnectionToFree(connection); } } /// <summary> /// 依次获取单个消息 /// </summary> /// <param name="connection">消息队列链接对象</param> /// <param name="QueueName">队列名称</param> /// <param name="durable">持久化</param> /// <param name="dealMessage">处理消息委托</param> public static void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func<string, ConsumeAction> dealMessage) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, durable, false, false, null); //获取队列 channel.BasicQos(0, 1, false); //分发机制为触发式 uint msgCount = channel.MessageCount(QueueName); if (msgCount > 0) { var consumer = new QueueingBasicConsumer(channel); //创建消费者 // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者 channel.BasicConsume(QueueName, false, consumer); ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息 try { var body = ea.Body; var message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { throw ex; } finally { if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //消息从队列中删除 } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //消息重回队列 } else { channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丢弃 } } } else { dealMessage(string.Empty); } } } catch (Exception ex) { throw ex; } finally { ResetMQConnectionToFree(connection); } } /// <summary> /// 获取队列消息数 /// </summary> /// <param name="connection"></param> /// <param name="QueueName"></param> /// <returns></returns> public static int GetMessageCount(IConnection connection, string QueueName) { int msgCount = 0; try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, true, false, false, null); //获取队列 msgCount = (int)channel.MessageCount(QueueName); } } catch (Exception ex) { throw ex; } finally { ResetMQConnectionToFree(connection); } return msgCount; } } public enum ConsumeAction { ACCEPT, // 消费成功 RETRY, // 消费失败,能够放回队列从新消费 REJECT, // 消费失败,直接丢弃 } }
如今对上述代码的核心点做一个简要的说明:并发
先说一下静态构造函数:app
FreeConnectionQueue 用于存放空闲链接对象队列,为什么使用Queue,由于当我从中取出1个空闲链接后,空闲链接数就应该少1个,这个Queue很好知足这个需求,并且这个Queue是并发安全的Queue哦(ConcurrentQueue)函数
BusyConnectionDic 忙(使用中)链接对象集合,为什么这里使用字典对象呢,由于当我用完后,须要可以快速的找出使用中的链接对象,并能快速移出,同时从新放入到空闲队列FreeConnectionQueue ,达到链接复用高并发
MQConnectionPoolUsingDicNew 链接使用次数记录集合,这个只是辅助记录链接使用次数,以即可以计算一个链接的已使用次数,当达到最大使用次数时,则应断开从新建立工具
MQConnectionPoolSemaphore 这个是信号量,这是控制并发链接的重要手段,链接池的容量等同于这个信号量的最大可并行数,保证同时使用的链接数不超过链接池的容量,若超过则会等待;
具体步骤说明:
1.MaxConnectionCount:最大保持可用链接数(能够理解为链接池的容量),能够经过CONFIG配置,默认为30;
2.DefaultMaxConnectionUsingCount:默认最大链接可访问次数,我这里没有使用配置,而是直接使用常量固定为1000,你们如有须要能够改为从CONFIG配置,参考MaxConnectionCount的属性设置(采起了依赖缓存)
3.CreateMQConnectionInPoolNew:从链接池中建立MQ链接对象,这个是核心方法,是实现链接池的地方,代码中已注释了重要的步骤逻辑,这里说一下实现思路:
3.1 经过MQConnectionPoolSemaphore.WaitOne() 利用信号量的并行等待方法,若是当前并发超过信号量的最大并行度(也就是做为链接池的最大容量),则须要等待空闲链接池,防止链接数超过池的容量,若是并发没有超过池的容量,则能够进入获取链接的逻辑;
3.2FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount,若是空闲链接队列+忙链接集合的总数小于链接池的容量,则能够直接建立新的MQ链接,不然FreeConnectionQueue.TryDequeue(out mqConnection) 尝试从空闲链接队列中获取一个可用的空闲链接使用,若空闲链接都没有,则须要返回到方法首行,从新等待空闲链接;
3.3MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen 若是取到空闲链接,则先判断使用次数是否超过最大限制,超过则释放链接或空闲链接已断开链接也须要从新建立,不然该链接可用;
3.4BusyConnectionDic[mqConnection] = true;加入到忙链接集合中,MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1; 使用次数加1,确保每使用一次链接,链接次数能记录
4.ResetMQConnectionToFree:重置释放链接对象,这个是保证MQ链接用完后可以回收到空闲链接队列中(即:回到链接池中),而不是直接断开链接,这个方法很简单就不做做过多说明。
好了,都说明了如何实现含链接池的MQHelper,如今再来举几个例子来讲明如何用:
3、实际应用(简单易上手)
获取并消费一个消息:
public string GetMessage(string queueName) { string message = null; try { var connection = MQHelper.CreateMQConnectionInPoolNew(); MQHelper.ConsumeMsgSingle(connection, queueName, true, (msg) => { message = msg; return ConsumeAction.ACCEPT; }); } catch (Exception ex) { BaseUtil.Logger.Error(string.Format("MQHelper.ConsumeMsgSingle Error:{0}", ex.Message), ex); message = "ERROR:" + ex.Message; } //BaseUtil.Logger.InfoFormat("第{0}次请求,从消息队列(队列名称:{1})中获取消息值为:{2}", Interlocked.Increment(ref requestCount), queueName, message); return message; }
发送一个消息:
public string SendMessage(string queueName, string msg) { string result = null; try { var connection = MQHelper.CreateMQConnectionInPoolNew(); result = MQHelper.SendMsg(connection, queueName, msg); } catch (Exception ex) { BaseUtil.Logger.Error(string.Format("MQHelper.SendMessage Error:{0}", ex.Message), ex); result = ex.Message; } return result; }
获取消息队列消息数:
public int GetMessageCount(string queueName) { int result = -1; try { var connection = MQHelper.CreateMQConnectionInPoolNew(); result = MQHelper.GetMessageCount(connection, queueName); } catch (Exception ex) { BaseUtil.Logger.Error(string.Format("MQHelper.GetMessageCount Error:{0}", ex.Message), ex); result = -1; } return result; }
这里说一下:BaseUtil.Logger 是Log4Net的实例对象,另外上面没有针对持续订阅消费消息(ConsumeMsg)做说明,由于这个其实能够不用链接池也不会有问题,由于它是一个持久订阅并持久消费的过程,不会出现频繁建立链接对象的状况。
最后要说的是,虽然说代码贴出来,你们一看就以为很简单,好像没有什么技术含量,但若是没有完整的思路也仍是须要花费一些时间和精力的,代码中核心是如何简单高效的解决并发及链接复用的的问题,该MQHelper有通过压力测试并顺利在我司项目中使用,完美解决了以前的问题,因为这个方案是我在公司通宵实现的,可能有一些方面的不足,你们能够相互交流或完善后入到本身的项目中。
2019-7-3更新:优化解决当已缓存的链接不可用时,致使没法复用,链接池一直被无效的长链接占满问题,以及处理消息时增长失败自动重试功能,代码以下:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Util; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Web.Caching; using System.Web; using System.Configuration; using System.IO; using System.Collections.Concurrent; using System.Threading; using System.Runtime.CompilerServices; using System.Net.Sockets; namespace KYLDMQService.Core { public class MQHelper { private const string CacheKey_MQConnectionSetting = "MQConnectionSetting"; private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount"; public const int DefaultMaxConnectionCount = 30;//默认最大保持可用链接数 public const int DefaultMaxConnectionUsingCount = 10000;//默认最大链接可访问次数 public const int DefaultReTryConnectionCount = 1;//默认重试链接次数 private static int MaxConnectionCount { get { if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null) { return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]); } else { int mqMaxConnectionCount = 0; string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount]; if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0) { mqMaxConnectionCount = DefaultMaxConnectionCount; } string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config"); HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath)); return mqMaxConnectionCount; } } } /// <summary> /// 创建链接 /// </summary> /// <param name="hostName">服务器地址</param> /// <param name="userName">登陆帐号</param> /// <param name="passWord">登陆密码</param> /// <returns></returns> private static ConnectionFactory CrateFactory() { var mqConnectionSetting = GetMQConnectionSetting(); var connectionfactory = new ConnectionFactory(); connectionfactory.HostName = mqConnectionSetting[0]; connectionfactory.UserName = mqConnectionSetting[1]; connectionfactory.Password = mqConnectionSetting[2]; if (mqConnectionSetting.Length > 3) //增长端口号 { connectionfactory.Port = Convert.ToInt32(mqConnectionSetting[3]); } return connectionfactory; } private static string[] GetMQConnectionSetting() { string[] mqConnectionSetting = null; if (HttpRuntime.Cache[CacheKey_MQConnectionSetting] == null) { //MQConnectionSetting=Host IP|;userid;|;password string mqConnSettingStr = ConfigurationManager.AppSettings[CacheKey_MQConnectionSetting]; if (!string.IsNullOrWhiteSpace(mqConnSettingStr)) { mqConnSettingStr = EncryptUtility.Decrypt(mqConnSettingStr); if (mqConnSettingStr.Contains(";|;")) { mqConnectionSetting = mqConnSettingStr.Split(new[] { ";|;" }, StringSplitOptions.RemoveEmptyEntries); } } if (mqConnectionSetting == null || mqConnectionSetting.Length < 3) { throw new Exception("MQConnectionSetting未配置或配置不正确"); } string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config"); HttpRuntime.Cache.Insert(CacheKey_MQConnectionSetting, mqConnectionSetting, new CacheDependency(appConfigPath)); } else { mqConnectionSetting = HttpRuntime.Cache[CacheKey_MQConnectionSetting] as string[]; } return mqConnectionSetting; } public static IConnection CreateMQConnection() { var factory = CrateFactory(); factory.AutomaticRecoveryEnabled = true;//自动重连 var connection = factory.CreateConnection(); connection.AutoClose = false; return connection; } private readonly static ConcurrentQueue<IConnection> FreeConnectionQueue;//空闲链接对象队列 private readonly static ConcurrentDictionary<IConnection, bool> BusyConnectionDic;//使用中(忙)链接对象集合 private readonly static ConcurrentDictionary<IConnection, int> MQConnectionPoolUsingDicNew;//链接池使用率 private readonly static Semaphore MQConnectionPoolSemaphore; private readonly static object freeConnLock = new object(), addConnLock = new object(); private static int connCount = 0; static MQHelper() { FreeConnectionQueue = new ConcurrentQueue<IConnection>(); BusyConnectionDic = new ConcurrentDictionary<IConnection, bool>(); MQConnectionPoolUsingDicNew = new ConcurrentDictionary<IConnection, int>();//链接池使用率 MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore");//信号量,控制同时并发可用线程数 } public static IConnection CreateMQConnectionInPoolNew() { MQConnectionPoolSemaphore.WaitOne(10000);//当<MaxConnectionCount时,会直接进入,不然会等待直到空闲链接出现 //Interlocked.Increment(ref connCount); //BaseUtil.Logger.DebugFormat("thread Concurrent count:{0}", connCount); //int totalCount = FreeConnectionQueue.Count + BusyConnectionDic.Count; //BaseUtil.Logger.DebugFormat("totalCount:{0}", totalCount); //if (totalCount > MaxConnectionCount) //{ // System.Diagnostics.Debug.WriteLine("ConnectionCount:" + totalCount); // BaseUtil.Logger.DebugFormat("more than totalCount:{0}",totalCount); //} IConnection mqConnection = null; try { if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//若是已有链接数小于最大可用链接数,则直接建立新链接 { lock (addConnLock) { if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount) { mqConnection = CreateMQConnection(); BusyConnectionDic[mqConnection] = true;//加入到忙链接集合中 MQConnectionPoolUsingDicNew[mqConnection] = 1; // BaseUtil.Logger.DebugFormat("Create a MQConnection:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); return mqConnection; } } } if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //若是没有可用空闲链接,则从新进入等待排队 { // BaseUtil.Logger.DebugFormat("no FreeConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); return CreateMQConnectionInPoolNew(); } else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //若是取到空闲链接,判断是否使用次数是否超过最大限制,超过则释放链接并从新建立 { if (mqConnection.IsOpen) { mqConnection.Close(); } mqConnection.Dispose(); // BaseUtil.Logger.DebugFormat("close > DefaultMaxConnectionUsingCount mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); mqConnection = CreateMQConnection(); MQConnectionPoolUsingDicNew[mqConnection] = 0; // BaseUtil.Logger.DebugFormat("create new mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); } BusyConnectionDic[mqConnection] = true;//加入到忙链接集合中 MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次数加1 // BaseUtil.Logger.DebugFormat("set BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); return mqConnection; } catch //若是在建立链接发生错误,则判断当前是否已得到Connection,若是得到则释放链接,最终都会释放链接池计数 { if (mqConnection != null) { ResetMQConnectionToFree(mqConnection); } else { MQConnectionPoolSemaphore.Release(); } throw; } } private static void ResetMQConnectionToFree(IConnection connection) { try { lock (freeConnLock) { bool result = false; if (BusyConnectionDic.TryRemove(connection, out result)) //从忙队列中取出 { // BaseUtil.Logger.DebugFormat("set FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); } else//若极小几率移除失败,则再重试一次 { if (!BusyConnectionDic.TryRemove(connection, out result)) { BaseUtil.Logger.DebugFormat("failed TryRemove BusyConnectionDic(2 times):{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); } } if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//若是由于高并发出现极少几率的>MaxConnectionCount,则直接释放该链接 { connection.Close(); connection.Dispose(); } else if (connection.IsOpen)//若是是OPEN状态才加入空闲队列,不然直接丢弃 { FreeConnectionQueue.Enqueue(connection);//加入到空闲队列,以便持续提供链接服务 } } } catch { throw; } finally { MQConnectionPoolSemaphore.Release();//释放一个空闲链接信号 } //Interlocked.Decrement(ref connCount); //BaseUtil.Logger.DebugFormat("Enqueue FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2},thread count:{3}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count,connCount); } /// <summary> /// 发送消息 /// </summary> /// <param name="connection">消息队列链接对象</param> /// <typeparam name="T">消息类型</typeparam> /// <param name="queueName">队列名称</param> /// <param name="durable">是否持久化</param> /// <param name="msg">消息</param> /// <returns></returns> public static string SendMsg(IConnection connection, string queueName, string msg, bool durable = true) { bool reTry = false; int reTryCount = 0; string sendErrMsg = null; do { reTry = false; try { using (var channel = connection.CreateModel())//创建通信信道 { // 参数从前面开始分别意思为:队列名称,是否持久化,独占的队列,不使用时是否自动删除,其余参数 channel.QueueDeclare(queueName, durable, false, false, null); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//1表示不持久,2.表示持久化 if (!durable) properties = null; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish("", queueName, properties, body); } sendErrMsg = string.Empty; } catch (Exception ex) { if (BaseUtil.IsIncludeException<SocketException>(ex)) { if ((++reTryCount) <= DefaultReTryConnectionCount)//可重试1次 { ResetMQConnectionToFree(connection); connection = CreateMQConnectionInPoolNew(); reTry = true; } } sendErrMsg = ex.ToString(); } finally { if (!reTry) { ResetMQConnectionToFree(connection); } } } while (reTry); return sendErrMsg; } /// <summary> /// 消费消息 /// </summary> /// <param name="connection">消息队列链接对象</param> /// <param name="queueName">队列名称</param> /// <param name="durable">是否持久化</param> /// <param name="dealMessage">消息处理函数</param> /// <param name="saveLog">保存日志方法,可选</param> public static void ConsumeMsg(IConnection connection, string queueName, bool durable, Func<string, ConsumeAction> dealMessage, Action<string, Exception> saveLog = null) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queueName, durable, false, false, null); //获取队列 channel.BasicQos(0, 1, false); //分发机制为触发式 var consumer = new QueueingBasicConsumer(channel); //创建消费者 // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者 channel.BasicConsume(queueName, false, consumer); while (true) //若是队列中有消息 { ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息 string message = null; try { var body = ea.Body; message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { if (saveLog != null) { saveLog(message, ex); } } if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //消息从队列中删除 } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //消息重回队列 } else { channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丢弃 } } } } catch (Exception ex) { if (saveLog != null) { saveLog("QueueName:" + queueName, ex); } throw ex; } finally { //MQConnectionPool[connection] = false;//改成空闲 ResetMQConnectionToFree(connection); } } /// <summary> /// 依次获取单个消息 /// </summary> /// <param name="connection">消息队列链接对象</param> /// <param name="QueueName">队列名称</param> /// <param name="durable">持久化</param> /// <param name="dealMessage">处理消息委托</param> public static void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func<string, ConsumeAction> dealMessage) { bool reTry = false; int reTryCount = 0; ConsumeAction consumeResult = ConsumeAction.RETRY; IModel channel = null; BasicDeliverEventArgs ea = null; do { reTry = false; try { channel = connection.CreateModel(); channel.QueueDeclare(QueueName, durable, false, false, null); //获取队列 channel.BasicQos(0, 1, false); //分发机制为触发式 uint msgCount = channel.MessageCount(QueueName); if (msgCount > 0) { var consumer = new QueueingBasicConsumer(channel); //创建消费者 // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者 channel.BasicConsume(QueueName, false, consumer); ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息 var body = ea.Body; var message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } else { dealMessage(string.Empty); } } catch (Exception ex) { if (BaseUtil.IsIncludeException<SocketException>(ex)) { if ((++reTryCount) <= DefaultReTryConnectionCount)//可重试1次 { if (channel != null) channel.Dispose(); ResetMQConnectionToFree(connection); connection = CreateMQConnectionInPoolNew(); reTry = true; } } throw ex; } finally { if (!reTry) { if (channel != null && ea != null) { if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //消息从队列中删除 } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //消息重回队列 } else { channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丢弃 } } if (channel != null) channel.Dispose(); ResetMQConnectionToFree(connection); } } } while (reTry); } /// <summary> /// 获取队列消息数 /// </summary> /// <param name="connection"></param> /// <param name="QueueName"></param> /// <returns></returns> public static int GetMessageCount(IConnection connection, string QueueName) { int msgCount = 0; bool reTry = false; int reTryCount = 0; do { reTry = false; try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, true, false, false, null); //获取队列 msgCount = (int)channel.MessageCount(QueueName); } } catch (Exception ex) { if (BaseUtil.IsIncludeException<SocketException>(ex)) { if ((++reTryCount) <= DefaultReTryConnectionCount)//可重试1次 { ResetMQConnectionToFree(connection); connection = CreateMQConnectionInPoolNew(); reTry = true; } } throw ex; } finally { if (!reTry) { ResetMQConnectionToFree(connection); } } } while (reTry); return msgCount; } } public enum ConsumeAction { ACCEPT, // 消费成功 RETRY, // 消费失败,能够放回队列从新消费 REJECT, // 消费失败,直接丢弃 } }