ActiveMQ学习笔记06 - 消费者负载均衡与高可用

ActiveMQ Broker提供基于LevelDB复制的方式提供高可用服务,可是对负载均衡作的很弱,只支持Static的服务器之间转发。目前比较流行的消息分片居然不支持。可是消费者的负载均衡和高可用仍是比较完善的。另外说一下,生产者的高可用和负载均衡,通常是靠外围程序控制。好比,基于Tomcat的web程序做为生产者,那么这个web程序的高可用,须要靠tomcat等外围程序。因此通常所说的高可用,主要指Broker和Consumer。java

下面介绍一下几个经常使用的消费者策略。web

Exclusive Consumer:apache

用于处理Queue的高可用。若是同时使用多个消费者从同一个Queue消费消息,那么消息的顺序性将得不到保证。这时候可使用Exclusive Consumer。使用Exclusive Consumer能够保证只有一个消费者在消费这个Queue,其余的消费者处于等待状态。一旦处理消费状态的消费者不可用,系统会自动使用失效转移机制,选择到一个新的消费者继续消费。
tomcat

使用以下:服务器

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");

只须要在Destination上增长consumer.exclusive=true参数便可。网络

还能够给消费者设置优先级,用于针对网络和服务器资源不一样的状况。session

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true&consumer.priority=10");

Message Groups:负载均衡

ActiveMQ 5.3版本新增长了这个功能。Message Groups能够看作Exclusive Consumer的升级版,是一个能够并行的Exclusive Consumer。原理是经过使用JMSXGroupID来定义消息组。拥有相同的JMSXGroupID的消息将发送到同一个Queue,这个技术相似于Sesssion Sticky技术。这样既能够保证消费者的高可用(由于能够有多个消费者消费同一队列),又能够保证消息按顺序消费,还不会像Exclusive Consumer同样浪费资源(须要额外的处于等待状态的消费者待命)。若是消费者被关闭或者消息组被关闭,这个拥有JMSXGroupID的消息会自动被发送到其余消费者。spa

设置JMSXGroupID的例子以下:code

Mesasge message = session.createTextMessage("foo");   
message.setStringProperty("JMSXGroupID", "your business key");   
...
producer.send(message);

关闭消息组,经过设置JMSXGroupSeq的值为-1,例子以下:

Mesasge message = session.createTextMessage("foo");
message.setStringProperty("JMSXGroupID", "your business key");
message.setIntProperty("JMSXGroupSeq", -1);
// ...
producer.send(message);

可使用JMSXGroupFirstForConsumer来判断这个消费者是不是第一次消费这个JMSXGroupID的消息:

if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) {
   // flush cache for groupId
}

若是Broker中已经有消息了,这时因为启动消费者的速度不一致,可能会致使某些消费者先启动并率先消费消息,致使A消费者的负载不均匀。能够在Broker中配置timeBeforeDispatchStarts,让消费者延迟一段时间再开始负载消费。或者配置consumersBeforeDispatchStarts,让消费者达到必定数量再开始负载消费。

具体配置以下,修改${ACTIVEMQ_HOME}\conf\activemq.xml:

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="2000"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>

设置有两个消费者都启动好或者2秒以后,在开始负载消费消息。

Virtual Topics:

Message Groups只能用于Queue,那么Topic对应的版本,就是这个Virtual Topics。他实现了和Message Groups相似的功能,负载均衡和失效转移。

对于消息的生产者来讲,Virtual Topics只是一个普通的Topic,可是必须以VirtualTopic.(可配置)开头,如VirtualTopic.Foo。

而消息的消费者链接是一个队列。这个队列须要听从如下规则,即Consumer.ClientID.VirtualTopicName。例如:Consumer.CilentA.VirtualTopic.Foo,表示消费者的Client id是ClientA,消费的是VirtualTopic.Foo这个Topic。

消息生产者代码示例:

MessageProducer producer = session.createProducer(new ActiveMQTopic("VirtualTopic.FOO"));
TextMessage message = session.createTextMessage("foo");
producer.send(message);

消息消费者代码示例:

MessageConsumer consumerA = session.createConsumer(new ActiveMQQueue("Consumer.ClientA.VirtualTopic.FOO"));  
consumerA.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {
        // do something ...
    }
});

能够看到,经过Virtual Topics注册的queue订阅关系以下:

这里使用了2个client,A和B,分别订阅了两个主题pojo_topic和string_topic。

而消息的存储格式以下:

经过上图能够看到,消息是以Queue的形式存储,并不是Topic,可是Queue被直接根据所订阅的ClientID生成多份消息。好比,VirtualTopic.string_topic有两个订阅者A和B,那消息就被分红Consumer.A.VirtualTopic.string_topic和Consumer.B.VirtualTopic.string_topic。这就是将Topic经过Virtual Topics转换成了Queue。

只要转换成了Queue,就能够结合使用刚才介绍的Exclusive Consumer或者Message Groups对这个VirtualTopic的队列进行高可用和负载均衡的配置,从而实现Topic的高可用和负载均衡。

能够配置Broker,改变虚拟主题的默认前缀,以下面的配置,则表示虚拟主题的前缀是VirtualTopicConsumers。

<broker xmlns="http://activemq.apache.org/schema/core">
    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="true"/>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
</broker>

从ActiveMQ 5.4版本开始,能够配置selectorAware属性控制只有符合订阅者规则的消息才被分发给相应的虚拟队列,用于防止分发不匹配的消息,提高效率。

说了优势,固然也要说缺点。优势是能够实现高可用和负载均衡;缺点是,若是订阅者不少,每一个订阅者都须要复制一份消息,这样会占用过多的磁盘空间,形成消息爆炸。

Composite Destinations:

能够将消息发送给多个Destination。能够混合发送Queue和Topic。例如:

Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");

我的感受这个功能比较鸡肋,属于客户端控制服务器端的消息高可用。若是是纯粹的消息可用性复制,能够直接使用基于LevelDb的消息复制机制或基于JDBC的主从同步机制。

相关文章
相关标签/搜索