本文主要研究一下rocketmq的AllocateMessageQueueConsistentHashjava
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.javanode
public interface AllocateMessageQueueStrategy { /** * Allocating by consumer id * * @param consumerGroup current consumer group * @param currentCID current consumer id * @param mqAll message queue set in current topic * @param cidAll consumer set in current consumer group * @return The allocate result of given strategy */ List<MessageQueue> allocate( final String consumerGroup, final String currentCID, final List<MessageQueue> mqAll, final List<String> cidAll ); /** * Algorithm name * * @return The strategy name */ String getName(); }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.javagit
public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy { private final InternalLogger log = ClientLogger.getLog(); private final int virtualNodeCnt; private final HashFunction customHashFunction; public AllocateMessageQueueConsistentHash() { this(10); } public AllocateMessageQueueConsistentHash(int virtualNodeCnt) { this(virtualNodeCnt, null); } public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) { if (virtualNodeCnt < 0) { throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt); } this.virtualNodeCnt = virtualNodeCnt; this.customHashFunction = customHashFunction; } @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException("mqAll is null or mqAll empty"); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", consumerGroup, currentCID, cidAll); return result; } Collection<ClientNode> cidNodes = new ArrayList<ClientNode>(); for (String cid : cidAll) { cidNodes.add(new ClientNode(cid)); } final ConsistentHashRouter<ClientNode> router; //for building hash ring if (customHashFunction != null) { router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction); } else { router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt); } List<MessageQueue> results = new ArrayList<MessageQueue>(); for (MessageQueue mq : mqAll) { ClientNode clientNode = router.routeNode(mq.toString()); if (clientNode != null && currentCID.equals(clientNode.getKey())) { results.add(mq); } } return results; } @Override public String getName() { return "CONSISTENT_HASH"; } private static class ClientNode implements Node { private final String clientID; public ClientNode(String clientID) { this.clientID = clientID; } @Override public String getKey() { return clientID; } } }
默认为10
)及customHashFunction属性;其allocate方法根据cidAll构造ClientNode列表,而后建立ConsistentHashRouter,最后遍历mqAll使用router.routeNode(mq.toString())选择clientNode,若clientNode不为null且currentCID.equals(clientNode.getKey()),则将该mq添加到results中,最后返回results;其getName方法返回的是CONSISTENT_HASHrocketmq-common-4.5.2-sources.jar!/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.javagithub
public class ConsistentHashRouter<T extends Node> { private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<Long, VirtualNode<T>>(); private final HashFunction hashFunction; public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) { this(pNodes, vNodeCount, new MD5Hash()); } /** * @param pNodes collections of physical nodes * @param vNodeCount amounts of virtual nodes * @param hashFunction hash Function to hash Node instances */ public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) { if (hashFunction == null) { throw new NullPointerException("Hash Function is null"); } this.hashFunction = hashFunction; if (pNodes != null) { for (T pNode : pNodes) { addNode(pNode, vNodeCount); } } } /** * add physic node to the hash ring with some virtual nodes * * @param pNode physical node needs added to hash ring * @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0 */ public void addNode(T pNode, int vNodeCount) { if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount); int existingReplicas = getExistingReplicas(pNode); for (int i = 0; i < vNodeCount; i++) { VirtualNode<T> vNode = new VirtualNode<T>(pNode, i + existingReplicas); ring.put(hashFunction.hash(vNode.getKey()), vNode); } } /** * remove the physical node from the hash ring */ public void removeNode(T pNode) { Iterator<Long> it = ring.keySet().iterator(); while (it.hasNext()) { Long key = it.next(); VirtualNode<T> virtualNode = ring.get(key); if (virtualNode.isVirtualNodeOf(pNode)) { it.remove(); } } } /** * with a specified key, route the nearest Node instance in the current hash ring * * @param objectKey the object key to find a nearest Node */ public T routeNode(String objectKey) { if (ring.isEmpty()) { return null; } Long hashVal = hashFunction.hash(objectKey); SortedMap<Long, VirtualNode<T>> tailMap = ring.tailMap(hashVal); Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey(); return ring.get(nodeHashVal).getPhysicalNode(); } public int getExistingReplicas(T pNode) { int replicas = 0; for (VirtualNode<T> vNode : ring.values()) { if (vNode.isVirtualNodeOf(pNode)) { replicas++; } } return replicas; } //default hash function private static class MD5Hash implements HashFunction { MessageDigest instance; public MD5Hash() { try { instance = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { } } @Override public long hash(String key) { instance.reset(); instance.update(key.getBytes()); byte[] digest = instance.digest(); long h = 0; for (int i = 0; i < 4; i++) { h <<= 8; h |= ((int) digest[i]) & 0xFF; } return h; } } }
AllocateMessageQueueConsistentHash实现了AllocateMessageQueueStrategy接口,它定义了virtualNodeCnt(默认为10
)及customHashFunction属性;其allocate方法根据cidAll构造ClientNode列表,而后建立ConsistentHashRouter,最后遍历mqAll使用router.routeNode(mq.toString())选择clientNode,若clientNode不为null且currentCID.equals(clientNode.getKey()),则将该mq添加到results中,最后返回results;其getName方法返回的是CONSISTENT_HASHapache