RabbitMQ 教程

Producer和Consumer都应该去建立这个queue,尽管只有一个地方的建立是真正起做用的算法

Connection:每一个客户端与MQ都是经过一个TCP长连接通讯的。安全

Channels: 虚拟链接。它创建在上述的TCP链接中。app

Direct exchange:一对一匹配fetch

Fanout exchange:一对多广播设计

Topic exchange: 对key进行模式匹配队列

 

virtual host:路由

每一个virtual host本质上都是一个RabbitMQ Server,拥有它本身的queue,exchagne,和bings rule等等。这保证了你能够在多个不一样的application中使用RabbitMQ。用于队列安全隔离。hash

 

消息删除方式:it

默认状况下,RabbitMQ 会顺序的分发每一个Message。当每一个收到ack后,会将该Message删除,而后将下一个Message分发到下一个Consumer。这种分发方式叫作round-robin--循环分发。io

channel上的no_ack=true/false,设置消息删除方式,true是消费即删除,false是返回ack后删除。

为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不单单是被Consumer收到,那么咱们不能采用no-ack。而应该是在处理完数据后发送ack。

在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ能够去安全的删除它了。

若是Consumer退出了可是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的状况下数据也不会丢失。

这里并无用到超时机制。RabbitMQ仅仅经过Consumer的链接中断来确认该Message并无被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来作数据处理。

默认状况下,消息确认是打开的(enabled)。

 

消息持久化:

为了保证在RabbitMQ退出或者crash了数据仍没有丢失,须要将queue和Message都要持久化。

queue的持久化须要在声明时指定durable=True

  1. 在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。
  2. 持久化queue,能够防止RabbitMQ Server 重启或者crash引发的数据丢失。
  3. 持久化Message,理由同上。

 

公平分发:

默认状况round-robin循环分发会让处理能强的Consumer处于闲置状态,处理能力差的Consumer处于忙碌状态。

经过 basic.qos 方法设置prefetch_count=1 。这样RabbitMQ就会使得每一个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。

注意,这种方法可能会致使queue满。固然,这种状况下你可能须要添加更多的Consumer,或者建立更多的virtualHost来细化你的设计。

 

publish / subscribe:经过exchange的广播模式fanout实现

临时队列temporary queue,

声明queue时不指定名字,那么RabbitMQ会随机为咱们选择这个名字,经过result.method.queue 能够取得queue的名字。

Consumer链接MQ时建立一个临时队列,断开链接时删除此队列,能够加个exclusive的参数

result = channel.queue_declare(exclusive=True)

将没有名字的queue绑定在fanout类型的exchange,达到临时订阅

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

 

routing_key:

Direct exchange的路由算法很是简单:经过binding key的彻底匹配

Topic exchange的路由经过模糊匹配

  • * (星号) 表明任意 一个单词
  • # (hash) 0个或者多个单词

fanout的exchange来讲,这个参数是被忽略的,由于是广播。

相关文章
相关标签/搜索