聊聊artemis的ConnectionLoadBalancingPolicy

本文主要研究一下artemis的ConnectionLoadBalancingPolicyjava

ServerLocatorImpl.selectConnector

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.javagit

public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {

   //......

   private TransportConfiguration selectConnector() {
      Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;

      flushTopology();

      synchronized (topologyArrayGuard) {
         usedTopology = topologyArray;
      }

      synchronized (this) {
         if (usedTopology != null && useTopologyForLoadBalancing) {
            if (logger.isTraceEnabled()) {
               logger.trace("Selecting connector from topology.");
            }
            int pos = loadBalancingPolicy.select(usedTopology.length);
            Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];

            return pair.getA();
         } else {
            if (logger.isTraceEnabled()) {
               logger.trace("Selecting connector from initial connectors.");
            }

            int pos = loadBalancingPolicy.select(initialConnectors.length);

            return initialConnectors[pos];
         }
      }
   }

   //......
}
  • ServerLocatorImpl的selectConnector方法会对于useTopologyForLoadBalancing的会经过loadBalancingPolicy.select(usedTopology.length)来获取pos,以后返回usedTopology[pos].getA();不然经过loadBalancingPolicy.select(initialConnectors.length)获取pos,以后返回initialConnectors[pos]

ConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/ConnectionLoadBalancingPolicy.javagithub

public interface ConnectionLoadBalancingPolicy {

   /**
    * Returns the selected index according to the policy implementation.
    *
    * @param max maximum position index that can be selected
    */
   int select(int max);
}
  • ConnectionLoadBalancingPolicy定义了select接口,返回选中的index;它有四个实现类分别是FirstElementConnectionLoadBalancingPolicy、RandomConnectionLoadBalancingPolicy、RoundRobinConnectionLoadBalancingPolicy、RandomStickyConnectionLoadBalancingPolicy

FirstElementConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/FirstElementConnectionLoadBalancingPolicy.javaapache

public final class FirstElementConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy {

   /**
    * @param max param is ignored
    * @return 0
    */
   @Override
   public int select(final int max) {
      return 0;
   }
}
  • FirstElementConnectionLoadBalancingPolicy实现了ConnectionLoadBalancingPolicy接口,其select方法始终返回0

RandomConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/RandomConnectionLoadBalancingPolicy.javaapi

public final class RandomConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy {

   /**
    * Returns a pseudo random number between {@code 0} (inclusive) and {@code max} exclusive.
    *
    * @param max the upper limit of the random number selection
    * @see java.util.Random#nextInt(int)
    */
   @Override
   public int select(final int max) {
      return RandomUtil.randomInterval(0, max);
   }
}
  • RandomConnectionLoadBalancingPolicy实现了ConnectionLoadBalancingPolicy接口,其select方法使用RandomUtil.randomInterval(0, max)随机返回一个index

RoundRobinConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/RoundRobinConnectionLoadBalancingPolicy.javadom

public final class RoundRobinConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy, Serializable {

   private static final long serialVersionUID = 7511196010141439559L;

   private boolean first = true;

   private int pos;

   @Override
   public int select(final int max) {
      if (first) {
         // We start on a random one
         pos = RandomUtil.randomInterval(0, max);

         first = false;
      } else {
         pos++;

         if (pos >= max) {
            pos = 0;
         }
      }

      return pos;
   }
}
  • RoundRobinConnectionLoadBalancingPolicy实现了ConnectionLoadBalancingPolicy接口,其select在第一次执行的时候随机选择一个pos,以后对pos递增,对于递增以后大于等于max的重置pos为0

RandomStickyConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/RandomStickyConnectionLoadBalancingPolicy.javaide

public final class RandomStickyConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy {

   private int pos = -1;

   /**
    * @see java.util.Random#nextInt(int)
    */
   @Override
   public int select(final int max) {
      if (pos == -1) {
         pos = RandomUtil.randomInterval(0, max);
      }

      return pos;
   }
}
  • RandomStickyConnectionLoadBalancingPolicy实现了ConnectionLoadBalancingPolicy接口,其select方法第一次随机选择一个pos,以后都返回该pos

小结

ConnectionLoadBalancingPolicy定义了select接口,返回选中的index;它有四个实现类分别是FirstElementConnectionLoadBalancingPolicy、RandomConnectionLoadBalancingPolicy、RoundRobinConnectionLoadBalancingPolicy、RandomStickyConnectionLoadBalancingPolicythis

doc

相关文章
相关标签/搜索