一个winform带你玩转rabbitMQ

源码已放出 https://github.com/dubing/MaoyaRabbithtml

本章分3部分node

1、安装部署初探git

2、进阶github

3、api相关json


安装 部署 初探

先上图windows


一. 安装部署api

  下载 rabbitMQ :http://www.rabbitmq.com/download.html安全

  安装rabbitmq须要erlang,下载erlang:http://www.erlang.org/download.html服务器

  按照官网按照步骤,例如windows http://www.rabbitmq.com/install-windows.html app

  安装完rabbitMQ能够再启动插件扩展,其中包含了一个管理后台

  

  最新版本的后台地址为 http://localhost:15672/ 

  

  用户名和密码都为guest,输入完成进入主菜单

  

  功能很丰富,能够查看当前服务器的交换机,队列,消息,链接,会话等得使用状况。

  基本上到这里服务器的安装部署环节算是ok,很简单。


 

二.  简介

  要了解rabbitMQ 首先要了解AMQP协议 百科上给的很详细 http://baike.baidu.com/view/4023136.htm?fr=aladdin

  AMQP 有四个很是重要的概念:虚拟机(virtual host),通道(exchange),队列(queue)和绑定(binding)。

  虚拟机: 一般是应用的外在边界,咱们能够为不一样的虚拟机分配访问权限。虚拟机可持有多个交换机、队列和绑定。
  交换机: 从链接通道(Channel)接收消息,并按照特定的路由规则发送给队列。
  队列: 消息最终的存储容器,直到消费客户端(Consumer)将其取走。
  绑定: 也就是所谓的路由规则,告诉交换机将何种类型的消息发送到某个队列中。

  这个概念很重要 否则在学习rabbitmq的地方会碰到不少困难。想要进阶学习的能够参考 https://www.rabbitmq.com/tutorials/amqp-concepts.html

  借用官方一个图来阐述AMQP

  

  RabbitMQ是一个消息代理。它的核心原理很是简单:接收和发送消息。

  你能够把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。

  对于rabbitMQ自己的特色 参考官网 http://www.rabbitmq.com/features.html

  一、可靠性(Reliability)
  RabbitMQ提供不少特性供咱们能够在性能和可靠性做出折中的选择,包括持久化、发送确认、发布者确认和高可用性等。
  二、弹性选路(Flexible Routing)
  消息在到达队列前经过交换(exchanges)来被选路。RabbitMQ为典型的选路逻辑设计了几个内置的交换类型。对于更加复杂的选路,咱们能够将exchanges绑定在一块儿或者写属于本身的exchange类型插件。
  三、集群化(Clustering)
  在一个局域网内的几个RabbitMQ服务器能够集群起来,组成一个逻辑的代理人。
  四、联盟(Federation)
  对于那些须要比集群更加松散和非可靠链接的服务器来讲,RabbitMQ提供一个联盟模型(Federation Model)
  五、高可用队列(High Available Queue)
  能够在一个集群里的几个机器里对队列作镜像,确保即时发生了硬件失效,你的消息也是安全的。
  六、多客户端(Many Clients)
  有各类语言的RabbitMQ客户端
  七、管理UI(Management UI)
  RabbitMQ提供一个易用的管理UI来监控和控制消息代理人的各个方面。
  八、跟踪(Tracing)
  若是你的消息系统行为异常,RabbitMQ提供跟踪支持来找出错误的根源。
  九、插件系统(Plugin System)
  RabbitMQ提供各类方式的插件扩展,咱们能够实现本身的插件。

  使用任务队列一个优势是可以轻易地并行处理任务。当处理大量积压的任务,只要增长工做队列,经过这个方式,可以实现轻易的缩放。


 

三. 初探

  文中的winform所采起的client为官方的.net版本 https://github.com/rabbitmq/rabbitmq-dotnet-client

  首先是Connection和Channel的概念

  Connection 创建与rabbitmq server的一个链接,由ConnectionFactory建立,Channel创建在connection基础上的一个频道,相对于connection来讲,它是轻量级的。能够理解成一次会话。

  代码示例 本机环境

                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
			//do something
                    }
                }

  exchange经常使用有三种类型:

  Direct :处理路由键。须要将一个队列绑定到交换机上,要求该消息与一个特定的路由键彻底匹配。这是一个完整的匹配。
  Fanout :不处理路由键。你只须要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的全部队列上。
  Topic : 将路由键和某模式进行匹配。此时队列须要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配很少很多一个词。

  还有一种多重属性的类型headers,咱们在下一章节讨论。

  咱们用winform分别形成三种类型的exchange来实际体验一下

  

  这里所谓的限定exchange是在咱们安装rabbitmq server的时候自动生成的一些 咱们的测试不使用这些exchange。

  而后咱们新建3个Queue,这里咱们会发现一个有趣的现象,rabbitmq server对于新生成的队列都会默认绑定在一个名称为“”的默认exchange上。

  先试试direct类型,下面咱们分别把Q1,Q2,Q3根据路由key为空,k1,k.#绑定在dEx上(direct exchange)。
    

  而后咱们根据路由key为空,k,k1,k2,k3来发送消息m1,m2,m3,m4,m5
    

  再用3个队列接收消息试一下结果
  

  由于发送确认标记ack,因此队列上读取过的消息会被删除,为了进一步认证,我在结尾又添加了一个routingkey为k.#的消息(对应绑定Q3),由图可见direct 模式下队列之收取他们彻底对应的routingkey消息。

  下面咱们再试一下fanout类型,把Q1,Q2,Q3根据路由key为空,k1,k.#绑定在fEx上(fanout exchange)。

  
  同上步骤创建绑定关系

  

  生产消息,而后看下队列接受消息的状况
  

  效果很明显,fanout为广播模式。

  再试试topic类型 把Q1,Q2,Q3根据路由key为空,k1,k.#绑定在tEx上(topic exchange)。

  

  推送消息
  

  接收消息
  
  经过3种模式 3个队列的消息读取 你们应该了解了这3中模式的区别。


进阶

一.  exchange属性

  Type

  前一章咱们说了exchange的类型分为fanout,direct,topic.还有一种不经常使用的headers。
  headers这种类型的exchange绑定的时候会忽略掉routingkey,Headers是一个键值对,能够定义成成字典等。发送者在发送的时候定义一些键值对,接收者也能够再绑定时候传入一些键值对,二者匹配的话,则对应的队列就能够收到消息。匹配有两种方式all和any。这两种方式是在接收端必需要用键值"x-mactch"来定义。all表明定义的多个键值对都要知足,而any则代码只要知足一个就能够了。以前的几种exchange的routingKey都须要要字符串形式的,而headers exchange则没有这个要求,由于键值对的值能够是任何类型
  举个例子,发送端定义2个键值{k1,1},{k2,2},接收端绑定队列的时候定义{"x-match", "any"},那么接收端的键值属性里只要存在{k1,1}或{k2,2}均可以获取到消息。
  这样的类型扩展的程度很大,适合很是复杂的业务场景。

  Durability

  持久性,这是exchange的可选属性,若是你Durability设置为false,那些当前会话结束的时候,该exchange也会被销毁。 
  新建一个transient exchange 
  
  关闭当前链接再查看一下
  

  

  刚才咱们新建的transient已经销毁了。

  Auto delete

  当没有队列或者其余exchange绑定到此exchange的时候,该exchange被销毁。这个很简单就不示例了。

  Internal (比较简单 也不展现了)

  表示这个exchange不能够被client用来推送消息,仅用来进行exchange和exchange之间的绑定。

  PS: 没法声明2个名称相同 可是类型却不一样的exchange

  


二.  Queue属性  

  Durability 和exchange相同,未持久化的队列,服务重启后销毁。

  Auto delete 当没有消费者链接到该队列的时候,队列自动销毁。

  Exclusive 使队列成为私有队列,只有当前应用程序可用,当你须要限制队列只有一个消费者,这是颇有用的。

  扩展属性以下对应源程序 RabbitMQ.Client.IModel.QueueDeclare(string, bool, bool, bool, System.Collections.Generic.IDictionary<string,object>)最后的参数

  Message TTL 当一个消息被推送在该队列的时候 能够存在的时间 单位为ms,(对应扩展参数argument "x-message-ttl" )

  Auto expire 在队列自动删除以前能够保留多长时间(对应扩展参数argument "x-expires")

  Max length 一个队列能够容纳的已准备消息的数量(对应扩展参数argument "x-max-length")

  ... 更多参考 http://www.rabbitmq.com/extensions.html

  ps:一旦建立了队列和交换机,就不能修改其标志了。例如,若是建立了一个non-durable的队列,而后想把它改变成durable的,惟一的办法就是删除这个队列而后重现建立。


三.  Message属性

  Durability 

  消息的持久在代码中设置的方法与exchange和queue不一样,有2种方法

  1.

  IBasicProperties properties = channel.CreateBasicProperties();
  properties.SetPersistent(true);
  byte[] payload = Encoding.ASCII.GetBytes(message);
  channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload);

  2.

  IBasicProperties properties = channel.CreateBasicProperties();
  properties.DeliveryMode = 2;
  byte[] payload = Encoding.ASCII.GetBytes(message);
  channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload);

  contentType: 标识消息内容的MIME,例如JSON用application/json

  replayTo: 标识回调的queue的地址

  correlationId:用于request和response的关联,确保消息的请求和响应的同一性

  Message的2种状态:

  Ready

  此状态的消息存在于队列中待处理。

  Unacknowledged

  此状态的消息表示已经在处理未确认。

  说到Unacknowledged,这里须要了解一个ack的概念。当Consumer接收到消息、处理任务完成以后,会发送带有这个消息标示符的ack,来告诉server这个消息接收到并处理完成。RabbitMQ会一直等处处理某个消息的Consumer的连接失去以后,才肯定这个消息没有正确处理,从而RabbitMQ重发这个消息。
  Message acknowledgment是默认关闭的。初始化Consumer时有个noAck参数,若是设置为true,这个Consumer在收到消息以后会立刻返回ack。

  string BasicConsume(string queue, bool noAck, RabbitMQ.Client.IBasicConsumer consumer)

  通常来讲,经常使用的场景noack通常就是设置成true,可是对于风险要求比较高的项目,例如支付。对于每一条消息咱们都须要保证他的完整性和正确性。就须要获取消息后确认执行完正确的业务逻辑后再主动返回一个ack给server。能够经过rabbitmqctl list_queues name message_rady message_unacknowleded 命令来查看队列中的消息状况,也能够经过后台管理界面。

  咱们先hold住一条消息

  

  而后咱们再关闭连接或者重启服务

  
  数据仍是完整的。 

  ps:message的消费还分为consume和baseget 下面讲到集群的时候再介绍。


四.  binding相关

  若是你绑定了一个durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。相似的,若是删除了某个队列或交换机(不管是否是 durable),依赖它的绑定都会自动删除。

  在声明一个队列的同时,server会默认让此队列绑定在默认的exchange上,这个exchange的名称为空。

   


 五.  发布订阅

  咱们上一章的demo中实际上已经使用了发布订阅模式。

  RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。发布者(producer)只须要把消息发送给一个exchange。exchange很是简单,它一边从发布者方接收消息,一边把消息推入队列。exchange必须知道如何处理它接收到的消息,是应该推送到指定的队列仍是是多个队列,或者是直接忽略消息。这些规则是经过exchange type来定义的。

  

  发布订阅其实很简单,例如上章我所示例,假设咱们一开始没有任何消息,如今有一个生产者P1,他是一个天气预报播放者。而后咱们有2个消费者来订阅他的消息。
  P1经过广播类型的交换机fEx来发布他的天气消息,c1,c2分别创建一个队列为Q1,Q2. 而且订阅P1的fEx.

  基本能够如图所示
  
  咱们P1利用fEx生成一条消息的时候,c1,c2经过Q1,Q2均可以获取到p1所发布的消息

  咱们发布3条消息
  
  查看队列状况
  Q1:
  
  Q2:

  

  Q1,Q2都拿到了广播的消息,至于C1,C2如何消费这些消息,互相之间彻底没有干扰。

  ps:简单一句话 发布订阅中发布者所产生的消息经过exchange对全部绑定他的队列队形消息推送,每一个队列获取绑定所对应的消息


六.  WorkQueue (可用于消费者集群)

  区分于发布订阅,消费者集群主要解决横向服务器扩展问题,若是一个队列积压太多,如何均与的让不一样的消费者来承担。

  

  默认来讲,RabbitMQ会按顺序得把消息发送给每一个消费者(consumer)。平均每一个消费者都会收到同等数量得消息。这种发送消息得方式叫作——轮询(round-robin)。

  咱们开3个程序,1个生产 2个消费。

  如图所示绑定关系以下

  

  2个消费者用一样的程序,这里记录进程pid以区分,实际项目中能够用不一样服务器来区分

  

   启动消息消费,使消费者处理work状态

  

  而后咱们不停的经过生产者这发布消息

  

  而后咱们看下2个消费者的消费状况

  1.

  

  2.
  

  3.
  

  4.
  

  5.
  
  

  默认地,RabbitMQ会逐一地向下一个Consumer发放消息,每个Consumer会获得数目相同的消息。文中所示之因此是按照1条一条的轮询,是由于程序中控制了一个队列单次消费的数量。

  void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)


API CommandLine 以及其余功能

RabbitMQ API

  RabbitMQ Server提供了丰富的http api。

  举个列子

  

  须要HTTP基自己份验证。默认的用户名/密码为guest/guest。

  这些返回值得意义我从官网搬来解释,为了不翻译的问题致使你们理解的偏差这里直接给出原文

cluster_name The name of the entire cluster, as set with rabbitmqctl set_cluster_name.
erlang_full_version A string with extended detail about the Erlang VM and how it was compiled, for the node connected to.
erlang_version A string with the Erlang version of the node connected to. As clusters should all run the same version this can be taken as representing the cluster.
exchange_types A list of all exchange types available.
listeners All (non-HTTP) network listeners for all nodes in the cluster. (See contexts in /api/nodes for HTTP).
management_version Version of the management plugin in use.
message_stats A message_stats object for everything the user can see - for all vhosts regardless of permissions in the case of monitoring and administrator users, and for all vhosts the user has access to for other users.
node The name of the cluster node this management plugin instance is running on.
object_totals An object containing global counts of all connections, channels, exchanges, queues and consumers, subject to the same visibility rules as for message_stats.
queue_totals An object containing sums of the messagesmessages_ready and messages_unacknowledged fields for all queues, again subject to the same visibility rules as for message_stats.
rabbitmq_version Version of RabbitMQ on the node which processed this request.
statistics_db_node Name of the cluster node hosting the management statistics database.
statistics_level Whether the node is running fine or coarse statistics.

  又或者经过api查询虚拟主机
  
  许多api的URI须要一个虚拟主机路径的一部分的名字,由于名字只有惟一在一个虚拟主机识别物体。做为默认的虚拟主机称为“/”,这​​将须要被编码为“%2F”。

  在个人demo程序中对应的api功能能够经过这里的功能来实现

  

  其更丰富的功能能够参考官网说明文档 http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html

  以及 http://hg.rabbitmq.com/rabbitmq-management/raw-file/rabbitmq_v3_3_5/priv/www/api/index.html

  通常来讲咱们经常使用的我在应用程序中已经给出 例如查看全部队列等

  


 RabbitMQ CommandLine

  除了丰富的http api,rabbitmq server天然也有其很全面命令行。

  例如查询全部exchange。

  

  查询全部队列以及他们包含的消息数目

   

  rabbitmqctl更多的命令说明参考 http://www.rabbitmq.com/man/rabbitmqctl.1.man.html


Message的BasicGet于consume的区别

   consume的功能上一张介绍过,basicget更偏向于咱们平时用过的其余类型的MessageQueue,它就是最基本的接受消息,consume的消费针对basicget来讲属于一个长链接于短链接的区别。

消费者关系一旦肯定,基本上默认它就是在侦听通道的消息是否在生产。而basicget则是由客户端手动来控制。

  在demo中在下图所示处区分

  

  若是你选择了消费消息,那么基本上代码层面是这样来完成的

                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicQos(0, 1, false);
                    channel.BasicConsume(queue.name, rbAckTrue.Checked, consumer);
                    while (true)
                    {
                        var e = consumer.Queue.Dequeue();
                        MessageBox.Show(string.Format("队列{0}获取消息{1},线程id为{2}", queue.name, Encoding.ASCII.GetString(e.Body), Process.GetCurrentProcess().Id));
                        Thread.Sleep(1000);
                    }

本篇先到此,但愿对你们有帮助  

相关文章
相关标签/搜索