rabbitMq的接触

以前公司作即时通信用到openfire,在后面的文章可能会介绍,这里介绍一下有接触到的消息队列,在消息队列的服务器有几个主流的:RabbitMq,ActiveMq,ZeroMq。至于在选择上,就不作介绍了。在网络上面也比较多的文章点击打开连接 java

  介绍一下有使用到RabbitMq,因为使用的时间很少,只是作到一些基本的使用。介绍一下RabbitMQ,用erlang语言开发,这个语言没接触过,上网搜了一下好像说在大并发方面有很好的效果。因此用这个语言开发的服务器应该在并发方面有比较好的体现。 服务器

接着分点介绍一下:1.概念介绍(复制): 网络

1.1交换机(exchange):

1. 接收消息,转发消息到绑定的队列。四种类型:direct, topic, headers and fanout 并发

direct:转发消息到routigKey指定的队列 负载均衡

topic:按规则转发消息(最灵活) 学习

headers:(这个尚未接触到) fetch

fanout:转发消息到全部绑定队列 编码

2. 若是没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。 spa

3. 一个交换机能够绑定多个队列,一个队列能够被多个交换机绑定。 .net

4. topic类型交换器经过模式匹配分析消息的routing-key属性。它将routing-keybinding-key的字符串切分红单词。这些单词之间用点隔开。它一样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key*.stock.#匹配routing keyusd.stcokeur.stock.db,可是不匹配stock.nana

还有一些其余的交换器类型,如headerfailoversystem等,如今在当前的RabbitMQ版本中均未实现。

5. 由于交换器是命名实体,声明一个已经存在的交换器,可是试图赋予不一样类型是会致使错误。客户端须要删除这个已经存在的交换器,而后从新声明而且赋予新的类型。

6. 交换器的属性:

持久性:若是启用,交换器将会在server重启前都有效。

自动删除:若是启用,那么交换器将会在其绑定的队列都被删除掉以后自动删除掉自身。

惰性:若是没有声明交换器,那么在执行到使用的时候会致使异常,并不会主动声明。


1.2      队列(queue):

1. 队列是RabbitMQ内部对象,存储消息。相同属性的queue能够重复定义。

2. 临时队列。channel.queueDeclare(),有时不须要指定队列的名字,并但愿断开链接时删除队列。

3. 队列的属性:

持久性:若是启用,队列将会在server重启前都有效。

自动删除:若是启用,那么队列将会在全部的消费者中止使用以后自动删除掉自身。

惰性:若是没有声明队列,那么在执行到使用的时候会致使异常,并不会主动声明。

排他性:若是启用,队列只能被声明它的消费者使用。

这些性质能够用来建立例如排他和自删除的transient或者私有队列。这种队列将会在全部连接到它的客户端断开链接以后被自动删除掉。它们只是短暂地链接到server,可是能够用于实现例如RPC或者在AMQ上的对等通讯。4. RPC的使用是这样的:RPC客户端声明一个回复队列,惟一命名(例如用UUID),而且是自删除和排他的。而后它发送请求给一些交换器,在消息的reply-to字段中包含了以前声明的回复队列的名字。RPC服务器将会回答这些请求,使用消息的reply-to做为routing key(默认绑定器会绑定全部的队列到默认交换器,名称为“amp.交换器类型名”)发送到默认交换器。注意这仅仅是惯例而已,能够根据和RPC服务器的约定,它能够解释消息的任何属性(甚至数据体)来决定回复给谁。

1.3      消息传递:

1. 消息在队列中保存,以轮询的方式将消息发送给监听消息队列的消费者,能够动态的增长消费者以提升消息的处理能力。

2. 为了实现负载均衡,能够在消费者端通知RabbitMQ,一个消息处理完以后才会接受下一个消息。

channel.basic_qos(prefetch_count=1)

注意:要防止若是全部的消费者都在处理中,则队列中的消息会累积的状况。

3. 消息有14个属性,最经常使用的几种:

deliveryMode:持久化属性

contentType:编码

replyTo:指定一个回调队列

correlationId:消息id

实例代码:

4. 消息生产者能够选择是否在消息被发送到交换器而且还未投递到队列(没有绑定器存在)和/或没有消费者可以当即处理的时候获得通知。经过设置消息的mandatory/immediate属性为真,这些投递保障机制的能力获得了强化。

5. 此外,一个生产者能够设置消息的persistent属性为真。这样一来,server将会尝试将这些消息存储在一个稳定的位置,直到server崩溃。固然,这些消息确定不会被投递到非持久的队列中。

 

1.4      高可用性(HA):

1. 消息ACK,通知RabbitMQ消息已被处理,能够从内存删除。若是消费者因宕机或连接失败等缘由没有发送ACK(不一样于ActiveMQ,在RabbitMQ里,消息没有过时的概念),则RabbitMQ会将消息从新发送给其余监听在队列的下一个消费者。

channel.basicConsume(queuename, noAck=false, consumer);

2. 消息和队列的持久化。定义队列时能够指定队列的持久化属性(问:持久化队列如何删除?)

channel.queueDeclare(queuename, durable=true, false, false, null);

发送消息时能够指定消息持久化属性:

channel.basicPublish(exchangeName, routingKey,

            MessageProperties.PERSISTENT_TEXT_PLAIN,

            message.getBytes());

这样,即便RabbitMQ服务器重启,也不会丢失队列和消息。

3. publisher confirms

4. master/slave机制,配合Mirrored Queue,这种状况下,publisher会正常发送消息和接收消息的confirm,但对于subscriber来讲,须要接收Consumer Cancellation Notifications来获得主节点失败的通知,而后re-consume from the queue,此时要求client有处理重复消息的能力。注意:若是queue在一个新加入的节点上增长了一个slave,此时slave上没有此前queue的信息(目前尚未同步机制)。

(经过命令行或管理插件能够查看哪一个slave是同步的:

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

    当一个slave从新加入mirrored-queue时,若是queuedurable的,则会被清空。


2.最简单的使用java代码:

生产者:经过定义队列的名称,以及一些属性(是否持久化等),发送消息便可。


消费者:定义队列,获取数据便可


因为这个也是以前工做须要才学习了一下,有作了笔记,也主要是看来网络别人的博客作的,发现一篇可能介绍不完,下篇经过java代码来介绍几种消息队列里面的状况。
相关文章
相关标签/搜索