消息队列(MSMQ)技术使得运行于不一样时间的应用程序可以在各类各样的网络和可能暂时脱机的系统之间进行通讯。html
应用程序将消息发送到队列,并从队列中读取消息。数据库
消息便是信息的载体。为了让消息发送者和消息接收者都可以明白消息所承载的信息(消息发送者须要知道如何构造消息;消息接收者须要知道如何解析消息)数组
下图演示了消息队列如何保存由多个发送应用程序生成的消息,并被多个接收应用程序读取。缓存
消息队列的安装能够服务器
一、安装消息队列
开始—》控制面板—》管理工具—》服务器管理器—》功能—》添加功能—》依次展开MSM、MSMQ服务—》肯定。
二、管理消息队列
计算机—》右键—》管理—》功能—》消息队列。网络
参考 点击异步
MSMQ能够被当作一个数据储存装置,就如同数据库,只不过数据存储的是一条一条的记录,而MSMQ存储的是一个一个的消息(messsge)。函数
Message能够被理解为一种数据容器,咱们在稍后会讲到。MSMQ一个重要的应用场景就是离线信息交互,例如,咱们在给朋友发送邮件,而此时朋友并未登入邮箱,工具
这个时候咱们的邮件就能够发到邮件服务器的MSMQ队列中,当朋友登入邮箱的时候,系统在从服务器的MSMQ队列中取出U件。固然MSMQ的用途远不止这些,学习
例如,充当数据缓存,实现异步操做等等,这里就不在一一举例了。
建立、删除和管理队列
要开发MSMQ程序就必须学习一个很重要的类(MessageQueue),该类位于名称空间System.Messageing下。
经常使用方法:
--Create() 方法:建立使用指定路径的新消息队列。
--Delete() 方法: 删除现有的消息队列。
--Existe() 方法: 查看指定消息队列是否存在。
--GetAllMessages() 方法 : 获得队列中的全部消息。
--GetPublicQueues() 方法: 在“消息队列”网络中定位消息队列。
--Peek()/BeginPeek() 方法: 查看某个特定队列中的消息队列,但不从该队列中移出消息。
--Receive()/BeginReceive() 方法:检索指定消息队列中最前面的消息并将其从该队列中移除。
--Send() 方法:发送消息到指定的消息队列。
--Purge() 方法:清空指定队列的消息。
经常使用属性:
Priority : 设置消息优先级,MessagePriority 枚举里所有进行了封装,MessagePriority.High();
AboveNormal:hight 与Normal 消息优先级之间;
High:高级消息优先级;
Highest:最高消息优先级;
Low:低消息优先级;
Lowest:最低消息优先级;
Normal:普通消息优先级;
VeryHigh:Highest 和High 消息优先级之间;
VeryLow:Low 和Lowest 消息优先级之间;
发送和序列化消息
MSMQ消息队列中定义的消息由一个主体(body)和若干属性构成。消息的主体能够由文本、二进制构成,根据须要还能够被加密。
在MSMQ中消息的大小不可以超过4MB。发送消息是经过Send方法来完成的,须要一个Message参数。
一、发送消息:
步骤:链接队列-->指定消息格式-->提供要发送的数据(主体)-->调用Send()方法将消息发送出去。详细见后面的示例程序。
二、序列化消息:
消息序列化能够经过.NET Framework附带的三个预约义格式化程序来完成:
-- XMLMessageFormatter对象----MessageQueue组件的默认格式化程序设置。
-- BinaryMessageFormatter对象;
-- ActiveXMessageFormatter对象;
因为后二者格式化后的消息一般不能为人阅读,因此咱们常常用到的是XMLMessageFormatter对象。该对象构造方法有三种重载:
一、public XmlMessageFormatter();
二、public XmlMessageFormatter(string[] targetTypeNames);
三、public XmlMessageFormatter(Type[] targetTypes);
如咱们后面的示例程序中用到的序列化语句:
//序列化为字符串
XmlMessageFormatter formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
读取和接收消息
一、读取消息:
也就是从指定队列中获取消息。
二、接收消息有两种方式:
--> 经过 Receive() 方法。获取移除消息。
--> 经过 Peek() 方法。 获取但不移除消息。
路径名和格式名能够用于标识队列,要查找队列,必须区分公共队列和私有队列。
公共队列在Active Directory中发布,对于这些队列,无需知道他们所在的系统,私有队列只有在已知队列所在的系统名时才能找到。
在Active Directory域中收索队列的标签,类别或格式名,就能够找到公共队列,还能够得到机器上的全部队列,
MessageQueue类的静态方法 GetPublicQueuesByLabel()、GetPublicQueuesByCategory() 和 GetPublicQueuesByMachine()
能够收索队列,GetPublicQueues() 方法返回包含域中全部公共队列的数组。
如:
static void Main(string[] args) { foreach(MessageQueue queue in MessageQueue.GetPublicQueues()) { Console.WriteLine(queue.Path); } }
消息队列分为如下几种,每种队列的路径表示形式以下:
这里的 MachineName 能够用 “."代替,表明当前计算机
须要先引用System.Messaging.dll
MSMQ 支持两种类型的队列,
一、事务性队列(transactional queue)会将消息持久(persiste)存储到磁盘中,即使服务器当机(shutdown)、重启(reboot)或崩溃(crash),
消息依然 能够在系统恢复后被读取。同时,消息发布、获取和删除都在环境事务范围内,从而确保消息的可靠性。
咱们还可使用 TransactionScope 将环境事务传递给队列,不然队列会自动建立一个内部事务。
二、非事务性队列(nontransactional volatile queues)只是将消息存在内存,不会使用磁盘进行持久存储,且不会使用事务来保护对消息的操做。
一但服务器发生问题,或者调用方出现异常,消息都会丢 失。
1 MessageQueue.Create(@"./private$/myqueue", true);
1 MessageQueue.Create(@"./private$/myqueue");
三、消息队列的优先级
在MSMQ中消息在队列里传输是分有优先级的,
优先级一共有七种,MessagePriority枚举里所有进行了封装。因这里只做程序演示就不一一列举出,仅用了Highest和Normal两种类型,
关于消息队列上进行消息传输的七种优先级你们能够参考我下面提供的 MessagePriority 枚举源代码定义。
那么在发送消息的时候怎么指定消息的优先级呢?在Message对象里封装有一个属性Priority,
接受一个枚举MessagePriority类型的值来设置消息传输的优先级。以下:
1 Message message = new Message(); 2 message.Priority = MessagePriority.Lowest; //定义消息的优先级
消息的优先级不能用于事务性消息发送。由于事务性消息是按照顺序发生和接收消息的。
四、消息队列超时设置和最大长度设置
发送:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 string msg = Console.ReadLine(); 6 //发生消息 7 new SendQueueMessage().SendMessageQueue(msg); 8 Console.ReadLine(); 9 } 10 } 11 12 /// <summary> 13 /// 消息队列发生消息 14 /// </summary> 15 public class SendQueueMessage 16 { 17 /// <summary> 18 /// 消息队列 19 /// </summary> 20 private MessageQueue messageQueue; 21 22 public SendQueueMessage() 23 { 24 //路径 25 string path = ".\\private$\\temp"; 26 if (MessageQueue.Exists(path)) 27 { 28 //若是存在指定路径的消息队列,则获取 29 messageQueue = new MessageQueue(path); 30 } 31 else 32 { 33 //不存在,则建立新的 34 messageQueue = MessageQueue.Create(path); 35 } 36 } 37 38 /// <summary> 39 /// 发生消息 40 /// </summary> 41 /// <param name="msg"></param> 42 public void SendMessageQueue(string msg) 43 { 44 Message message = new Message(); 45 message.Body = msg; 46 //指定格式 47 message.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) }); 48 messageQueue.Send(message); 49 } 50 }
接收:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //接收 6 new ReciveQueue().ReciveMessage(); 7 Console.ReadLine(); 8 } 9 } 10 11 /// <summary> 12 /// 消息队列接收 13 /// </summary> 14 public class ReciveQueue 15 { 16 /// <summary> 17 /// 消息队列 18 /// </summary> 19 private MessageQueue messageQueue; 20 21 public ReciveQueue() 22 { 23 //路径 24 string path = ".\\private$\\temp"; 25 if (MessageQueue.Exists(path)) 26 { 27 //若是存在指定路径的消息队列,则获取 28 messageQueue = new MessageQueue(path); 29 } 30 else 31 { 32 //不存在,则建立新的 33 messageQueue = MessageQueue.Create(path); 34 } 35 } 36 37 public void ReciveMessage() 38 { 39 while (true) 40 { 41 //Message.Receive()是同步进行的,若是队列中没有消息,会阻塞当前线程 42 Message message = messageQueue.Receive(); 43 message.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) }); 44 string msg = message.Body.ToString(); 45 Console.WriteLine(msg); 46 } 47 } 48 }
注意:messageQueue.Receive() 若是没有接收到消息,则会阻塞当前被调用线程,直到接收到消息为止。
这样就在数据转发服务器端建立了一个名为 temp 的消息队列;从客户端要发送的消息就保存在这个队列里,
你能够经过计算机管理->服务和应用下的消息队列中看到你建立的 temp 队列,private$关键字是说明队列为专用队列,
前面的“.”表明建立的队列目录是本机,这个队列一旦建立成功,就是系统的事了,接下来要作的就是你怎么去把消息写进这个队列,或者读取队列的值
这里要特别注意,不要将queuepath路径字符串写成
1 /// <summary> 2 /// 获取专用消息队列 3 /// </summary> 4 /// <returns></returns> 5 public static MessageQueue GetMessageQueue() 6 { 7 string publicMQPath = "FormatName:Direct=TCP:192.168.0.110\\private$\\DESKTOP-2AEJ0GU"; 8 //若是存在,获取消息队列 9 if (MessageQueue.Exists(publicMQPath)) 10 { 11 queue = new MessageQueue(publicMQPath); 12 } 13 else 14 { 15 //不存在,建立一个新的消息队列 16 queue = MessageQueue.Create(publicMQPath); 17 } 18 return queue; 19 }
这样写的话是用于远程计算机对这个队列进行访问的,
由于MessageQueue的Create() 和 Exisit() 方法是没办法去识别上述FormatName格式的,
还有要确保 Create() 函数要被执行了以后再用 MessageQueue 实例去引用;这样服务器端队列的建立就完成了;
事务我想你们对这个词应该都不会陌生,在操做数据库的时候常常都会用到事务,确保操做成功,要么所有完成(成功)
,要么所有不完成(失败)。在MSMQ中利用事务性处理,能够确保事务中的消息按照顺序传送,只传送一次,而且从目的队列成
功地被检索。
那么,在MSMQ上使用事务性处理怎么实现呢?能够经过建立 MessageQueueTransation 类的实例并将其关联到 MessageQueue
组件的实例来执行,执行事务的 Begin 方法,并将其实例传递到收发方法。而后,调用 Commit 以将事务的更改保存到目的队列。
建立事务性消息和普通的消息有一点小小的区别,你们可从下图上体会到:
发送:
1 /// <summary> 2 /// 事务性消息发送 3 /// </summary> 4 public class MQTranSendTest 5 { 6 /// <summary> 7 /// 执行发送 8 /// </summary> 9 public void Execute() 10 { 11 Console.WriteLine("请输入发送消息:"); 12 string flag = "N"; 13 while (flag != "Y") 14 { 15 string msg = Console.ReadLine(); 16 SendMessage(msg); 17 Console.WriteLine("按回车键继续,或者输入 Y 退出程序!"); 18 flag = Console.ReadLine(); 19 } 20 } 21 22 /// <summary> 23 /// 格式化发生内容,执行发送 24 /// </summary> 25 /// <param name="msg">发送的消息</param> 26 private static void SendMessage(string msg) 27 { 28 Message message = new Message(); 29 MessageQueue queue = GetMessageQueue(); 30 message.Body = msg; 31 //指定格式 32 queue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) }); 33 MessageQueueTransaction queueTransaction = new MessageQueueTransaction(); 34 //启动事务 35 queueTransaction.Begin(); 36 queue.Send(message, queueTransaction); 37 //提交事务 38 queueTransaction.Commit(); 39 } 40 41 /// <summary> 42 /// 根据路径获取消息队列 43 /// </summary> 44 /// <returns></returns> 45 private static MessageQueue GetMessageQueue() 46 { 47 string path = ".\\private$\\TranTemp"; 48 if (MessageQueue.Exists(path)) 49 { 50 //若是存在指定路径的消息队列,则获取 51 return new MessageQueue(path); 52 } 53 else 54 { 55 //不存在,则建立新的 56 return MessageQueue.Create(path,true); 57 } 58 } 59 }
接收:
1 /// <summary> 2 /// 事务性接收消息 3 /// </summary> 4 public class MQTranReciveTest 5 { 6 /// <summary> 7 /// 消息队列 8 /// </summary> 9 private MessageQueue messageQueue; 10 11 public MQTranReciveTest() 12 { 13 //路径 14 string path = ".\\private$\\TranTemp"; 15 if (MessageQueue.Exists(path)) 16 { 17 //若是存在指定路径的消息队列,则获取 18 messageQueue = new MessageQueue(path); 19 } 20 else 21 { 22 //不存在,则建立新的 23 messageQueue = MessageQueue.Create(path); 24 } 25 } 26 27 public void ReciveMessage() 28 { 29 string msg = string.Empty; 30 while (true) 31 { 32 if (messageQueue.Transactional) 33 { 34 MessageQueueTransaction queueTransaction = new MessageQueueTransaction(); 35 //启动事务 36 queueTransaction.Begin(); 37 //Message.Receive()是同步进行的,若是队列中没有消息,会阻塞当前线程 38 Message message = messageQueue.Receive(queueTransaction); 39 message.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) }); 40 msg = message.Body.ToString(); 41 queueTransaction.Commit(); 42 Console.WriteLine(msg); 43 msg = string.Empty; 44 } 45 } 46 } 47 }
死信队列:因为某种缘由没法传递的消息都放置在死信队列上(如:超时,超出消息队列最大长度等)
超时、消息队列长度设置代码:
/// <summary> /// 格式化发生内容,执行发送 /// </summary> /// <param name="msg">发送的消息</param> public static void SendMessage(string msg) { Message message = new Message(msg); MessageQueue queue = GetMainQueue(); message.Body = msg; //指定格式 queue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) }); //设置日志队列最大个数 //queue.MaximumJournalSize = 1; //设置消息队列最大存放的个数 //queue.MaximumQueueSize = 1; //发送消息的时间开始算起,消息到达目标队列的时间 //message.TimeToReachQueue = new TimeSpan(0, 0, 1); //从发送消息到接收消息的总时间 //message.TimeToBeReceived = new TimeSpan(0, 0, 30); //设置为true,发送失败后进入死信队列 //message.UseDeadLetterQueue = true; queue.Send(message); }
如何访问远程的私有队列?
如何修改消息队列的最大存储限制?
如何修改消息队列的默认存储位置?
如何保证计算机重启以后队列中的消息还在?
文档:https://files.cnblogs.com/files/wwj1992/MSMQ_FAQ.rar
https://files.cnblogs.com/files/wwj1992/MSMQ_Doc.rar
参考:http://www.javashuo.com/article/p-pcmfcteg-bd.html
Demo 下载