转载自:http://geek.csdn.net/news/detail/74184html
因为以前作的项目中须要在多个节点之间可靠地通讯,因此废弃了以前使用的Redis pub/sub(由于集群有单点问题,且有诸多限制),改用了RabbitMQ。使用期间获得很多收获,也踩了很多坑,因此在此分享下心得。java
RabbitMQ提供了几种特性,牺牲了一点性能代价,提供了可靠性的保证。node
当RabbitMQ退出时,默认会将消息和队列都清除,因此须要在第一次声明队列和发送消息时指定其持久化属性为true,这样RabbitMQ会将队列、消息和状态存到RabbitMQ本地的数据库,重启后会恢复。git
durable=true channel.queueDeclare("task_queue", durable, false, false, null);//队列 channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//消息
注:当声明的队列已经存在时,尝试从新定义它的durable是不生效的。
客户端接收消息的模式默认是自动应答,可是经过设置autoAck为false可让客户端主动应答消息。当客户端拒绝此消息或者未应答便断开链接时,就会使得此消息从新入队(在版本2.7.0之前是到从新加入到队尾,2.7.0及之后是保留消息在队列中的原来位置)。github
autoAck = false; requeue = true; channel.basicConsume(queue, autoAck, callback); channel.basicAck();//应答 channel.basicReject(deliveryTag, requeue);//拒绝 channel.basicRecover(requeue);//恢复
默认状况下,发送端不关注发出去的消息是否被消费掉了。可设置channel为confirm模式,全部发送的消息都会被确认一次,用户能够自行根据server发回的确认消息查看状态。详细介绍见:confirmsdocker
channel.confirmSelect(); // 进入confirm模式 do publish messages... // 每一个消息都会被编号,从1开始 channel.getNextPublishSeqNo() // 查看下一个要发送的消息的序号 channel.waitForConfirms(); // 等待全部消息发送并确认
channel.txSelect(); try { do something... channel.txCommit(); } catch (e){ channel.txRollback(); }
相比于路由和绑定,能够视为是共享于全部的节点的,消息队列默认只存在于第一次声明它的节点上,这样一旦这个节点挂了,这个队列中未处理的消息就没有了。 幸亏,RabbitMQ提供了将它备份到其余节点的机制,任什么时候候都有一个master负责处理请求,其余slaves负责备份,当master挂掉,会将最先建立的那个slave提高为master。数据库
命令:rabbitmqctl set_policy ha-all “^ha\.” ‘{“ha-mode”:”all”}’
:设置全部以’ha’开头的queue在全部节点上拥有备份。详细语法点这里;也能够在界面上配置。缓存
注:因为exclusive类型的队列会在client和server链接断开时被删掉,因此对它设置持久化属性和备份都是没有意义的。
直接上图好了:ruby
一个集群中多个节点共享一份.erlang.cookie文件;如果没有启用RABBITMQ_USE_LONGNAME,须要在每一个节点的hosts文件中指定其余节点的地址,否则会找不到其余集群中的节点。服务器
RabbitMQ集群对于网络分区的处理和忍受能力不太好,推荐使用federation或者shovel插件去解决。federation详见高级->Federation。可是,状况已经发生了,怎么去解决呢?放心,仍是有办法恢复的。当网络断断续续时,会使得节点之间的通讯断掉,进而形成集群被分隔开的状况。这样,每一个小集群以后便只处理各自本地的链接和消息,从而致使数据不一样步。当从新恢复网络链接时,它们彼此都认为是对方挂了-_-||,即可以判断出有网络分区出现了。可是RabbitMQ默认是忽略掉不处理的,形成两个节点继续各自为政(路由,绑定关系,队列等能够独立地建立删除,甚至主备队列也会每一方拥有本身的master)。能够更改配置使得链接恢复时,会根据配置自动恢复。
ignore:默认,不作任何处理
pause-minority:断开链接时,判断当前节点是否属于少数派(节点数少于或者等于一半),若是是,则暂停直到恢复链接。
{pause_if_all_down, [nodes], ignore | autoheal}:断开链接时,判断当前集群中节点是否有节点在nodes中,若是有,则继续运行,不然暂停直到恢复链接。这种策略下,当恢复链接时,可能会有多个分区存活,因此,最后一个参数决定它们怎么合并。
autoheal:当恢复链接时,选择客户端链接数最多的节点状态为主,重启其余节点。
配置:**【详见下文:集群配置】
集群版本的实现:详见我本身写的一个例子rabbitmq-server-cluster
最后附一张网上截取的测试结果:
若是有兴趣简单了解下RabbitMQ,能够继续往下看~
RabbitMQ官方实现了不少热门语言的客户端,就不一一列举啦,以java为例,直接开始正题:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");
factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(10000);
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
生产者:
channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
消费者:
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
代码同上,只不过会有多个消费者,消息会轮序发给各个消费者。
若是设置了autoAck=false,那么能够实现公平分发(即对于某个特定的消费者,每次最多只发送指定条数的消息,直到其中一条消息应答后,再发送下一条)。须要在消费者中加上:
int prefetchCount = 1; channel.basicQos(prefetchCount);
其余同上。
生产者:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
消费者同上。
生产者:
String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, routingKey); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
消费者同上。
*能够表示一个单词
#能够表示一个或多个单词
生产者:
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
消费者同上。
其实就是一对一模式的一种用法:
首先,客户端发送一条消息到服务端声明的队列,消息属性中包含reply_to
和correlation_id
- reply_to 是客户端建立的消息的队列,用来接收远程调用结果 - correlation_id 是消息的标识,服务端回应的消息属性中会带上以便知道是哪条消息的结果。
而后,服务端接收到消息,处理,并返回一条结果到reply_to队列中,最终,客户端接收到返回消息,继续向下处理。
支持各大主流操做系统,这里以Unix为例介绍下经常使用配置和命令:
因为RabbitMQ是依赖于Erlang的,因此得首先安装最近版本的Erlang。
单点的安装比较简单,下载解压便可。
下载地址:http://www.rabbitmq.com/download.html
(注:若启动失败了,能够在启动日志中查看到具体的错误信息。)
$RABBITMQ_HOME/sbin/rabbitmq-server:启动脚本,会打印出配置文件,插件,集群等信息;加上-detached为后台启动; /sbin/rabbitmqctl status:查看启动状态 /sbin/rabbitmqctl add_user admin admin:添加新用户admin,密码admin;默认只有一个guest用户,但只限本机访问。 /sbin/rabbitmqctl set_user_tags admin administrator:将admin设置为管理员权限 /sbin/rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" 赋予admin全部权限 /sbin/rabbitmqctl stop:关闭
集群节点共享全部的状态和数据,如:用户、路由、绑定等信息(队列有点特殊,虽然从全部节点均可达,可是只存在于第一次声明它的那个节点上,解决方案——详见上文:消息队列的高可用);每一个节点均可以接收链接,处理数据。
集群节点有两种,disc:默认,信息存在本地数据库;ram:加入集群时,添加–ram参数,信息存在内存,可提升性能。
更多详细的配置见:配置
rabbitmqctl stop_app rabbitmqctl join_cluster [--ram] nodename@hostname:将当前节点加入到集群中;默认是以disc节点加入集群,加上--ram为ram节点。 rabbitmqctl start_app rabbitmqctl cluster_status:查看集群状态
(注:若是加入集群失败,可先查看)
RabbitMQ原生支持AMQP 0-9-1并扩展实现了了一些经常使用的功能:AMQP 0-9-1
包含三层:
注:其余协议的支持见:RabbitMQ支持的协议
管理界面(神器)
启动后,执行rabbitmq-plugins enable rabbitmq_management
→访问http://localhost:15672→查看节点状态,队列信息等等,甚至能够动态配置消息队列的主备策略,以下图:
Federation
启用Federation插件,使得不一样集群的节点之间能够传递消息,从而模拟出相似集群的效果。这样能够有几点好处:
几个概念:
rabbitmqctl set_parameter federation-upstream my-upstream '{"uri":"amqp://server-name","expires":3600000}' 定义一个my-upstream
rabbitmqctl set_policy --apply-to exchanges federate-me "^amq\." '{"federation-upstream-set":"all"}'
rabbitmq-plugins enable rabbitmq_federation
若是启用了管理界面,能够添加:
rabbitmq-plugins enable rabbitmq_federation_management
这样就能够在界面配置Upstream和Policy了。
注:若是在一个集群中使用federation,须要该集群每一个节点都启用Federation插件;
注:更多插件请见:插件。