在使用MSMQ以前,咱们须要自行安装消息队列组件!(具体安装方法你们本身搜一下吧)html
采用MSMQ带来的好处是:因为是异步通讯,不管是发送方仍是接收方都不用等待对方返回成功消息,就能够执行余下的代码,于是大大地提升了事物处理的能力;当信息传送过程当中,信息发送机制具备必定功能的故障恢复能力;MSMQ的消息传递机制使得消息通讯的双方具备不一样的物理平台成为可能。数据库
参考了PetShop里MSMQ的代码,为了考虑到在扩展中会有其余的数据数据对象会使用到MSMQ,所以定义了一个DTcmsQueue的基类,实现消息Receive和Send的基本操做,使用到MSMQ的数据对象须要继承DTcmsQueue基类,须要注意的是:在MSMQ中使用事务的话,须要建立事务性的专用消息队列,代码以下:数组
using System; using System.Messaging; using log4net; namespace DTcms.Web.UI { /// <summary> /// 该类实现从消息对列中发送和接收消息的主要功能 /// </summary> public class DTcmsQueue : IDisposable { private static ILog logger = LogManager.GetLogger(typeof(DTcmsQueue)); //指定消息队列事务的类型,Automatic 枚举值容许发送外部事务和从处部事务接收 protected MessageQueueTransactionType transactionType = MessageQueueTransactionType.Automatic; protected MessageQueue queue; protected TimeSpan timeout; //实现构造函数 public DTcmsQueue(string queuePath, int timeoutSeconds) { Createqueue(queuePath); queue = new MessageQueue(queuePath); timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeoutSeconds)); //设置当应用程序向消息对列发送消息时默认状况下使用的消息属性值 queue.DefaultPropertiesToSend.AttachSenderId = false; queue.DefaultPropertiesToSend.UseAuthentication = false; queue.DefaultPropertiesToSend.UseEncryption = false; queue.DefaultPropertiesToSend.AcknowledgeType = AcknowledgeTypes.None; queue.DefaultPropertiesToSend.UseJournalQueue = false; } /// <summary> /// 继承类将从自身的Receive方法中调用如下方法,该方法用于实现消息接收 /// </summary> public virtual object Receive() { try { using (Message message = queue.Receive(timeout, transactionType)) return message; } catch (MessageQueueException mqex) { if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) throw new TimeoutException(); throw; } } /// <summary> /// 继承类将从自身的Send方法中调用如下方法,该方法用于实现消息发送 /// </summary> public virtual void Send(object msg) { queue.Send(msg, transactionType); } /// <summary> /// 经过Create方法建立使用指定路径的新消息队列 /// </summary> /// <param name="queuePath"></param> public static void Createqueue(string queuePath) { try { if (!MessageQueue.Exists(queuePath)) { MessageQueue.Create(queuePath, true); //建立事务性的专用消息队列 logger.Debug("建立队列成功!"); } } catch (MessageQueueException e) { logger.Error(e.Message); } } #region 实现 IDisposable 接口成员 public void Dispose() { queue.Dispose(); } #endregion } }
上面咱们已经建立了DTcmsQueue基类,咱们具体实现的时候须要继承此基类,使用消息队列的时候,传递的是一个对象,因此咱们首先要建立这个对象,可是须要注意的一点:此对象是必须可序列化的,不然不能被插入到消息队列里,代码以下:多线程
/// <summary> /// 枚举,操做类型是增长仍是删除 /// </summary> public enum JobType { Add, Remove } /// <summary> /// 任务类,包括任务的Id ,操做的类型 /// </summary> [Serializable] public class IndexJob { public int Id { get; set; } public JobType JobType { get; set; } }
在具体的实现类里面,咱们只须要继承此基类,而后重写基类的方法,具体代码以下:异步
using System; using System.Configuration; using System.Messaging; namespace DTcms.Web.UI { /// <summary> /// 该类实现从消息队列中发送和接收订单消息 /// </summary> public class OrderJob : DTcmsQueue { // 获取配置文件中有关消息队列路径的参数 private static readonly string queuePath = ConfigurationManager.AppSettings["OrderQueuePath"]; private static int queueTimeout = 20; //实现构造函数 public OrderJob() : base(queuePath, queueTimeout) { // 设置消息的序列化采用二进制方式 queue.Formatter = new BinaryMessageFormatter(); } /// <summary> /// 调用PetShopQueue基类方法,实现从消息队列中接收订单消息 /// </summary> /// <returns>订单对象 OrderInfo</returns> public new IndexJob Receive() { // 指定消息队列事务的类型,Automatic枚举值容许发送发部事务和从外部事务接收 base.transactionType = MessageQueueTransactionType.Automatic; return (IndexJob)((Message)base.Receive()).Body; } //该方法实现从消息队列中接收订单消息 public IndexJob Receive(int timeout) { base.timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeout)); return Receive(); } /// <summary> /// 调用PetShopQueue基类方法,实现从消息队列中发送订单消息 /// </summary> /// <param name="orderMessage">订单对象 OrderInfo</param> public void Send(IndexJob orderMessage) { // 指定消息队列事务的类型,Single枚举值用于单个内部事务的事务类型 base.transactionType = MessageQueueTransactionType.Single; base.Send(orderMessage); } } }
将任务添加到消息队列代码就很简单了,没啥好说的,直接上代码:分布式
#region 任务添加 public void AddArticle(int artId) { OrderJob orderJob = new OrderJob(); IndexJob job = new IndexJob(); job.Id = artId; job.JobType = JobType.Add; logger.Debug(artId + "加入任务列表"); orderJob.Send(job);//把任务加入消息队列 } public void RemoveArticle(int artId) { OrderJob orderJob = new OrderJob(); IndexJob job = new IndexJob(); job.JobType = JobType.Remove; job.Id = artId; logger.Debug(artId + "加入删除任务列表"); orderJob.Send(job);//把任务加入消息队列 } #endregion
接下来就是以下获得消息队列的任务,并将任务完成,由于消息队列是系统的一个组件跟咱们的项目是彻底分开的,咱们能够彻底独立的完成接收消息队列的任务并处理后来的动做,这样就作到了异步处理,例如作一个Windows Service,更重要的是MSMQ仍是一种分布式处理技术,在本项目中,咱们主要是开辟了多线程来接收消息队列的任务并处理后来的动做,具体代码以下:函数
public void CustomerStart() { log4net.Config.XmlConfigurator.Configure(); PanGu.Segment.Init(PanGuPath); //声明线程 Thread workTicketThread; Thread[] workerThreads = new Thread[threadCount]; for (int i = 0; i < threadCount; i++) { //建立 Thread 实例 workTicketThread = new Thread(new ThreadStart(ProcessOrders)); // 设置线程在后台工做和线程启动前的单元状态(STA表示将建立并进入一个单线程单元 ) workTicketThread.IsBackground = true; workTicketThread.SetApartmentState(ApartmentState.STA); //启动线程,将调用ThreadStart委托 workTicketThread.Start(); workerThreads[i] = workTicketThread; } logger.Debug("进程已经开始启动. 按回车键中止."); } private static void ProcessOrders() { // 总事务处理时间(tsTimeout )就该超过批处理任务消息的总时间 TimeSpan tsTimeout = TimeSpan.FromSeconds(Convert.ToDouble(transactionTimeout * batchSize)); OrderJob orderJob = new OrderJob(); while (true) { // 消息队列花费时间 TimeSpan datetimeStarting = new TimeSpan(DateTime.Now.Ticks); double elapsedTime = 0; int processedItems = 0; ArrayList queueOrders = new ArrayList(); using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, tsTimeout)) { // 接收来自消息队列的任务消息 for (int j = 0; j < batchSize; j++) { try { //若是有足够的时间,那么接收任务,并将任务存储在数组中 if ((elapsedTime + queueTimeout + transactionTimeout) < tsTimeout.TotalSeconds) { queueOrders.Add(orderJob.Receive(queueTimeout)); } else { j = batchSize; // 结束循环 } //更新已占用时间 elapsedTime = new TimeSpan(DateTime.Now.Ticks).TotalSeconds - datetimeStarting.TotalSeconds; } catch (TimeoutException) { //结束循环由于没有可等待的任务消息 j = batchSize; } } //从数组中循环取出任务对象,并将任务插入到数据库中 for (int k = 0; k < queueOrders.Count; k++) { SearchHelper sh = new SearchHelper(); sh.IndexOn((IndexJob)queueOrders[k]); processedItems++; totalOrdersProcessed++; } //指示范围中的全部操做都已成功完成 ts.Complete(); } //完成后显示处理信息 logger.Debug("(线程 Id " + Thread.CurrentThread.ManagedThreadId + ") 批处理完成, " + processedItems + " 任务, 处理花费时间: " + elapsedTime.ToString() + " 秒."); } }
以上就是我在实际项目中使用MSMQ的一些心得,但愿对各位看官有所帮助,MSMQ具体实现代码上面已经贴出来了,因为本项目是《动力起航》的源代码,一方面因为文件过大,另外一方面不知道是否是会侵权,全部没有提供下载.若是有须要的朋友能够留下邮箱我将发给你,但仅供学习交流之用,误用作商业用途,以上若是有侵权等问题还请及时告知我,以便我及时更正!post
另外此项目源代码已上传至搜索技术交流群:77570783,源代码已上传至群共享,须要的朋友,请自行下载!学习
若是以为好的话请给个推荐哦~~~~亲网站