做用就是提升系统的并发性,将一些不须要及时响应客户端且占用较多资源的操做,放入队列,再由另一个线程,去异步处理这些队列,可极大的提升系统的并发能力。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通讯方法。应用程序经过写和检索出入列队的针对应用程序的数据(消息)来通讯,而无需专用链接来连接它们。消息传递指的是程序之间经过在消息中发送数据进行通讯,而不是经过直接调用彼此来通讯,直接调用一般是用于诸如远程过程调用的技术。排队指的是应用程序经过队列来通讯。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有MSMQ,ActiveMQ,RabbitMQ,IBM WEBSPHERE MQ 等等。
RabbitMQ是使用Erlang开发的,开源的,一个在高级消息队列协议(AMQP)基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
RabbitMQ支持你能想到的大多数开发语言如Java,Ruby,Python,.Net,C/C++,Erlang等等。各类语言的客户端你能够从它的官方网站上下载。html
相比较于其余一些MQ产品,RabbitMQ的一些优点仍是比较明显。web
你能够在下面一些连接中找到一些比较apache
RabbitMQ,ActiveMQ,qpid:http://bhavin.directi.com/rabbitmq-vs-apache-activemq-vs-apache-qpid/json
RabbitMQ和ZeroMQ:http://stackoverflow.com/questions/731233/activemq-or-rabbitmq-or-zeromq-or浏览器
还有四款消息队列大比拼:http://www.oschina.net/news/17973/message-queue-shootout安全
RabbitMQ的一些优势:服务器
安装部署方便:多线程
RabbitMQ安装方便,有详细的安装文档。并发
可靠性:负载均衡
RabbitMQ 提供了多种多样的特性让你在可靠性,可用性和性能之间做出权衡,包括持久化,发送应答,发布确认等。
强大地路由功能:
全部的消息都会经过路由器转发到各个消息队列中,RabbitMQ内建了几个经常使用的路由器,而且能够经过路由器的组合以及自定义路由器插件来完成复杂的路由功能。
Clustering:
RabbitMQ服务端能够在一个局域网中集群部署,做为一个逻辑服务器。
Federation:
Federration模式可让RabbitMQ服务器热备部署,当系统中其中一个服务器失效而没法运做时,另外一个服务器便可自动接手原失效系统所执行的工做。
Completing Consumer:
内置的竞争的消费者模式能够实现消费者的负载均衡。
管理工具:
RabbitMQ提供了一个简单易用的管理界面,让用户能够经过浏览器监控和控制你的消息队列的方方面面。
跟踪:
若是你在使用RabbitMQ过程当中出现了问题,或者获得了你不指望的结果,你能够经过打开内置Tracer功能进行跟踪,固然这时性能就有所降低。
插件系统:
提供了强大地插件系统,让用户可以方便的对MQ进行扩展。上面所说到的管理工具和Federation组件都是做为一个插件发布的。你能够选择安装或者不安装。
客户端支持:
RabbitMQ支持你能想到的大多数开发语言如Java,Ruby,Python,.Net,C/C++,Erlang等等。
强大的社区和商业支持:
强大的社区能够帮助你快速的解决关于RabbitMQ的各类问题,若是仍是不能解决你的问题,VMWare还提供商业技术支持。
RabbitMQ服务:http://www.rabbitmq.com/download.html。
(安装完RabbitMQ服务后,会在Windows服务中看到。若是没有Erlang运行环境,在安装过程当中会提醒先安装Erlang环境。http://www.erlang.org/downloads)
.net客户端类库:http://www.rabbitmq.com/dotnet.html
RabbitMQ提供了不少好用的插件,最经常使用的就是web管理工具,启动此插件。
CMD中运行命令:rabbitmq-plugins enable rabbitmq_management
注:rabbitmq-plugins 所在路径为:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.9\plugins
web管理工具的地址是:http://localhost:15672,初始用户名:guest 初始密码:guest
- 链接(Connection),与RabbitMQ Server创建的一个链接,由ConnectionFactory建立,每一个connection只与一个物理的Server进行链接,此链接是基于Socket进行链接的,这个能够类似的理解为像一个DB Connection。AMQP通常使用TCP连接来保证消息传输的可靠性。
- 通道 (Channel),在C#客户端里应该是叫Model,不明白为何这么取名字,其余客户端基本都叫Channel。创建在Connection基础上的一个通道,相对于Connection来讲,它是轻量级的。它就像是Hibernate里面的Session同样。Channel 主要进行相关定义,发送消息,获取消息,事务处理等。Channel能够在多线程中使用,可是在必须保证任什么时候候只有一个线程执行命令。一个Connection能够有多个Channel。客户端程序有时候会是一个多线程程序,每个线程都想要和RabbitMQ进行链接,可是又不想共享一个链接,这种需求仍是比较广泛的。由于一个Connection就是一个TCP连接,RabbitMQ在设计的时候不但愿与每个客户端保持多个TCP链接,但这确实是有些客户端的需求,因此在设计中引入了Channel的概念,每个Channel之间没有任何联系,是彻底分离的。多个Channel来共享一个Connection。
- 交换器(Exchange),它是发送消息的实体。
- 队列(Queue),这是接收消息的实体。
- 绑定器(Bind),将交换器和队列链接起来,而且封装消息的路由信息。
配置文件地址为:C:\Documents and Settings\Administrator\Application Data\RabbitMQ\rabbitmq.config,默认没有rabbit.config文件,须要手工新建(默认会有rabbitmq.config.example 做为参考)。基于安全,作了两个配置,以下:
[ {rabbit, [ {loopback_users, [<<"guest">>]}, {tcp_listeners, [{"127.0.0.1", 1234}, {"10.121.1.48", 8009}]} ]} ].
loopback_users:设置只能在与RabbitMq服务同一台机器上访问服务的用户。
tcp_listeners:设置RabbitMQ监听的IP地址与端口。只监听局域网内网iP、修改默认端口,防止被入侵攻击。
设置完后,别忘记了如下操做,不然配置不起做用。
此命令要切换到路径:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin
消息生产者:
class Program { static void Main(string[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = Constants.MqHost; factory.Port = Constants.MqPort; factory.UserName = Constants.MqUserName; factory.Password = Constants.MqPwd; using (IConnection conn = factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { //在MQ上定义一个持久化队列,若是名称相同不会重复建立 channel.QueueDeclare("MyFirstQueue", true, false, false, null); while (true) { string customStr = Console.ReadLine(); RequestMsg requestMsg = new RequestMsg(); requestMsg.Name = string.Format("Name_{0}", customStr); requestMsg.Code = string.Format("Code_{0}", customStr); string jsonStr = JsonConvert.SerializeObject(requestMsg); byte[] bytes = Encoding.UTF8.GetBytes(jsonStr); //设置消息持久化 IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.BasicPublish("", "MyFirstQueue", properties, bytes); //channel.BasicPublish("", "MyFirstQueue", null, bytes); Console.WriteLine("消息已发送:" + requestMsg.ToString()); } } } } catch (Exception e1) { Console.WriteLine(e1.ToString()); } Console.ReadLine(); } }
class Program { static void Main(string[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = Constants.MqHost; factory.Port = Constants.MqPort; factory.UserName = Constants.MqUserName; factory.Password = Constants.MqPwd; using (IConnection conn = factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { //在MQ上定义一个持久化队列,若是名称相同不会重复建立 channel.QueueDeclare("MyFirstQueue", true, false, false, null); //输入1,那若是接收一个消息,可是没有应答,则客户端不会收到下一个消息 channel.BasicQos(0, 1, false); Console.WriteLine("Listening..."); //在队列上定义一个消费者 QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); //消费队列,并设置应答模式为程序主动应答 channel.BasicConsume("MyFirstQueue", false, consumer); while (true) { //阻塞函数,获取队列中的消息 BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); byte[] bytes = ea.Body; string str = Encoding.UTF8.GetString(bytes); RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str); Console.WriteLine("HandleMsg:" + msg.ToString()); //回复确认 channel.BasicAck(ea.DeliveryTag, false); } } } } catch (Exception e1) { Console.WriteLine(e1.ToString()); } Console.ReadLine(); } }
这是我的的总结,只是简单的安装和使用,积累了更好的经验在记录下来。