本文主要研究一下rocketmq的PullConsumerImpljava
io/openmessaging/rocketmq/consumer/PullConsumerImpl.javagit
public class PullConsumerImpl implements PullConsumer { private final DefaultMQPullConsumer rocketmqPullConsumer; private final KeyValue properties; private boolean started = false; private String targetQueueName; private final MQPullConsumerScheduleService pullConsumerScheduleService; private final LocalMessageCache localMessageCache; private final ClientConfig clientConfig; final static Logger log = ClientLogger.getLog(); public PullConsumerImpl(final String queueName, final KeyValue properties) { this.properties = properties; this.targetQueueName = queueName; this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); String consumerGroup = clientConfig.getRmqConsumerGroup(); if (null == consumerGroup || consumerGroup.isEmpty()) { throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); } pullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroup); this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); String accessPoints = clientConfig.getOmsAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } this.rocketmqPullConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); this.rocketmqPullConsumer.setConsumerGroup(consumerGroup); int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes(); this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes); String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPullConsumer.setInstanceName(consumerId); properties.put(PropertyKeys.CONSUMER_ID, consumerId); this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig); } @Override public KeyValue properties() { return properties; } @Override public Message poll() { MessageExt rmqMsg = localMessageCache.poll(); return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); } @Override public Message poll(final KeyValue properties) { MessageExt rmqMsg = localMessageCache.poll(properties); return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); } @Override public void ack(final String messageId) { localMessageCache.ack(messageId); } @Override public void ack(final String messageId, final KeyValue properties) { localMessageCache.ack(messageId); } @Override public synchronized void startup() { if (!started) { try { registerPullTaskCallback(); this.pullConsumerScheduleService.start(); this.localMessageCache.startup(); } catch (MQClientException e) { throw new OMSRuntimeException("-1", e); } } this.started = true; } private void registerPullTaskCallback() { this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() { @Override public void doPullTask(final MessageQueue mq, final PullTaskContext context) { MQPullConsumer consumer = context.getPullConsumer(); try { long offset = localMessageCache.nextPullOffset(mq); PullResult pullResult = consumer.pull(mq, "*", offset, localMessageCache.nextPullBatchNums()); ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl() .getProcessQueueTable().get(mq); switch (pullResult.getPullStatus()) { case FOUND: if (pq != null) { pq.putMessage(pullResult.getMsgFoundList()); for (final MessageExt messageExt : pullResult.getMsgFoundList()) { localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq)); } } break; default: break; } localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset()); } catch (Exception e) { log.error("A error occurred in pull message process.", e); } } }); } @Override public synchronized void shutdown() { if (this.started) { this.localMessageCache.shutdown(); this.pullConsumerScheduleService.shutdown(); this.rocketmqPullConsumer.shutdown(); } this.started = false; } }
org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.javagithub
/** * Schedule service for pull consumer */ public class MQPullConsumerScheduleService { private final Logger log = ClientLogger.getLog(); private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl(); private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable = new ConcurrentHashMap<MessageQueue, PullTaskImpl>(); private DefaultMQPullConsumer defaultMQPullConsumer; private int pullThreadNums = 20; private ConcurrentMap<String /* topic */, PullTaskCallback> callbackTable = new ConcurrentHashMap<String, PullTaskCallback>(); private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; public MQPullConsumerScheduleService(final String consumerGroup) { this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup); this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); } public void start() throws MQClientException { final String group = this.defaultMQPullConsumer.getConsumerGroup(); this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( this.pullThreadNums, new ThreadFactoryImpl("PullMsgThread-" + group) ); this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener); this.defaultMQPullConsumer.start(); log.info("MQPullConsumerScheduleService start OK, {} {}", this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable); } //...... }
org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.javaapache
class MessageQueueListenerImpl implements MessageQueueListener { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { MessageModel messageModel = MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel(); switch (messageModel) { case BROADCASTING: MQPullConsumerScheduleService.this.putTask(topic, mqAll); break; case CLUSTERING: MQPullConsumerScheduleService.this.putTask(topic, mqDivided); break; default: break; } } }
org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.javaide
public void putTask(String topic, Set<MessageQueue> mqNewSet) { Iterator<Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, PullTaskImpl> next = it.next(); if (next.getKey().getTopic().equals(topic)) { if (!mqNewSet.contains(next.getKey())) { next.getValue().setCancelled(true); it.remove(); } } } for (MessageQueue mq : mqNewSet) { if (!this.taskTable.containsKey(mq)) { PullTaskImpl command = new PullTaskImpl(mq); this.taskTable.put(mq, command); this.scheduledThreadPoolExecutor.schedule(command, 0, TimeUnit.MILLISECONDS); } } }
org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.javaui
class PullTaskImpl implements Runnable { private final MessageQueue messageQueue; private volatile boolean cancelled = false; public PullTaskImpl(final MessageQueue messageQueue) { this.messageQueue = messageQueue; } @Override public void run() { String topic = this.messageQueue.getTopic(); if (!this.isCancelled()) { PullTaskCallback pullTaskCallback = MQPullConsumerScheduleService.this.callbackTable.get(topic); if (pullTaskCallback != null) { final PullTaskContext context = new PullTaskContext(); context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer); try { pullTaskCallback.doPullTask(this.messageQueue, context); } catch (Throwable e) { context.setPullNextDelayTimeMillis(1000); log.error("doPullTask Exception", e); } if (!this.isCancelled()) { MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this, context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS); } else { log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue); } } else { log.warn("Pull Task Callback not exist , {}", topic); } } else { log.warn("The Pull Task is cancelled, {}", messageQueue); } } public boolean isCancelled() { return cancelled; } public void setCancelled(boolean cancelled) { this.cancelled = cancelled; } public MessageQueue getMessageQueue() { return messageQueue; } }
io/openmessaging/rocketmq/consumer/PullConsumerImpl.javathis
private void registerPullTaskCallback() { this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() { @Override public void doPullTask(final MessageQueue mq, final PullTaskContext context) { MQPullConsumer consumer = context.getPullConsumer(); try { long offset = localMessageCache.nextPullOffset(mq); PullResult pullResult = consumer.pull(mq, "*", offset, localMessageCache.nextPullBatchNums()); ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl() .getProcessQueueTable().get(mq); switch (pullResult.getPullStatus()) { case FOUND: if (pq != null) { pq.putMessage(pullResult.getMsgFoundList()); for (final MessageExt messageExt : pullResult.getMsgFoundList()) { localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq)); } } break; default: break; } localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset()); } catch (Exception e) { log.error("A error occurred in pull message process.", e); } } }); }