它是一种异步传输模式,能够在不一样的应用之间实现相互通讯,相互通讯的应用能够分布在同一台机器上,也能够分布于相连的网络空间中的任一位置。html
它的实现原理是:消息的发送者把本身想要发送的信息放入一个Message中,而后把它保存至一个系统公用空间的消息队列(Message Queue)中;本地或者是异服务器
地的消息接收程序再从该队列中取出发给它的消息进行处理。如图所示:网络
优势:支持离线通信;有消息优先级;有保障的消息传递和执行许多业务处理的可靠的防故障机制;息传递机制使得消息通讯的双方具备不一样的物理平台成为可能。多线程
缺点:很难知足实时交互需求。并发
一、数据采集:适合多设备多应用数据采集功能。app
二、辅助实时交互:在大并发系统中,某一个操做涉及到不少步骤,某些步骤是不须要及时处理的,将不须要及时处理的步骤提出来,用消息队列处理。异步
好比:在一个高并发购物网站,一个顾客下单,顾客的操做记录、顾客的余额记录、商品相关的统计记录是不须要及时处理的,这些能够放消息队列处理,ide
延时处理。函数
三、多线程流水线处理:能够应用于生产者和消费者模式里面。好比:批量文件解压+解析+入库处理。为了下降服务器压力,将各个步骤分布在不一样的高并发
服务器上面,每一个步骤的结果能够放入消息队列。
在使用MSMQ开发以前,须要安装消息队列。按顺序操做:控制面板-》程序-》打开或关闭Windows功能,勾选MSMQ服务器全部选项,如图所示:
在调试、操做队列的时候,插入和取出队列信息之后,须要查看操做是否成功,就得去存放队列的服务器中查看队列信息。操做顺序:计算机-》管理-》
服务和应用程序,在里面有相应队列存储的信息,如图所示:
文件做用:
MSMQ队列操做的属性设置:
/// <summary> /// MSMQ队列操做的属性设置 /// </summary> public class MessageQueueSettings { /// <summary> /// 专用队列的基本路径 /// </summary> private string basePath = "private$"; /// <summary> /// 队列的ID /// </summary> public Guid MessageQueueId { get; set; } /// <summary> /// 队列的标签 /// </summary> public string MessageQueueLabel { get; set; } = "由程序建立的默认消息队列"; /// <summary> /// 访问队列的路径,默认是本机,记作“.” /// </summary> public string RemotePath { get; set; } = "."; /// <summary> /// 队列名称:默认是 msmqdefault /// </summary> public string MessageQueueName { get; set; } = ConfigSource.DefaultMSMQName; /// <summary> /// 队列的路径 /// </summary> public string MessageQueuePath => $"{RemotePath}\\{basePath}\\{MessageQueueName}"; /// <summary> /// 消息队列类型 /// </summary> public QueueType MessageQueueType { get; set; } = QueueType.PrivateQueue; /// <summary> /// 获取或设置队列日志的大小 /// </summary> public long MaximumJournalSize { get; set; } = uint.MaxValue; /// <summary> /// 是否启用日志 /// </summary> public bool UseJournalQueue { get; set; } = true; /// <summary> /// 获取或设置队列的大小 /// </summary> public long MaximumQueueSize { get; set; } = uint.MaxValue; /// <summary> /// 是否启用队列事务 /// </summary> public bool IsUseTransaction { get; set; } = true; /// <summary> /// 队列的访问方式 /// </summary> public QueueAccessMode AccessMode { get; set; } = QueueAccessMode.SendAndReceive; /// <summary> /// 设置/获取是否通过身份验证 /// </summary> public bool Authenticate { get; set; } = false; /// <summary> /// 获取或设置基优先级,“消息队列”使用该基优先级在网络上传送公共队列的消息。 /// </summary> public short BasePriority { get; set; } = 0; }
队列类型enum:
/// <summary> /// 队列类型 /// </summary> public enum QueueType { /// <summary> /// 私有队列 /// </summary> PrivateQueue = 0, /// <summary> /// 公共队列 /// </summary> PublicQueue = 1, /// <summary> /// 管理队列 /// </summary> ManageQueue = 2, /// <summary> /// 响应队列 /// </summary> ResponseQueue = 3 }
逻辑代码里面不直接调用队列建立方法,直接用操做基类的构造方法建立。思路:初始化参数实体-》初始化待参数实体的操做基类实例-》实例里面构造方法
判断是否存在存在该队列,若是存在就返回该队列实例,若是不存在,就建立队列,返回实例。
基础操做类:
#region 构造函数 /// <summary> /// 默认构造函数:不对外开放 /// </summary> private MessageQueueBase() { } /// <summary> /// 构造函数 /// </summary> /// <param name="setting">消息队列的配置</param> public MessageQueueBase(MessageQueueSettings setting) { try { this.msetting = setting; // 校验是否存在队列 if (msetting.RemotePath == "." && !Exists()) this.mqueue = Create(); else this.mqueue = new MessageQueue(msetting.MessageQueuePath, msetting.AccessMode); if (msetting.RemotePath == ".") { // 设置是否通过身份验证:默认为否 this.mqueue.Authenticate = msetting.Authenticate; // 是否启用日志队列 this.mqueue.UseJournalQueue = msetting.UseJournalQueue; // 最大日志队列长度:最大值为uint.MaxValue this.mqueue.MaximumJournalSize = msetting.MaximumJournalSize > uint.MaxValue ? uint.MaxValue : msetting.MaximumJournalSize; // 最大队列长度:最大值为uint.MaxValue this.mqueue.MaximumQueueSize = msetting.MaximumQueueSize > uint.MaxValue ? uint.MaxValue : msetting.MaximumJournalSize; // 队列标签 this.mqueue.Label = msetting.MessageQueueLabel; } // 设置基优先级,默认是0 this.mqueue.BasePriority = msetting.BasePriority; // 获取消息队列的id msetting.MessageQueueId = mqueue.Id; } catch (Exception ex) { throw ex; } } ~MessageQueueBase() { this.mqueue?.Close(); GC.SuppressFinalize(this); } #endregion #region 操做方法 /// <summary> /// 验证队列是否存在 /// </summary> /// <returns>验证结果</returns> public bool Exists() { var successed = false; try { successed = MessageQueue.Exists(msetting.MessageQueuePath); } catch (Exception ex) { throw ex; } return successed; } /// <summary> /// 建立队列 /// </summary> /// <param name="type">队列类型(默认是专用队列)</param> /// <returns>队列对象</returns> public MessageQueue Create() { MessageQueue queue = default(MessageQueue); try { queue = MessageQueue.Create(msetting.MessageQueuePath, msetting.IsUseTransaction); } catch (Exception ex) { throw ex; } return queue; } /// <summary> /// 删除队列 /// </summary> public void Delete() { try { MessageQueue.Delete(msetting.MessageQueuePath); } catch (Exception ex) { throw ex; } } #endregion
业务调用类:
/// <summary> /// 建立消息队列 /// </summary> /// <param name="queueName">队列名称</param> /// <param name="transaction">是不是事务队列</param> /// <returns></returns> public bool CreateMessageQueue(string queueName, bool transaction) { bool result = false; MessageQueueSettings mqs = new MessageQueueSettings() { MessageQueueName= queueName, IsUseTransaction= transaction }; IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs); result=messageQueueBase.Exists(); return result; }
测试结果:
发送消息分为单条发送和列表发送,是否启用事务,以当前需求和队列属性决定。
基础操做类:
#region 发送操做 /// <summary> /// 发送数据到队列 /// </summary> /// <typeparam name="T">发送的数据类型</typeparam> /// <param name="t">发送的数据对象</param> public void Send<T>(T t) { MessageQueueTransaction tran = null; try { tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null; using (Message message = new Message()) { tran?.Begin(); message.Body = t; message.Label = $"推送时间[{DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss.fff")}]"; message.UseDeadLetterQueue = true; message.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); message.TimeToBeReceived = new TimeSpan(30, 0, 0, 0); message.TimeToReachQueue = new TimeSpan(30, 0, 0, 0); if (tran != null) { mqueue.Send(message, tran); } else { mqueue.Send(message); } tran?.Commit(); tran?.Dispose(); } } catch (Exception ex) { tran?.Abort(); tran?.Dispose(); throw ex; } } /// <summary> /// 发送多条数据 /// </summary> /// <typeparam name="T">发送的数据类型</typeparam> /// <param name="ts">发送的数据对象集合</param> public void SendList<T>(IList<T> ts) { MessageQueueTransaction tran = null; try { tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null; tran?.Begin(); foreach (var item in ts) { using (Message message = new Message()) { message.Body = item; message.Label = $"推送时间[{DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss.fff")}]"; message.UseDeadLetterQueue = true; message.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); message.TimeToBeReceived = new TimeSpan(30, 0, 0, 0); message.TimeToReachQueue = new TimeSpan(30, 0, 0, 0); if (tran != null) { mqueue.Send(message, tran); } else { mqueue.Send(message); } } } tran?.Commit(); tran?.Dispose(); } catch (Exception ex) { tran?.Abort(); tran?.Dispose(); throw ex; } } #endregion
业务调用类:
/// <summary> /// 发送消息 /// </summary> /// <param name="queueName">队列名称</param> /// <param name="priority">优先级</param> /// <param name="isTransaction">是否启用队列事务</param> /// <param name="userInfo">新增用户信息</param> public void SendMessage(string queueName, short priority, bool isTransaction, user userInfo) { MessageQueueSettings mqs = new MessageQueueSettings() { MessageQueueName = queueName, BasePriority = priority, IsUseTransaction = isTransaction }; IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs); messageQueueBase.Send<user>(userInfo); }
测试结果:
消息接收分为同步接收和异步接收。
基础操做类:
#region 接收方法 /// <summary> /// 同步获取队列消息 /// </summary> /// <typeparam name="T">返回的数据类型</typeparam> /// <param name="action">获取数据后的回调</param> public void Receive<T>(Action<T> action) where T : class { try { mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); Message message = mqueue.Receive(); T obj = message.Body as T; if (obj != null) action(obj); else throw new InvalidCastException("队列获取的类型与泛型类型不符"); } catch (Exception ex) { throw ex; } } /// <summary> /// 同步查询但不移除队列第一条数据 /// </summary> /// <typeparam name="T">返回的数据类型</typeparam> /// <param name="action">获取数据后的回调</param> public void Peek<T>(Action<T> action) where T : class { try { mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); Message message = mqueue.Peek(); T obj = message.Body as T; if (obj != null) action(obj); else throw new InvalidCastException("队列获取的类型与泛型类型不符"); } catch (Exception ex) { throw ex; } } /// <summary> /// 获取全部消息 /// </summary> /// <typeparam name="T">数据类型</typeparam> /// <returns>取出来的数据</returns> public IList<T> GetAll<T>() where T : class { try { mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); var enumerator = mqueue.GetMessageEnumerator2(); var ret = new List<T>(); while (enumerator.MoveNext()) { var messageInfo = enumerator.Current.Body as T; if (messageInfo != null) { ret.Add(messageInfo); } enumerator.RemoveCurrent(); enumerator.Reset(); } return ret; } catch (Exception ex) { throw ex; } } /// <summary> /// 异步接收队列消息,并删除接收的消息数据 /// </summary> /// <typeparam name="T">返回的数据类型</typeparam> /// <param name="action">获取数据后的回调</param> public void ReceiveAsync<T>(Action<T> action) where T : class { MessageQueueTransaction tran = null; try { tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null; if (!initReceiveAsync) { mqueue.ReceiveCompleted += (sender, e) => { Message message = mqueue.EndReceive(e.AsyncResult); T obj = message.Body as T; if (obj != null) action(obj); else throw new InvalidCastException("队列获取的类型与泛型类型不符"); }; initReceiveAsync = true; } mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); tran?.Begin(); mqueue.BeginReceive(); tran?.Commit(); } catch (Exception ex) { throw ex; } } /// <summary> /// 启动一个没有超时设定的异步查看操做。直到队列中出现消息时,才完成此操做。 /// </summary> /// <typeparam name="T">获取的消息数据的类型</typeparam> /// <param name="action">获取数据后的回调</param> public void PeekAsync<T>(Action<T> action) where T : class { MessageQueueTransaction tran = null; try { tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null; if (!initPeekAsync) { mqueue.PeekCompleted += (sender, e) => { Message message = mqueue.EndPeek(e.AsyncResult); T obj = message.Body as T; if (obj != null) action(obj); else throw new InvalidCastException("队列获取的类型与泛型类型不符"); }; initPeekAsync = true; } mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); tran?.Begin(); mqueue.BeginPeek(); tran?.Commit(); } catch (Exception ex) { throw ex; } } /// <summary> /// 异步循环获取队列消息,调用此方法将一直接收队列数据,直到IsStopReceiveLoop被设置为true /// </summary> /// <typeparam name="T">返回的数据类型</typeparam> /// <param name="action">获取数据后的回调</param> public void ReceiveAsyncLoop<T>(Action<T> action) where T : class { MessageQueueTransaction tran = null; try { tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null; if (!initReceiveAsync) { mqueue.ReceiveCompleted += (sender, e) => { Message message = mqueue.EndReceive(e.AsyncResult); T obj = message.Body as T; if (obj != null) action(obj); else throw new InvalidCastException("队列获取的类型与泛型类型不符"); //只要不中止就一直接收 if (!IsStopReceiveLoop) { tran?.Begin(); mqueue.BeginReceive(); tran?.Commit(); } }; initReceiveAsync = true; } mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); IsStopReceiveLoop = false; tran?.Begin(); mqueue.BeginReceive(); tran?.Commit(); } catch (Exception ex) { throw ex; } } #endregion
业务调用类:
/// <summary> /// 接收消息 /// </summary> /// <param name="queueName">队列</param> /// <param name="isAsy">是否异步</param> public user ReceiveMessage(string queueName, bool isAsy) { user userInfo = new user(); MessageQueueSettings mqs = new MessageQueueSettings() { MessageQueueName = queueName }; IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs); if (!isAsy) { messageQueueBase.Receive<user>(p => { userInfo = p; }); } else { messageQueueBase.ReceiveAsync<user>(p => { userInfo = p; }); } return userInfo; } /// <summary> /// 获取所有 /// </summary> /// <param name="queueName"></param> /// <returns></returns> public IList<user> GetAll(string queueName) { MessageQueueSettings mqs = new MessageQueueSettings() { MessageQueueName = queueName }; IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs); return messageQueueBase.GetAll<user>(); }
测试结果:
点击插入数据,在队列中插入N条数据。同步接收和异步接收每次只获取队列第一条数据;异步循环是每次获取一条数据,循环获取,知道队列没有数据;
接收所有是一次性获取指定队列全部数据。
下一篇写Dapper应用或Redis应用。