ActiveMQ Broker提供基于LevelDB复制的方式提供高可用服务,可是对负载均衡作的很弱,只支持Static的服务器之间转发。目前比较流行的消息分片居然不支持。可是消费者的负载均衡和高可用仍是比较完善的。另外说一下,生产者的高可用和负载均衡,通常是靠外围程序控制。好比,基于Tomcat的web程序做为生产者,那么这个web程序的高可用,须要靠tomcat等外围程序。因此通常所说的高可用,主要指Broker和Consumer。java
下面介绍一下几个经常使用的消费者策略。web
Exclusive Consumer:apache
用于处理Queue的高可用。若是同时使用多个消费者从同一个Queue消费消息,那么消息的顺序性将得不到保证。这时候可使用Exclusive Consumer。使用Exclusive Consumer能够保证只有一个消费者在消费这个Queue,其余的消费者处于等待状态。一旦处理消费状态的消费者不可用,系统会自动使用失效转移机制,选择到一个新的消费者继续消费。
tomcat
使用以下:服务器
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
只须要在Destination上增长consumer.exclusive=true参数便可。网络
还能够给消费者设置优先级,用于针对网络和服务器资源不一样的状况。session
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true&consumer.priority=10");
Message Groups:负载均衡
ActiveMQ 5.3版本新增长了这个功能。Message Groups能够看作Exclusive Consumer的升级版,是一个能够并行的Exclusive Consumer。原理是经过使用JMSXGroupID来定义消息组。拥有相同的JMSXGroupID的消息将发送到同一个Queue,这个技术相似于Sesssion Sticky技术。这样既能够保证消费者的高可用(由于能够有多个消费者消费同一队列),又能够保证消息按顺序消费,还不会像Exclusive Consumer同样浪费资源(须要额外的处于等待状态的消费者待命)。若是消费者被关闭或者消息组被关闭,这个拥有JMSXGroupID的消息会自动被发送到其余消费者。spa
设置JMSXGroupID的例子以下:code
Mesasge message = session.createTextMessage("foo"); message.setStringProperty("JMSXGroupID", "your business key"); ... producer.send(message);
关闭消息组,经过设置JMSXGroupSeq的值为-1,例子以下:
Mesasge message = session.createTextMessage("foo"); message.setStringProperty("JMSXGroupID", "your business key"); message.setIntProperty("JMSXGroupSeq", -1); // ... producer.send(message);
可使用JMSXGroupFirstForConsumer来判断这个消费者是不是第一次消费这个JMSXGroupID的消息:
if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) { // flush cache for groupId }
若是Broker中已经有消息了,这时因为启动消费者的速度不一致,可能会致使某些消费者先启动并率先消费消息,致使A消费者的负载不均匀。能够在Broker中配置timeBeforeDispatchStarts,让消费者延迟一段时间再开始负载消费。或者配置consumersBeforeDispatchStarts,让消费者达到必定数量再开始负载消费。
具体配置以下,修改${ACTIVEMQ_HOME}\conf\activemq.xml:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="2000"/> </policyEntries> </policyMap> </destinationPolicy>
设置有两个消费者都启动好或者2秒以后,在开始负载消费消息。
Virtual Topics:
Message Groups只能用于Queue,那么Topic对应的版本,就是这个Virtual Topics。他实现了和Message Groups相似的功能,负载均衡和失效转移。
对于消息的生产者来讲,Virtual Topics只是一个普通的Topic,可是必须以VirtualTopic.(可配置)开头,如VirtualTopic.Foo。
而消息的消费者链接是一个队列。这个队列须要听从如下规则,即Consumer.ClientID.VirtualTopicName。例如:Consumer.CilentA.VirtualTopic.Foo,表示消费者的Client id是ClientA,消费的是VirtualTopic.Foo这个Topic。
消息生产者代码示例:
MessageProducer producer = session.createProducer(new ActiveMQTopic("VirtualTopic.FOO")); TextMessage message = session.createTextMessage("foo"); producer.send(message);
消息消费者代码示例:
MessageConsumer consumerA = session.createConsumer(new ActiveMQQueue("Consumer.ClientA.VirtualTopic.FOO")); consumerA.setMessageListener(new MessageListener() { public void onMessage(Message message) { // do something ... } });
能够看到,经过Virtual Topics注册的queue订阅关系以下:
这里使用了2个client,A和B,分别订阅了两个主题pojo_topic和string_topic。
而消息的存储格式以下:
经过上图能够看到,消息是以Queue的形式存储,并不是Topic,可是Queue被直接根据所订阅的ClientID生成多份消息。好比,VirtualTopic.string_topic有两个订阅者A和B,那消息就被分红Consumer.A.VirtualTopic.string_topic和Consumer.B.VirtualTopic.string_topic。这就是将Topic经过Virtual Topics转换成了Queue。
只要转换成了Queue,就能够结合使用刚才介绍的Exclusive Consumer或者Message Groups对这个VirtualTopic的队列进行高可用和负载均衡的配置,从而实现Topic的高可用和负载均衡。
能够配置Broker,改变虚拟主题的默认前缀,以下面的配置,则表示虚拟主题的前缀是VirtualTopicConsumers。
<broker xmlns="http://activemq.apache.org/schema/core"> <destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="true"/> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors> </broker>
从ActiveMQ 5.4版本开始,能够配置selectorAware属性控制只有符合订阅者规则的消息才被分发给相应的虚拟队列,用于防止分发不匹配的消息,提高效率。
说了优势,固然也要说缺点。优势是能够实现高可用和负载均衡;缺点是,若是订阅者不少,每一个订阅者都须要复制一份消息,这样会占用过多的磁盘空间,形成消息爆炸。
Composite Destinations:
能够将消息发送给多个Destination。能够混合发送Queue和Topic。例如:
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");
我的感受这个功能比较鸡肋,属于客户端控制服务器端的消息高可用。若是是纯粹的消息可用性复制,能够直接使用基于LevelDb的消息复制机制或基于JDBC的主从同步机制。