一、虚拟主机(virtual host)或(vhost)html
二、交换机(exchange)算法
三、队列(queue)数据库
四、绑定器(bind)编程
什么是虚拟主机?安全
一组交换机、队列和绑定器 被称为 虚拟主机(vhost)。服务器
为何要用虚拟主机?多线程
RabbitMQ server 能够说就是一个消息队列服务器实体(Broker),Broker当中能够有多个用户,而用户只能在虚拟主机的粒度进行权限控制,因此RabbitMQ中须要多个虚拟主机。每个RabbitMQ服务器都有一个默认的虚拟主机“/”。负载均衡
队列(queue)ide
队列是消息载体,每一个消息都会被投入到一个或多个队列。试图建立一个已经存在的队列,RabbitMQ会直接忽略这个请求。(接收消息的实体)。spa
把消息放进队列前,咱们还须要使用另外一个东西:交换机。
交换机(exchange)
它指定消息按什么规则,路由到哪一个队列。它能够被理解成具备路由表的路由程序。(发送消息的实体)
交换机能够存在多个,每一个交换机在本身独立的进程当中执行,所以增长多个交换机就是增长多个进程,能够充分利用服务器上的CPU核以便达到更高的效率。
交换机如何判断要把消息送到哪一个队列?这是咱们须要路由规则,也就须要绑定器了。
绑定器(bind)
它的做用就是把exchange和queue按照路由规则绑定起来。(将交换器和队列链接起来,而且封装消息的路由信息)
channel.BasicPublish(exchange: "", routingKey: "writeLog", basicProperties: null, body: body);
每一个消息都有一个称为路由关键字(routingKey)的属性,exchange根据这个关键字进行消息投递,其实就是一个简单的字符串。
(绑定操做就能够理解成:exchange将具备路由关键字 “X” 的消息投递到到名为“business”的队列当中去。) 具体实践请看下文。
从而一个绑定就能够归纳为:一个基于路由键将交换机和队列链接起来的路由规则。
须要注意:由Exchange,Queue,RoutingKey三个,才能决定一个从Exchange到Queue的惟一的线路。
更多参考:http://www.ltens.com/article-6.html
链接(Connection):
与RabbitMQ Server创建的一个链接,由ConnectionFactory建立,每一个connection只与一个物理的Server进行链接,此链接是基于Socket进行链接的。AMQP通常使用TCP。
通道 (Channel):
消息通道(主要进行相关定义,发送消息,获取消息,事务处理等),在客户端的每一个链接里,可创建多个channel,每一个channel表明一个会话任务。
Channel在.net的客户端程序里应该是叫“Model”,采用IModel CreateModel()建立的,可是其余语言的客户端都叫Channel。须要注意:一个Connection能够有多个Channel。
为何设计中引入Channel概念?
一个比较广泛的需求:客户端程序有时候会是一个多线程程序,每个线程都想要和RabbitMQ进行链接,可是又不想共享一个链接。
由于一个Connection就是一个TCP连接,RabbitMQ在设计的时候不但愿与每个客户端保持多个TCP链接,但这确实是有些客户端的需求,每个Channel之间没有任何联系,是彻底分离的。
创建在Connection基础上的一个Channel,相对于Connection来讲,它是轻量级的。Channel能够在多线程中使用,可是在必须保证任什么时候候只有一个线程执行命令。
有4种:direct【默认的类型】,fanout,topic,headers。其中headers不经常使用,本篇不作介绍,其余三种类型,会作详细介绍。
Exchange与队列进行绑定后,消息根据exchang的类型,按照不一样的绑定规则分发消息到消息队列中,能够是一个消息被分发给多个消息队列,也能够是一个消息分发到一个消息队列。具体请看下文。
介绍之初还要说下RoutingKey,这是个什么玩意呢?他是exchange与消息队列绑定中的一个标识。有些路由类型会按照标识对应消息队列,有些路由类型忽略routingkey。
一、Fanout: 广播模式,会忽略路由键Routingkey,将消息广播给绑定到该交换机的全部队列。 不论消息的路由关键字是什么,这条消息都会被路由到全部与该交换器绑定的队列中。
广播式交换器类型的工做方式以下:
不使用任何参数将消息队列与交换器绑定在一块儿。
发布者(直接式交换器类型描述中的producer变成了publisher,已经隐含了二种交换器类型的区别)向交换器发送一条消息。 消息被无条件的传递到全部和这个交换器绑定的消息队列中。
二、Direct: 根据路由键和交换器来找队列的,对消息路径进行全文匹配。消息路由键 "sunshine" 只能匹配 "sunshine" 绑定,不匹配 "sunshine.warm" 这类绑定。
经过精确匹配消息的路由关键字,将消息路由到零个或者多个队列中,绑定关键字用来将队列和交换器绑定到一块儿。这让咱们能够构建经典的点对点队列消息传输模型,不过和任何已定义的交换器类型同样,当消息的路由关键字与多个绑定关键字匹配时,消息可能会被发送到多个队列中。
在direct模式下还能够实现多路绑定,即一个exchange和多个queue绑定时,具备一样的bindkey,以下图:
三、Topic: 主题模式,处理路由键,按模式匹配路由键。
模式符号:
"#" 表示一个或多个单词,"*" 仅匹配一个单词。
如 "wood.#" 可匹配 "wood.palm.redwood",但 "wood.*" 只匹配 "wood.deadwood"。
主题式交换器类型提供了这样的路由机制:经过消息的路由关键字和绑定关键字的模式匹配,将消息路由到被绑定的队列中。这种路由器类型能够被用来支持经典的发布/订阅消息传输模型——使用主题名字空间做为消息寻址模式,将消息传递给那些部分或者所有匹配主题模式的多个消费者。
主题交换器类型的工做方式以下:
绑定关键字用零个或多个标记构成,每个标记之间用“.”字符分隔。绑定关键字必须用这种形式明确说明,并支持通配符:“*”匹配一个词组,“#”零个或多个词组。
所以绑定关键字“*.dask.#”匹配路由关键字“class.dask”和“eur.dask.tab”,可是不匹配“dask.rho”。
问题及方案描述
1.当有多个消费者同时收取消息,且每一个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程当中可能会出现一些意外,好比消息接收到一半的时候,一个消费者死掉了。
这种状况要使用消息接收确认机制,能够执行上次宕机的消费者没有完成的事情。
2.在默认状况下,咱们程序建立的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次建立的队列、消息都不会保存。
这种状况可使用RabbitMQ提供的消息队列的持久化机制。
相关理论描述
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我我的以为大多数开发人员都会选择持久化。
队列和交换机有一个建立时候指定的标志durable。durable的惟一含义就是具备这个标志的队列和交换机会在重启以后从新创建,它不表示说在队列当中的消息会在重启后恢复。
消息队列持久化包括3个部分:
一、exchange持久化,在声明时指定durable => true
二、queue持久化,在声明时指定durable => true
三、消息持久化,在投递时指定delivery_mode=> 2(1是非持久化)
若是exchange和queue都是持久化的,那么它们之间的binding也是持久化的。若是exchange和queue二者之间有一个持久化,一个非持久化,就不容许创建绑定。
注意:一旦建立了队列和交换机,就不能修改其标志了。例如,若是建立了一个non-durable的队列,而后想把它改变成durable的,惟一的办法就是删除这个队列而后重现建立。
生产者
class Producter { const string ExchangeName = "eric.exchange"; const string QueueName = "eric.queue"; static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的 channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); string message = "Eric is very handsome"; var body = Encoding.UTF8.GetBytes(message); //将队列设置为持久化以后,还须要将消息也设为可持久化的 var props = channel.CreateBasicProperties(); props.SetPersistent(true); channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: body); Console.WriteLine("Producter Sent: {0}", message); Console.ReadKey(); } } }
注:ack是 acknowledgments 的缩写,noAck 是("no manual acks")
程序运行结果:
消费者
class Recevice { const string ExchangeName = "eric.exchange"; const string QueueName = "eric.queue"; public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost = "/" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的 channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true); //NoAck:true 告诉RabbitMQ当即从队列中删除消息,另外一个很是受欢迎的方式是从队列中删除已经确认接收的消息,能够经过单独调用BasicAck 进行确认: //BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false); var msgContent = Encoding.UTF8.GetString(msgResponse.Body); Console.WriteLine("The received content:"+msgContent); channel.BasicAck(msgResponse.DeliveryTag, multiple: false); //使用BasicAck方式来告之是否从队列中移除该条消息 //须要额外注意,好比从队列中获取消息并用它来操做数据库或日志文件时,若是出现操做失败时,则该条消息应该保留在队列中,只到操做成功时才从队列中移除。 Console.ReadKey(); } } }
接受消息还有一种方法,就是经过基于推送的事件订阅。可使用内置的 QueueingBasicConsumer 提供简化的编程模型,容许在共享队列上阻塞,直到收到一条消息。
var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(QueueName, noAck: true, consumer: consumer); var msgResponse = consumer.Queue.Dequeue(); var msgContent = Encoding.UTF8.GetString(msgResponse.Body);
程序运行结果:
消费者消息的确认
一、消息队列的消费
Note:若是一个消息队列中有大量消息等待操做时,咱们能够用多个客户端来处理消息,这里的分发机制是采用负载均衡算法中的轮询。第一个消息给A,下一个消息给B,下下一个消息给A,下下下一个消息给B......以此类推。
二、为保证消息的安全性,保证此消息被正确处理后才能在服务端的消息队列中删除。那么rabbitmq提供了ack应答机制,来实现这一功能。
ack应答有两种方式:一、自动应答,二、手动应答。具体实现以下。
public static void Consumer() { try { var qName = "lhtest1"; var exchangeName = "fanoutchange1"; var exchangeType = "fanout";//topic、fanout var routingKey = "*"; var uri = new Uri("amqp://xxxx:5672/"); var factory = new ConnectionFactory { UserName = "123", Password = "123", RequestedHeartbeat = 0, Endpoint = new AmqpTcpEndpoint(uri) }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchangeName, exchangeType); channel.QueueDeclare(qName, true, false, false, null); channel.QueueBind(qName, exchangeName, routingKey); //定义这个队列的消费者 QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); //false为手动应答,true为自动应答 channel.BasicConsume(qName, false, consumer); while (true) { BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); byte[] bytes = ea.Body; var messageStr = Encoding.UTF8.GetString(bytes); var message = DoJson.JsonToModel<QMessage>(messageStr); Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title); //若是是自动应答,下下面这句代码不用写啦。 if ((Convert.ToInt32(message.Title) % 2) == 1) { channel.BasicAck(ea.DeliveryTag, false); } } } } } catch (Exception ex) { Console.WriteLine(ex.Message); } }