本文主要研究一下rocketmq的consumeTimeoutjava
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.javagit
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { private final InternalLogger log = ClientLogger.getLog(); //...... /** * Maximum amount of time in minutes a message may block the consuming thread. */ private long consumeTimeout = 15; public long getConsumeTimeout() { return consumeTimeout; } public void setConsumeTimeout(final long consumeTimeout) { this.consumeTimeout = consumeTimeout; } public TraceDispatcher getTraceDispatcher() { return traceDispatcher; } }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.javagithub
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { private static final InternalLogger log = ClientLogger.getLog(); private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; private final DefaultMQPushConsumer defaultMQPushConsumer; private final MessageListenerConcurrently messageListener; private final BlockingQueue<Runnable> consumeRequestQueue; private final ThreadPoolExecutor consumeExecutor; private final String consumerGroup; private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService cleanExpireMsgExecutors; public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_")); } public void start() { this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @Override public void run() { cleanExpireMsg(); } }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); } private void cleanExpireMsg() { Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator(); while (it.hasNext()) { Map.Entry<MessageQueue, ProcessQueue> next = it.next(); ProcessQueue pq = next.getValue(); pq.cleanExpiredMsg(this.defaultMQPushConsumer); } } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ProcessQueue.javaapache
public class ProcessQueue { //...... public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { return; } int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16; for (int i = 0; i < loop; i++) { MessageExt msg = null; try { this.lockTreeMap.readLock().lockInterruptibly(); try { if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) { msg = msgTreeMap.firstEntry().getValue(); } else { break; } } finally { this.lockTreeMap.readLock().unlock(); } } catch (InterruptedException e) { log.error("getExpiredMsg exception", e); } try { pushConsumer.sendMessageBack(msg, 3); log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); try { this.lockTreeMap.writeLock().lockInterruptibly(); try { if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) { try { removeMessage(Collections.singletonList(msg)); } catch (Exception e) { log.error("send expired msg exception", e); } } } finally { this.lockTreeMap.writeLock().unlock(); } } catch (InterruptedException e) { log.error("getExpiredMsg exception", e); } } catch (Exception e) { log.error("send expired msg exception", e); } } } public long removeMessage(final List<MessageExt> msgs) { long result = -1; final long now = System.currentTimeMillis(); try { this.lockTreeMap.writeLock().lockInterruptibly(); this.lastConsumeTimestamp = now; try { if (!msgTreeMap.isEmpty()) { result = this.queueOffsetMax + 1; int removedCnt = 0; for (MessageExt msg : msgs) { MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); if (prev != null) { removedCnt--; msgSize.addAndGet(0 - msg.getBody().length); } } msgCount.addAndGet(removedCnt); if (!msgTreeMap.isEmpty()) { result = msgTreeMap.firstKey(); } } } finally { this.lockTreeMap.writeLock().unlock(); } } catch (Throwable t) { log.error("removeMessage exception", t); } return result; } //...... }
ConsumeMessageConcurrentlyService的start方法注册了一个定时任务,每隔defaultMQPushConsumer.getConsumeTimeout()执行一次cleanExpireMsg方法;cleanExpireMsg方法会遍历processQueueTable,而后挨个执行ProcessQueue的cleanExpiredMsg方法ide