【翻译】DotNetMQ: 一个.NET版完整的消息队列系统

在一个大型的分布式系统中,消息队列是不可缺乏的中间件,能很好的解决异步消息、应用解耦、均衡并发等问题。在.net中,偶然发现一个效率不错、安全可靠、功能齐全的消息组件,忍不住翻译过来,供你们快速预览。java

注:原做者用windows服务启动消息队列服务,可是本人在win10上测试出错,可自行改为控制台启动消息队列服务,而后用第三方工具注册服务(如:SrvanyUIweb

原文:http://www.codeproject.com/Articles/193611/DotNetMQ-A-Complete-Message-Queue-System-for-NET算法

 

正文: 数据库


 一个新的、独立的、开源的,彻底基于C#和.NET Framework3.5的消息队列系统apache

下载源代码 - 1.28 MB编程

下载二进制文件 - 933 KBwindows

下载例子 - 534 KB数组

文章概要安全

  • 介绍
  • 什么是消息传递?
  • 什么是DotNetMQ?
  • 为何要一个新的消息代理?
    • 消息代理的必要性
    • 现有的消息代理
  • 安装、运行DotNetMQ
  • 第一个DotNetMQ程序
    • 注册应用程序到DotNetMQ
    • 开发Application1
    • 开发Application2
    • 消息属性:传送规则(Transmit Rule)
    • 客户端属性:通信方式(CommunicationWay)
    • 客户端属性:出错时从新链接服务器(ReConnectServerOnError)
    • 客户端属性:自动确认消息(AutoAcknowledgeMessages)
  • 配置DotNetMQ
    • 服务端
    • 应用程序
    • 路由/负载均衡
    • 其余设置
  • 网络传输消息
    • 一个简单的应用程序
    • 一个真实的案例:分布式短信处理器(Distributed SMS Processor)
  • 请求/应答式通讯
  • 面向服务架构的DotNetMQ
    • 简单应用程序:短息/邮件发送器
      • 服务端
      • 客户端
    • Web服务支持
  • DotNetMQ性能
  • 历史
  • 引用

介绍服务器

在这篇文章中,我将介绍一个新的、独立的、开源的,彻底基于C#和.NET Framework3.5的消息队列系统,DotNetMQ是一个消息代理,它包括确保传输,路由,负载均衡,服务器图等等多项功能。我将从解释消息的概念和消息代理的必要性讲起,而后,我会说明什么是DotNetMQ,以及如何使用它。

什么是消息传递

消息传递是一种异步通讯方式,具体就是在同一个或不一样的机器上运行的多个应用程序之间可靠的消息传递。应用程序经过发送一种叫消息的数据包和其余应用程序通讯。

一个消息能够是一个字符串,一个字节数组,一个对象等等。一般状况下,一个发送者(生产者)程序建立一个消息,并将其推送到一个消息队列,而后一个接受者(消费者)程序从队列中获取这个消息并处理它。发送程序和接受程序不须要同时运行,由于消息传递是一个异步过程。这就是所谓的松耦合通讯。

另外一方面,Web服务方法调用(远程方法调用)是一种紧耦合同步通讯(这两个应用程序在整个通讯的过程当中都必须是运行着而且可用,若是Web服务脱机或在方法调用期间发生错误,那么客户端应用程序将获得一个异常)。

图 - 1:两个应用程序间最简单的消息传递。

在上图中,两个应用程序经过消息队列进行松散耦合方式通讯。若是接受者处理消息的速度慢于发送者产生消息的速度,那么队列里的消息数就会增长。此外,在发送者发送消息的过程当中,接受者多是离线的。在这种状况下,当接收者上线后,它会从队列中获得消息(当它开始并加入这个队列时)。

消息队列一般由消息代理提供。消息代理是一个独立的应用程序(一个服务),其余应用程序经过链接它发送、接收消息。在消息被接收者接收以前,消息代理负责存储消息。消息代理能够经过路由多台机器把消息传送给目标应用程序,在消息被接收者正确处理以前,消息代理会一直尝试传送它。有时候消息代理也被称为面向消息的中间件(Message-Oriented-Middleware MOM)或者简单的叫消息队列(Message Queue MQ).

什么是DotNetMQ?

DotNetMQ是一个开源的消息代理,它有如下几个特色:

  • 持久和非持久的消息发送。
  • 即便在系统崩溃时,也会保证持久消息的传送。
  • 可在一个机器图里自动和手动设置消息的路由。
  • 支持多种数据库(MS SQL Server,MySQL,SQLite,和一些现有的基于内存的存储)
  • 支持不存储,直接发送及时消息。
  • 支持请求/应答式的消息。
  • 用客户端类库和DotNetMQ消息代理通讯很方便
  • 内置的框架,能够轻松地在消息队列上构建RMI服务。
  • 支持把消息传送给ASP.NET Web服务。
  • 基于图形界面的管理和监控工具。
  • 易于安装,管理和使用。
  • 彻底由C#开发(使用.NET Framework 3.5)。

在开始建立它的时候,我更喜欢叫它为MDS(消息传送系统 Message Delivery System)。由于它不只是一个消息队列,并且仍是一个直接传送消息到应用程序的系统和一个提供了创建应用服务框架的环境。我把它叫作DotNetMQ,是由于它彻底由.NET开发,并且这个名字也更好记。因此它原来的名字是MDS,以致于源码里有许多以MDS为前缀的类。

为何要一个新的消息代理?

消息代理的必要性

首先,我将演示一个须要消息代理的简单状况。

在个人业务经历中,我见到过一些很是糟糕且不寻常的异步企业应用集成解决方案。一般是运行在一台服务器上的一个程序执行一些任务,而且产生一些数据,而后将结果数据发送到另外一台服务器上的另外一个程序。第二个应用在数据上执行其余任务或计算结果(这台服务器在同一网络中或是经过互联网链接)。另外,消息数据必须是持久的。即便远程程序没有工做或网络不可用,消息必须第一时间发送过去。

让咱们来看看下面的设计图:

图 - 2:一个糟糕的集成应用程序解决方案。

Application -1 和Application -2是可执行程序(或是Windows服务),Sender Service是一个Windows服务。Application -1执行一些任务,产生数据,并调用Server-B服务器上的Remote Web Service方法来传输数据。这个web服务将数据插入到数据表。Application -2按期检查数据表来得到新的数据行并处理它们(而后从表中删除它们,或将其标记为已处理,避免处理重复数据)。

若是在调用Web服务时或Web服务处理数据时出错,数据不能丢失,而且稍后必须重发。可是,Application -1有其余任务要作,因此它不能一次又一次的尝试重发数据。它只是将数据插入到数据表。另外一个Windows服务(若是Application -1是一直运行的,也可使里的一个线程)按期检查这个表,并尝试将数据发送到Web服务,直到数据成功发送。

这个解决方案的确是可靠的(消息确保传送了),但它不是两个应用程序之间通讯的有效方式。该解决方案有一些很是关键的问题:

  • 须要很长的开发时间(去编码)。
  • 要定制全部的消息类型(或远程方法调用),对于一个新的Web服务方法调用,你必须改变全部的服务、应用程序和数据表。
  • 对每个类似的服务,必须开发基本上同样的软件和结构(或复制,而后修改)。
  • 编码后须要对服务、程序、数据库作太多的测试和维护。
  • 一些程序和服务在没有新消息的时候,仍是会按期检查数据库(若是数据库没有很好的索引和优化,这可能会严重消耗系统资源)。

如今用消息代理来作这全部的事情,用最有效的方式负责将消息传送给远程应用。同一应用程序集成用上DotNetMQ展现于下图。

图 - 3:使用DotNetMQ的简单消息传递。

DotNetMQ是一个独立的Windows服务,分别运行在Server-A和Server-B服务器上。所以,你只需编写代码和DotNetMQ通讯。使用DotNetMQ客户端类库,和DotNetMQ服务发送、接收信息是很是容易和快速的。Application -1准备消息,设置目标,并将消息传递给DotNetMQ代理。DotNetMQ代理将以最有效和最快的方式传递给Application -2。

现有的消息代理

很显然,在集成应用程序中消息代理是有必要的。我网上搜索,查找书籍,想找一个免费的(最好也是开源的)并且是.Net用起来很容易的消息代理。让咱们看看我找到了什么:

  • Apache ActiveMQ(http://activemq.apache.org):它是开源的,而且实现了JMS(Java Message Service,java消息服务在java世界里是一个标准的消息传输API)。它也有一个.NET客户端类库。我为了更加了解,读完了“ActiveMQ in Action”整本书,而且开发了一些简单的应用。即便我通读了这本书,我没有看到一个简单可靠的方式来构建一个共同合做和路有消息的ActiveMQ服务图。我也没有看到如何给一个消息设置目标服务器。它自动路由消息,但我不能有效的控制路由的路径。个人理解是,它一般和Apache Camel(http://camel.apache.org)一块儿使用来实现常见的应用集成模式。Apache Camel也是另外一个须要去了解的领域,更糟的是,它只使用Java。最后,我认为它不够简单易用,特别是配置,监控和管理。因而我放弃了对ActiveMQ的研究。
  • MSMQ(http://msdn.microsoft.com/en-us/library/ms711472(VS.85).aspx):这是来自微软的解决方案,是.NET应用程序最合适的框架。它很容易使用和学习,并且它有工具看检测队列和消息。它尤为适用于那些运行在同一台机器上,或能够直接链接到同一台机器的应用程序间的异步通讯。但我没法找到一个内置的解决方案,构建一个MSMQ服务器图来路由消息。由于路由是个人出发点,因此我只好淘汰掉这个消息代理。
  • RabbitMQ(http://www.rabbitmq.com):它是由Erlang(有爱立信开发的一种编程语言)开发的。你须要先安装Erlang。我花了不少时间来安装,配置,并写了一个示例程序。它有一个.NET客户端,但当我试图开发并运行一个简单的程序是,出现不少错误。很难安装,很难使不一样服务器上的两个RabbitMQ协同工做。过了几天,我就放弃了,由于我以为学习并开始开发程序不该该那么难。
  • OpenAMQ(http://www.openamq.org),ZeroMQ(http://www.zeromq.org):我整体研究了这两个消息代理,但我发现我不能轻易作我想用.NET想作的事。
  • 其余:我还发现了一些其余的项目,但它们缺失一些重要的功能如路由,持久消息传递,请求/应答消息...等。

如你所见,在上面的列表中没有哪个消息代理是彻底由.NET开发的。

从用户角度来看,我只是想经过“消息数据,目标服务器和应用程序名称”来定位个人代理。其余的我都不关心。他将会根据须要在网络上屡次路由一个消息,最后发送到目标服务器的目标程序上。个人消息传送系统必须为我提供这个便利。这是个人出发点。我根据这一点大概设计了消息代理的结构。下图显示了我想要的。

图 - 4:自动路由消息的消息代理服务器图。

Application -1 传递一个消息到本地服务器(Server-A)上的消息代理:

  • 目标服务器:Server-D
  • 目标应用程序:Application -2
  • 消息数据:应用程序特定的数据

Server-A没有直接和Server-D链接。所以,消息代理在服务器间转发消息(这个消息依次经过Server-A,Server-B,Server-C,Server-D),消息最后到达Server-D上的消息代理,而后传递给Application -2。注意在Server-E上也有一个Application-2在运行,可是它不会收到这个消息,由于消息的目标服务器是Server-D。

DotNetMQ提供了这种功能和便利。它在服务器图上找到最佳的(最短的)路径把消息从原服务器转发到目标服务器。

通过这种全面的介绍会,让咱们看看若是在实践中使用DotNetMQ。

安装、运行DotNetMQ

如今尚未实现自动安装,不过安装DotNetMQ是很是容易的。下载并解压文章开始提供的二进制文件。只需将全部的东西复制到C:\Progame Files\DotNetMQ\下,而后运行INSTALL_x86.bat(若是你用的是64位系统,那么将执行INSTALL_x64)。

你能够检查Windows服务,看看DotNetMQ是否已经安装并正常工做。

第一个DotNetMQ程序

让咱们看看实际中的DotNetMQ。为了使第一个程序足够简单,我假设是同一台机器上的两个控制台应用程序(实际上,就像咱们待会在文章中看到的那个,和在两台机器上的两个应用程序是没什么显著差别的,只是须要设置一下消息的目标服务器名字而已)。

  • Application1:从用户输入那里获得一个字符串消息,并将其发送到Application2.
  • Application2:在控制台上打印出传入的消息。

注册应用程序到DotNetMQ

咱们的应用程序为了使用DotNetMQ,要先注册一下,只需操做一次,是一个很是简单的过程。运行DotNetMQ管理器(DotNETMQ文件夹下的MDSManager.exe,如上所诉,默认是在C:\Programe Files\DotNetMQ\文件夹下),并在Applications菜单中打开Application类表。点击Add New Appliction按钮,输入应用程序名称。

如上所述,添加Application1和Application2到DotNetMQ。最后,你的应用程序列表应该像下面这样。

图 - 5:DotNetMQ管理工具的应用程序列表界面。

开发Application1

在Visual Studio中建立一个名称为Application1的控制台应用程序,并添加MDSCommonLib.dll引用,这个dll文件里提供了链接到DotNetMQ必需的一些类。而后在Program.cs文件中写上下面的代码:

using System;
using System.Text;
using MDS.Client;

namespace Application1
{
    class Program
    {
        static void Main(string[] args)
        {
            //Create MDSClient object to connect to DotNetMQ
            //Name of this application: Application1
            var mdsClient = new MDSClient("Application1");

            //Connect to DotNetMQ server
            mdsClient.Connect();

            Console.WriteLine("Write a text and press enter to send " + 
               "to Application2. Write 'exit' to stop application.");

            while (true)
            {
                //Get a message from user
                var messageText = Console.ReadLine();
                if (string.IsNullOrEmpty(messageText) || messageText == "exit")
                {
                    break;
                }

                //Create a DotNetMQ Message to send to Application2
                var message = mdsClient.CreateMessage();
                //Set destination application name
                message.DestinationApplicationName = "Application2";
                //Set message data
                message.MessageData = Encoding.UTF8.GetBytes(messageText);

                //Send message
                message.Send();
            }

            //Disconnect from DotNetMQ server
            mdsClient.Disconnect();
        }
    }
}

在建立MDSClient对象时,咱们把要链接的应用程序名称传给构造函数,用这个构造函数,咱们将用默认端口(10905)链接本地服务器(127.0.0.1)上的DotNetMQ。重载的构造函数能够用于链接其余服务器和端口。

MDSClient的CreateMessage方法返回一个IOutgoingMessage的对象。对象的MessageData属性是实际发送给目标应用程序的数据,它是一个字节数组。咱们使用UTF8编码把用户输入的文本转换成字节数组。对象的DestinationApplicationName和DestinationServerName属性是用于设置消息的目标地址。若是咱们没有指定目标服务器,默认就是本地服务器。最后,咱们发送这个消息对象。

开发Application2

在Visual Studio里建立一个新的控制台应用程序,命名为Application2,添加MDSCommonLib.dll并写下如下代码:

using System;
using System.Text;
using MDS.Client;

namespace Application2
{
    class Program
    {
        static void Main(string[] args)
        {
            //Create MDSClient object to connect to DotNetMQ
            //Name of this application: Application2
            var mdsClient = new MDSClient("Application2");

            //Register to MessageReceived event to get messages.
            mdsClient.MessageReceived += MDSClient_MessageReceived;

            //Connect to DotNetMQ server
            mdsClient.Connect();

            //Wait user to press enter to terminate application
            Console.WriteLine("Press enter to exit...");
            Console.ReadLine();

            //Disconnect from DotNetMQ server
            mdsClient.Disconnect();
        }

        /// <summary>
        /// This method handles received messages from other applications via DotNetMQ.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e">Message parameters</param>
        static void MDSClient_MessageReceived(object sender, MessageReceivedEventArgs e)
        {
            //Get message
            var messageText = Encoding.UTF8.GetString(e.Message.MessageData);

            //Process message
            Console.WriteLine();
            Console.WriteLine("Text message received : " + messageText);
            Console.WriteLine("Source application    : " + e.Message.SourceApplicationName);

            //Acknowledge that message is properly handled
            //and processed. So, it will be deleted from queue.
            e.Message.Acknowledge();
        }
    }
}

咱们用和Application1类似的方法建立一个MDSClient对象,不一样的就是链接应用程序的名称是Application2。为了接收消息,须要给MDSClient对象注册MessageReceived事件。而后咱们链接DotNetMQ,直到用户输入Enter才断开。

当一个消息发送给Application2是,MDSClient_MessageReceived方法就会被调用来处理消息。咱们从MessageReceivedEventArgs参数对象的Message属性能够获得发送过来的消息。这个消息的类型是IIncomingMessage。IIncomingMessage对象的MessageData属性实际包含了由Application1发送的消息数据。因为它是一个字节数组,咱们用UTF8编码把它转换成字符串。而后把文本消息打印到控制台上。

图 - 6:Application1经过DotNetMQ发送两个消息到Application2。

处理传入消息以后,还须要来确认这个消息。这表示消息已经正确接收并处理。而后DotNetMQ将从消息队列中把消息删除。咱们也能够用Reject方法拒绝一个消息(若是在出错的状况下咱们不能处理这个消息)。在这种状况下,该消息将回到消息队列,稍后再试着发到目标应用程序(若是在同一个服务器上存在另外一个Application2的实体,也可能发到另外一个上)。这是DotNetMQ系统的一个强大机制。所以,能够确保消息不会丢失并绝对能够被处理。若是你不确认或拒绝一个消息,系统假设是被拒绝的。因此,即便你的应用程序崩溃了,在你的应用程序正常运行后,仍是会收到消息的。

若是你在同一台服务器上运行多个Application2的实例,哪个会收到消息呢?在这种状况下,DotNetMQ会把消息顺序地发给这多个实例。因此你能够建立多发送/接收的系统。一个消息只能被一个实例接收(实例接收相互不一样的消息)。DotNetMQ提供这全部功能和同步。

消息属性:传送规则(Transmit Rule)

在发送一个消息以前,你能够像这样设置一个消息的Transmit Rule属性:

message.TransmitRule = MessageTransmitRules.NonPersistent;

传送规则有三种类型:

  • StoreAndForward:这个是默认传送规则,消息是持久的,不会丢失的,而且使确保传送的。若是Send方法没有抛出异常,就代表消息已被DotNetMQ接收,并且存储到了数据库。直到目标应用程序接收并确认了它,这个消息会一直存储在数据库里。
  • NonPersistent:消息不会存储到数据库,这是发送消息最快的方式。仅在DotNetMQ服务中止工做,消息才会丢失。
  • DirectlySend:这个是DotNetMQ独有的功能。这种类型的消息直接发送给目标应用程序。在接收者确认一个消息以前,发送者程序是一直被阻塞的。因此,若是发送者在调用Send方法的过程当中没有发生异常,就意味着该消息被接受者正确接收并确认。若是在传送消息时发生错误,或接受者处于脱机状态,或者接受者拒绝了消息,发送者在调用Send方法时都会获得一个异常。即便应用程序是在不一样的服务器上(更即便在应用程序之间有许多服务器要路由),这个规则依然能正常工做。

因为默认的传送规则是StoreAndForward,让咱们试试下面这些:

  • 运行Application1(这时Application2没有运行),输入一些消息,而后关闭程序。
  • 运行Application2,你将看到消息没有丢失,而是被Application2接收了。

即便在Application1发送过消息后,你中止了DotNetMQ服务,你的消息也是不会丢失的,这就叫持久化

客户端属性:通信方式(CommunicationWay)

默认状况下,一个应用程序能够经过MDSClient发送和接收消息(CommunicationWays.SendAndReceive)。若是一个应用程序不须要接收消息,能够设置MDSClient的CommunicationWay为CommunicationWays.Send。这个属性在链接DotNetMQ以前或在和DotNetMQ通讯中均可以改变。

客户端属性:出错时从新链接服务器(ReConnectServerOnError)

默认状况下,MDSClient因为某种缘由断开DotNetMQ时会自动重连。因此,即便你重启DotNetMQ服务,也不用重启你的应用程序。你能够把ReconnectServerOnError设置为false来禁用自动重连。

客户端属性:自动确认消息(AutoAcknowledgeMessages)

默认状况下,你必须在MessageReceived事件中显式的确认消息。不然,系统将认为消息是被拒绝了。若是你想改变这种行为,你须要把AutoAcknowledgeMessages属性设为true。在这种状况下,若是你的MessageReceived事件处理程序没有抛出异常,你也没有显式确认和拒绝一个消息,系统将自动确认该消息(若是抛出异常,该消息将被拒绝)。

配置DotNetMQ

有两种方式能够配置DotNetMQ:经过XML配置文件或用DotNetMQ管理工具(一个Windows Forms程序),这里我分别演示这两种方法,有些配置是及时生效的,而有些则须要重启DotNetMQ。

服务端

你能够只在一台服务器上运行DotNetMQ,在这种状况下,是不须要为服务器配置任何东西的。但若是你想在多台服务器上运行DotNetMQ并使它们相互通讯,你就须要定义服务器图了。

一个服务器图包含两个或更多个节点,每个节点都是一个具备IP地址和TCP端口(被DotNetMQ用的那个)的服务器。你能够用DotNetMQ管理器配置/设计一个服务器图。

图 - 8:DotNetMQ服务器图管理。

在上图中,你看到了一个包含5个节点的服务器图。红色节点表示当前服务器(当前服务器就是你用DotNetMQ管理器链接的那个)。直线表示两个节点(它们互为相邻节点)是可链接的(它们能够发送/接收消息)。服务器/节点图形中的名称是很重要的,它被用来向该服务器发送消息。

你能够双击图形中的一个服务器来编辑它的属性。为了链接两个服务器,你要按住Ctrl键,点击第一个再点击第二个(断开链接也是相同的操做)。你能够经过点击右键,选择Set as this server来设置管理器链接该服务器。你能够从图中删除一个服务器或经过右键菜单添加一个新的服务器。最后,你能够经过拖拽添加或移除服务器。

当你设计好服务器图以后,你必须点击Save & Update Graph按钮来保存这些修改。这些修改将保存在DotNetMQ安装目录的MDSSettings.xml文件里。你必须重启DotNetMQ才能应用这些修改。

对于上面的服务器图,对应的MDSSettings.xml设置以下:

<?xml version="1.0" encoding="utf-8"?>
<MDSConfiguration>
  <Settings>
    ...
  </Settings>
  <Servers>
    <Server Name="halil_pc" IpAddress="192.168.10.105" 
       Port="10099" Adjacents="emre_pc" />
    <Server Name="emre_pc" IpAddress="192.168.10.244" Port="10099" 
       Adjacents="halil_pc,out_server,webserver1,webserver2" />
    <Server Name="out_server" IpAddress="85.19.100.185" 
       Port="10099" Adjacents="emre_pc" />
    <Server Name="webserver1" IpAddress="192.168.10.263" 
       Port="10099" Adjacents="emre_pc,webserver2" />
    <Server Name="webserver2" IpAddress="192.168.10.44" 
       Port="10099" Adjacents="emre_pc,webserver1" />
  </Servers>
  <Applications>
    ...
  </Applications>
  <Routes>
    ...
  </Routes>
</MDSConfiguration>

固然,这个配置是要根据你实际的网络进行的。你必须在图中全部服务器上安装DotNetMQ。此外,还必须在全部服务器上配置相同的服务器图(你能够很容易地从XML文件复制服务器节点到其余服务器上)。

DotNetMQ采用段路径算法发送消息(没有在XML配置文件里手动定义路由的状况下)。考虑这个情景,运行在halil_pc的Application A发送一个消息到webserver2上的Application B,路径是很简单的:Application A -> halil_pc -> emre_pc -> webserver2 -> Application B。halil_pc经过服务器图定义知道下一个要转发到的服务器(emre_pc)。

最后,MDSSettings.design.xml包含了服务器图的设计信息(节点在屏幕上的位置)。这个文件只是用于DotNetMQ管理器的服务器图窗体,运行时的DotNetMQ服务是不须要的。

应用程序

就像图 - 5显示的那样,你能够把和DotNetMQ关联的应用程序做为消息代理来添加/删除。对于这些修改是不须要重启DotNetMQ的。应用程序的配置也保存在MDSSettings.xml文件里,就像下面这样:

<?xml version="1.0" encoding="utf-8"?>
<MDSConfiguration>
  ...
  <Applications>
    <Application Name="Application1" />
    <Application Name="Application2" />
  </Applications>
  ...
</MDSConfiguration>

一个应用程序必须在这个列表里才能和DotNetMQ链接。若是你直接修改xml文件,你必须重启DotNetMQ服务才能生效。

路由/负载均衡

DotNetMQ的有一个路由功能。如今路由设置只能经过MDSSettings.xml设置。你能够看到下面文件里有两种路由设置:

<?xml version="1.0" encoding="utf-8" ?>
<MDSConfiguration>
  ...
  <Routes>

    <Route Name="Route-App2" DistributionType="Sequential" >
      <Filters>
        <Filter DestinationServer="this" DestinationApplication="Application1" />
      </Filters>
      <Destinations>
        <Destination Server="Server-A" Application="Application1" RouteFactor="1" />
        <Destination Server="Server-B" Application="Application1" RouteFactor="1" />
        <Destination Server="Server-C" Application="Application1" RouteFactor="1" />
    </Destinations>
    </Route>

    <Route Name="Route-App2" DistributionType="Random" >
      <Filters>
        <Filter DestinationServer="this" DestinationApplication="Application2" /> 
        <Filter SourceApplication="Application2" TransmitRule="StoreAndForward" /> 
    </Filters>
      <Destinations>
        <Destination Server="Server-A" Application="Application2" RouteFactor="1" />
        <Destination Server="Server-B" Application="Application2" RouteFactor="3" />
      </Destinations>
    </Route>
    
  </Routes>
  ...
</MDSConfiguration>

每一个路由节点有两个属性:Name属性是对用户友好的显示(不影响路由功能),DistributionType是路由的策略。这里有两种类型的路由策略:

  • Sequential:消息依次顺序的路由到目标服务器。Destination的RouteFactor是分发因子。
  • Random:消息随机的路由到目标服务器。选择Server-A服务器的几率是:(Server-A的RouteFactor)/(Destinations里全部RouteFactor的总和)。

Filters用于决定消息使用哪一个路由。若是一个消息的属性和其中一个过滤器匹配,该消息就会被路由。这有5个条件(XML的5个属性)来定义一个过滤器:

  • SourceServer:消息的第一个源服务器,能够用this表示当前服务器。
  • SourceApplication:发现消息的应用程序。
  • DestinationServer:消息的最终目标服务器,能够用this表示当前服务器。
  • DestinationApplication:接收消息的应用程序。
  • TransmitRule:消息传送规则的一种(StoreAndForward,DirectlySend,NonPersistent)。

过滤消息时,不会考虑没有定义的条件。因此,若是全部的条件都是空的(或直接没定义),那么全部的消息都适合这个过滤器。只有全部的条件都匹配时,一个过滤器才适合这个消息。若是一个消息正确匹配(至少是过滤器定义的都匹配)一个路由中的一个过滤器,那么这个路由将被选择并使用。

Destinations是用来将消息路由到其余服务器用的。一个目标服务器被选中是根据Route节点的DistributionType属性(前面解释过)决定的。一个destination节点必须定义三个属性

  • Server:目标服务器,能够用this表示当前服务器。
  • Application:目标应用程序,目标应用程序一般和消息的原目标程序是同样的,不过这里你能够重定向到另外一个应用程序。
  • RouteFactor:这个属性用于代表一个目标服务器被选中的相对比率,能够用来作负载均衡。若是你想把消息平均分发到全部服务器上,你能够把全部目标服务器的FouteFactor属性都设为1。可是若是你有两台服务器,其中一台比另外一台性能强大的多,你能够经过设置这个路由因子来达到选择第一台服务器的几率是第二台的两倍以上。

修改路由配置,必须重启DotNetMQ才会生效。

其余设置

目前DotNetMQ支持3中存储类型:SQLite(默认),MySQL内存(译者注:根据下面内容,还支持MSSQL)。你能够在MDSSettings.xml修改存储类型。

  • SQLite:使用SQLite数据库系统。这个是默认存储类型,使用(DotNetMQ安装目录\SqliteDB\MDS.s3db)文件做为数据库。
  • MSSQL:使用微软SQL Server数据库,你须要提供ConnectionString属性做为链接字符串(下面会说到)。
  • MySQL-ODBC:经过ODBC使用MySQL数据库,你须要提供ConnectionString数据做为链接字符串。
  • MySQL-Net:经过.NET Adapter(.NET适配器)使用MySQL数据库,你须要提供ConnectionString数据做为链接字符串。
  • Memory:使用内存做为存储设备。在这种状况下,若是DotNetMQ中止了,持久性消息会丢失。

下面是一个使用MySQL-ODBC做为存储的简单配置:

<Settings>
    <Setting Key="ThisServerName" Value="halil_pc" />
    <Setting Key="StorageType" Value="MySQL-ODBC" />
    <Setting Key="ConnectionString" 
       Value="uid=root;server=localhost;driver={MySQL ODBC 3.51 Driver};database=mds" />
  </Settings>

你能够在Setup\Databases文件夹(这个文件夹在DotNetMQ的安装目录)找到所需的文件,而后建立数据库和数据表,以供DotNetMQ使用。若是你有什么问题,能够随时问我。

还有一个设置是定义"current/this"这个名称表明哪台服务器的,这个值必须是Servers节点里的一个服务器名。若是你用DotNetMQ管理器编辑服务器图,这个值是自动设置的。

网络传输消息

向一个网络服务器的应用程序发消息是和向同一个服务器的应用程序发消息同样简单的。

一个简单的应用程序

让咱们考虑下面这个网络:

图 - 8:两个应用程序经过DotNetMQ在网络上通讯。

运行在ServerA上的Application1想发消息到ServerC上的Application2,因为防火墙的规则,ServerA和ServerC不能直接链接。让咱们修改一下在第一个DotNetMQ程序里开发的程序。

Application2甚至一点有不用修改,只要把Application2上ServerC上运行并等待传入的消息便可。

Application1只是在如何发消息的地方稍微改动一点,就是设置DestinationServerName(目标服务器名)为ServerC。

var message = mdsClient.CreateMessage();
message.DestinationServerName = "ServerC"; //Set destination server name here!
message.DestinationApplicationName = "Application2";
message.MessageData = Encoding.UTF8.GetBytes(messageText);
message.Send();

就这样,就完事儿了。你不须要知道ServerC在哪里,也不须要直接链接ServerC...这些所有定义在DotNetMQ设置里。注意:若是你不给一个消息设置DestinationServerName,系统假设目标服务器就是"current/this"指定的那台服务器,DotNetMQ也将把消息发送到同一台服务器上的应用程序。另外,若是你定义了必要的路由,你就没必要设置目标服务器了,DotNetMQ会自动地路由消息。

固然,DotNetMQ的设置必须根据服务器间的链接(服务器图)来设置,而且Application1和Application2必须像配置DotNetMQ部分说的那样注册到DotNetMQ服务器。

一个真实的案例:分布式短信处理器(Distributed SMS Processor)

正如你已看到的那样,DotNetMQ能够用于构建分布式负载均衡应用系统。在本节中,我将讨论一个生活中真实的场景:一个分布式消息处理系统。

假定有一个用于音乐比赛投票的短消息(MSM)服务。全部竞赛者唱过他们的歌曲后,观众给他们最喜欢的歌手投票,会发一条像"VOTE 103"这样的短信到咱们的短息服务器。并假定此次投票会在短短的30分钟完成,大约有五百万人发短息到咱们的服务。

咱们将会接收每一条短息,处理它(格式化短息文本,修改数据库,以便增长选手的票数),并要发送确认消息给发送者。咱们从两台服务器接收消息,在四台服务器上处理消息,而后从两台服务器上发送确认消息。咱们总共有八台服务器。让咱们看看完整的系统示意图:

图 - 9:分布式短信处理系统

这里有三种类型的应用:接受者,处理器,和发送者。在这种状况下,你就可使用DotNetMQ做为消息队列和负载均衡器,经过配置服务器图和路由(就像配置DotNetMQ小节中描述的那样),来构建一个分布式的,可扩展的消息处理系统。

请求/应答式通讯

在许多状况下,一个应用发一个消息到另外一个应用,而后获得一个应答消息。DotNetMQ对这种通讯方式有内置的支持。考虑这样一个服务:用于查询库存的状态。这里有两种消息类型:

[Serializable]
public class StockQueryMessage
{
    public string StockCode { get; set; }
}

[Serializable]
public class StockQueryResultMessage
{
    public string StockCode { get; set; }
    public int ReservedStockCount { get; set; }
    public int TotalStockCount { get; set; }
}

下面展现了一个简单的库存服务。

using System;
using MDS;
using MDS.Client;
using StockCommonLib;

namespace StockServer
{
    class Program
    {
        static void Main(string[] args)
        {
            var mdsClient = new MDSClient("StockServer");
            mdsClient.MessageReceived += MDSClient_MessageReceived;

            mdsClient.Connect();

            Console.WriteLine("Press enter to exit...");
            Console.ReadLine();

            mdsClient.Disconnect();
        }

        static void MDSClient_MessageReceived(object sender, 
                    MessageReceivedEventArgs e)
        {
            //Get message
            var stockQueryMessage = 
                GeneralHelper.DeserializeObject(e.Message.MessageData) 
                as StockQueryMessage;
            if (stockQueryMessage == null)
            {
                return;
            }

            //Write message content
            Console.WriteLine("Stock Query Message for: " + 
                              stockQueryMessage.StockCode);

            //Get stock counts from a database...
            int reservedStockCount;
            int totalStockCount;
            switch (stockQueryMessage.StockCode)
            {
                case "S01":
                    reservedStockCount = 14;
                    totalStockCount = 80;
                    break;
                case "S02":
                    reservedStockCount = 0;
                    totalStockCount = 25;
                    break;
                default: //Stock does not exists!
                    reservedStockCount = -1;
                    totalStockCount = -1;
                    break;
            }

            //Create a reply message for stock query
            var stockQueryResult = new StockQueryResultMessage
                                       {
                                           StockCode = stockQueryMessage.StockCode,
                                           ReservedStockCount = reservedStockCount,
                                           TotalStockCount = totalStockCount
                                       };
            
            //Create a MDS response message to send to client
            var responseMessage = e.Message.CreateResponseMessage();
            responseMessage.MessageData = 
               GeneralHelper.SerializeObject(stockQueryResult);

            //Send message
            responseMessage.Send();

            //Acknowledge the original request message.
            //So, it will be deleted from queue.
            e.Message.Acknowledge();
        }
    }
}

这个库存服务监听进来的StockQueryMessage消息对象,而后把StockQueryResultMessage消息对象发送给查询者。为了简单起见,我没有从数据库查询库存。应答消息对象是由传入消息对象的CreateResponseMessage()方法建立的。最后,发出回应消息后要确认进入的消息。如今,我展现一个简单的库存客户端从服务器查询库存的示例:

using System;
using MDS;
using MDS.Client;
using MDS.Communication.Messages;
using StockCommonLib;

namespace StockApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press enter to query a stock status");
            Console.ReadLine();

            //Connect to DotNetMQ  
            var mdsClient = new MDSClient("StockClient");
            mdsClient.MessageReceived += mdsClient_MessageReceived;
            mdsClient.Connect();
            //Create a stock request message 
            var stockQueryMessage = new StockQueryMessage { StockCode = "S01" }; 
            //Create a MDS message 
            var requestMessage = mdsClient.CreateMessage(); 
            requestMessage.DestinationApplicationName = "StockServer"; 
            requestMessage.TransmitRule = MessageTransmitRules.NonPersistent; 
            requestMessage.MessageData = GeneralHelper.SerializeObject(stockQueryMessage); 
            //Send message and get response    var responseMessage = requestMessage.SendAndGetResponse(); 
            //Get stock query result message from response message 
            var stockResult = (StockQueryResultMessage) GeneralHelper.DeserializeObject(responseMessage.MessageData); 
            //Write stock query result 
            Console.WriteLine("StockCode = " + stockResult.StockCode); 
            Console.WriteLine("ReservedStockCount = " + stockResult.ReservedStockCount); 
            Console.WriteLine("TotalStockCount = " + stockResult.TotalStockCount); 
            //Acknowledge received message 
            responseMessage.Acknowledge(); 
            Console.ReadLine(); 
            //Disconnect from DotNetMQ server. 
            mdsClient.Disconnect(); 
       } 
       static void mdsClient_MessageReceived(object sender, MessageReceivedEventArgs e) { 
            //Simply acknowledge other received messages 
            e.Message.Acknowledge(); 
       } 
   } 
}


 

在上面的示例中,为了演示目的TransmitRule设置成了NonPersistent(非持久)。固然,你能够发送StoreAndForward(持久性)消息。这个是程序运行的截图:

图 - 10:请求/应答式的通讯应用。

面向服务架构的DotNetMQ

SOA(面向服务的架构)是以个流行多年的概念了。Web服务和WCF是两个主要的SOA解决方案。通常状况下,一个消息队列系统是不会预期支持SOA的。同时,消息通讯是异步的,松耦合的过程,而Web服务方法调用则一般是同步的,紧耦合的。即便(正如你在前面示例程序中看到的那样)消息通讯并不如调用一个远程方法同样简单,可是当你的消息数增长,你的应用变复杂以致于难以维护时就不同了。DotNetMQ支持持久性和非持久性的远程调用机制,全部你能够异步地调用一个远程方法,DotNetMQ会确保调用成功。

简单应用程序:短息/邮件发送器

在这里咱们将开发一个简单的服务,可用于发送短信和邮件。也许没有必要专门写一个服务来发送短信和邮件,这些功能均可以在应用自身实现,可是想象一下你有不少应用都要发邮件,在发送时若是邮件服务出问题了怎么办?在能够成功发送邮件以前,应用程序必须一直尝试。因此你必须在你的应用程序中创建一个邮件队列机制,用于一次又一次的尝试发送。在最坏的状况下,你的应用程序可能只运行很短的时间(如Web服务)或者必须在发送完邮件前关闭。可是在邮件服务器上线后,你还必须发送,不容许邮件丢失。

在这种状况下,你能够开发一个单独的邮件/短信服务,它将尝试发送直到成功。你能够经过DotNetMQ开发一个邮件服务,仅当邮件发送成功时确认请求,若是发送失败,只要不确认(或拒绝)消息就好了,它稍后会重试。

服务端

首先,咱们开发短信/邮件的服务部分。为了实现这个,咱们必须定义一个派生自MDSService的类型:

using System;
using MDS.Client.MDSServices;

namespace SmsMailServer
{
    [MDSService(Description = "This service is a " + 
              "sample mail/sms service.", Version = "1.0.0.0")]
    public class MyMailSmsService : MDSService
    {
        //All parameters and return values can be defined.
        [MDSServiceMethod(Description = "This method is used send an SMS.")]
        public void SendSms(
            [MDSServiceMethodParameter("Phone number to send SMS.")] string phone,
            [MDSServiceMethodParameter("SMS text to be sent.")] string smsText)
        {
            //Process SMS
            Console.WriteLine("Sending SMS to phone: " + phone);
            Console.WriteLine("Sms Text: " + smsText);

            //Acknowledge the message
            IncomingMessage.Acknowledge();
        }

        //You do not have to define any parameters
        [MDSServiceMethod]
        public void SendEmail(string emailAddress, string header, string body)
        {
            //Process email
            Console.WriteLine("Sending an email to " + emailAddress);
            Console.WriteLine("Header: " + header);
            Console.WriteLine("Body  : " + body);

            //Acknowledge the message
            IncomingMessage.Acknowledge();
        }

        // A simple method just to show return values.
        [MDSServiceMethod]
        [return: MDSServiceMethodParameter("True, if phone number is valid.")]
        public bool IsValidPhone([MDSServiceMethodParameter(
               "Phone number to send SMS.")] string phone)
        {
            //Acknowledge the message
            IncomingMessage.Acknowledge();
            
            //Return result
            return (phone.Length == 10);
        }
    }
}

如你所见,它只是一个带有特性(Attribute)的一个常规C#类。MDSServiceMDSServiceMethod两个特性是必须的,其余的特性是可选的(不过写上去是最好了,你将很快会看到什么会用这些特性)。你提供服务的方法必须有MDSServiceMehod特性,若是你不想公开一些方法,只要不加MDSServiceMethod特性就好了。

你还必须在你的服务方法中确认消息,不然,这个消息(引发这个服务方法调用的那个)就不会从消息队列中删除,而咱们的服务方法将会被再次调用。若是咱们不能处理这个消息(好比,若是邮件服务没有工做,咱们没办法发送时)咱们也能够拒绝它。若是咱们拒绝了这个消息,它稍后还会发送给咱们(很可靠)。你能够经过MDSService类的IncomingMessage属性获得原消息,另外,你也能够经过RemoteApplication属性获得远程应用程序的信息。

建立了正确的服务类后,咱们必须建立一个应用来运行它,下面是用一个简单的控制台程序运行咱们的MyMailSmsService服务:

using System;
using MDS.Client.MDSServices;

namespace SmsMailServer
{  
    class Program
    {
        static void Main(string[] args)
        {
            using (var service = new MDSServiceApplication("MyMailSmsService"))
            {
                service.AddService(new MyMailSmsService());
                service.Connect();

                Console.WriteLine("Press any key to stop service");
                Console.ReadLine();
            }
        }
    }
}

如你所见,只须要3行代码就能够建立并运行服务,因为MDSService是可销毁的,因此你能够uing语句,另外,你也可使用MDSServiceApplicationDisconnect方法手动关闭服务。你能够经过AddService方法在一个MDSServiceApplication中运行多个服务。

客户端

为了开发一个使用DotNetMQ服务的应用,你必须建立一个服务代理(就像Web服务和WCF那样)。为了建立代理,你能够用MDSServiceProxyGenerator工具。首先,编译你的服务项目,而后运行MDSServiceProxyGenerator.exe(在DotNetMQ安装目录).

图 - 11:为DotNetMQ服务生成代理类。

选择你的服务程序集(在这个简单的例子中是指SmsMailServer.exe)。你能够选择服务类或生成这个程序集里全部服务的代理。输入一个命名空间和一个目标文件夹,而后生成代理类。生成玩后,你就能够把它加到你的项目里了。

我就不展现这个代理类了,但你必须了解它(你能够看源码,它是一个很简单的类)。你方法/参数上的特性用来生成这个代理类的注释。

在咱们的项目里添加这个代理类后,咱们就能够想简单方法调用那样向服务发消息了。

using System;
using MDS.Client;
using MDS.Client.MDSServices;
using SampleService;

namespace SmsMailClient
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press enter to test SendSms method");
            Console.ReadLine();

            //Application3 is name of an application that sends sms/email.
            using (var serviceConsumer = new MDSServiceConsumer("Application3"))
            {
                //Connect to DotNetMQ server
                serviceConsumer.Connect();

                //Create service proxy to call remote methods
                var service = new MyMailSmsServiceProxy(serviceConsumer, 
                    new MDSRemoteAppEndPoint("MyMailSmsService"));

                //Call SendSms method
                service.SendSms("3221234567", "Hello service!");
            }
        }
    }
}

你也能够调用服务的其余方法,会获得像常规方法那样的返回值。实际上,你的方法调用被转换成了可靠的消息,好比,即便你的远程应用程序(MyMailSmsService)在方法调用时没有运行,在服务启动后也会被调用,因此你的方法调用是必定会被调用的。

你能够经过改变服务代理的TransmitRule属性来改变消息传输的规则。若是服务方法返回void,那么他的默认传输规则是StoreAndForward。若是服务方法有个一返回值,那么方法调用将会不可靠(由于方法调用时同步的,要等待一个结果的),它的规则是DiretlySend。你能够选择任何类型做为方法的参数,若是参数类型是基元类型(string,int,byte...),就不须要附加的设置,可是若是你想用你自定义的类型做为方法参数,这个类型必须标记为Serializable,由于DotNetMQ会用二进制序列化参数。

注意:你在运行这个例子前必须在DotNetMQ里注册MyMailSmsService和Application3。

Web服务支持

固然,你能够在Web服务里链接DotNetMQ,由于把自己仍是一个.Net应用程序。可是,为何你要写一个ASP.NET Web方法为应用程序处理消息(并且能够在同一个上下文中回复消息)呢?Web服务更适合这样请求/应答式的方法调用。

DotNetMQ支持ASP.NET web服务并能够传递消息到web服务。这里有个web服务的模板样品(在下载文件中)来实现这一目标。它的定义以下:

using System;
using System.Web.Services;
using MDS.Client.WebServices;

[WebService(Namespace = "http://www.dotnetmq.com/mds")]
[WebServiceBinding(ConformsTo = WsiProfiles.BasicProfile1_1)]
public class MDSAppService : WebService
{
    /// <summary>
    /// MDS server sends messages to this method.
    /// </summary>
    /// <param name="bytesOfMessage">Byte array form of message</param>
    /// <returns>Response message to incoming message</returns>
    [WebMethod(Description = "Receives incoming messages to this web service.")]
    public byte[] ReceiveMDSMessage(byte[] bytesOfMessage)
    {
        var message = WebServiceHelper.DeserializeMessage(bytesOfMessage);
        try
        {
            var response = ProcessMDSMessage(message);
            return WebServiceHelper.SerializeMessage(response);
        }
        catch (Exception ex)
        {
            var response = message.CreateResponseMessage();
            response.Result.Success = false;
            response.Result.ResultText = 
              "Error in ProcessMDSMessage method: " + ex.Message;
            return WebServiceHelper.SerializeMessage(response);
        }
    }

    /// <summary>
    /// Processes incoming messages to this web service.
    /// </summary>
    /// <param name="message">Message to process</param>
    /// <returns>Response Message</returns>
    private IWebServiceResponseMessage 
            ProcessMDSMessage(IWebServiceIncomingMessage message)
    {
        //Process message

        //Send response/result
        var response = message.CreateResponseMessage();
        response.Result.Success = true;
        return response;
    }
}

如上所述,你不须要改变ReceiveMDSMessage方法,并且必须在ProcessMDSMessage方法里处理消息。另外,你须要向下面这样在MDSSettings.xml里定义你的web服务地址,你也能够用DotNetMQ管理工具添加web服务。

 ... 
  <Applications>
    <Application Name="SampleWebServiceApp">
      <Communication Type="WebService" 
        Url="http://localhost/SampleWebApplication/SampleService.asmx" />
    </Application>
  </Applications>
  ... 

DotNetMQ的性能

这是一些经过DotNetMQ传送消息的测试结果:

消息传送:

  • 持久地 10,000个消息大约须要25秒(约每秒400个消息)。
  • 非持久地 10,000个消息大约须要3.5秒(约每秒2850个消息)。

方法调用(在DotNetMQ服务里)

  • 持久地 10,000个方法调用大约须要25秒(约每秒400个)。
  • 非持久地 10,000个方法调用大约须要8.7秒(约每秒1150个)。

测试平台:Intel Core 2 Duo 3,00 GHZ CPU.2 GB RAM PC。消息传送和方法调用是在同一台电脑上的两个应用程序之间进行的。

引用

书籍:Enterprise Integration Patterns: Designing,Building,and Deploying Messaging Solutions .做者 Gregor Hohpe,Bobby Woolf(艾迪生韦斯利出版,2003年)。

历史

  • 2011-05-23(DotNetMQ v0.9.1.0)
    • 添加对微软SQL Server数据库的支持。
    • 把MySQLConnectionString设置改为ConnectionString。
    • 修改源码。
    • 根据修改更新了文章。
  • 2011-05-16 (DotNetMQ v0.9.0.0)
    • 添加web服务模板的下载。
    • 对文章作了一些修改和添加。
  • 2011-05-09(DotNetMQ v0.9.0.0)
    • 第一次发布。
相关文章
相关标签/搜索