这节介绍Consumer接收消息的流程,分为Pull和Push模式。segmentfault
上一节讲Rebalance时提到,Consumer接受客户端有两种方式:缓存
其中1.的通知到达Consumer后,会当即触发Rebalance,而后会重置2.的定时器等待时间。两者最后通知Consumer的方式为微信
executePullRequestImmediately的内容为:app
public void executePullRequestImmediately(final PullRequest pullRequest) { this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); }
即将PullRequest对象传给了PullMessageService的executePullRequestImmediately方法:框架
public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
PullMessageService的结构以下:异步
内部维护着一个LinkedBlockingQueue属性pullRequestQueue,用于存储待处理的PullRequest;还有一个ScheduledExecutorService,用于延期处理PullRequest。具体流程以下:ide
从上面的过程能够看出,Push模式内部仍是客户端主动去拉取的,即所谓的封装拉模式以实现推模式,简单示意图以下:fetch
内部经过PullMessageService循环的从PullRequest对应MessageQueue中主动拉取数据。this
该方法用于完成从MessageQueue拉取消息的过程,主要过程以下:spa
进行一系列的检查,若是检查不经过,则等待必定时间后再放回PullMessageService的待处理队列中,主要是经过PullMessageService中的ScheduledExecutorService来作到延迟执行,涉及的状况包括:
上面经过PullAPIWrapper收到结果后会将结果包装为PullResult对象并回调PullCallback。PullCallback和PullResult的定义以下:
public interface PullCallback { void onSuccess(final PullResult pullResult); void onException(final Throwable e); }
public class PullResult { private final PullStatus pullStatus;//请求状态 private final long nextBeginOffset;//Broker返回的下一次开始消费的offset private final long minOffset; private final long maxOffset; private List<MessageExt> msgFoundList;//消息列表,一次请求返回一批消息 }
下面为pullMessage方法处理异步返回结果的流程:
根据请求状态进行处理
有新消息(FOUND)
没有新消息(NO_NEW_MSG)
没有匹配的消息(NO_MATCHED_MSG)
不合法的偏移量(OFFSET_ILLEGAL)
下面先介绍下ProcessQueue,这里只标识几个相关的属性:
public class ProcessQueue { private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); //缓存的待消费消息,按照消息的起始offset排序 private final TreeMap</*消息的起始offset*/Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); //缓存的待消费消息数量 private final AtomicLong msgCount = new AtomicLong(); //缓存的待消费消息大小 private final AtomicLong msgSize = new AtomicLong(); private final Lock lockConsume = new ReentrantLock(); /** * A subset of msgTreeMap, will only be used when orderly consume */ private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>(); private final AtomicLong tryUnlockTimes = new AtomicLong(0); private volatile long queueOffsetMax = 0L; private volatile boolean dropped = false; //最近执行pull的时间 private volatile long lastPullTimestamp = System.currentTimeMillis(); //最近被客户端消费的时间 private volatile long lastConsumeTimestamp = System.currentTimeMillis(); private volatile boolean locked = false; private volatile long lastLockTimestamp = System.currentTimeMillis(); //当前是否在消费,用于顺序消费模式,对并行消费无效 private volatile boolean consuming = false; private volatile long msgAccCnt = 0; }
ProcessQueue展现了MessageQueue的消费状况。上面提到,发起pull请求后若是有数据,会先放到ProcessQueue的缓存中,即msgTreeMap属性,于是缓存的消息会按照消息的起始offset被排序存储。经过ProcessQueue能够查看MessageQueue当前的处理状况,ProcessQueue还用于辅助实现顺序消费。
异步返回的消息内容将交给ConsumeMessageService处理,ConsumeMessageService是个接口,方法定义以下:
public interface ConsumeMessageService { void start(); void shutdown(); void updateCorePoolSize(int corePoolSize); void incCorePoolSize(); void decCorePoolSize(); int getCorePoolSize(); ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName); void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume); }
经过定义可见,要求实现类提供异步处理的功能。内部提供的实现类有:
ConsumeMessageConcurrentlyService:并行消费;ConsumeMessageOrderlyService:顺序消费,这里重点看ConsumeMessageConcurrentlyService。异步请求后会将拉取的新消息列表交给submitConsumeRequest方法处理,以下:
该方法会将传入的消息列表分装为一个ConsumeRequest,并提到到线程池中等待处理。若是传入的消息列表长度超过设定值(默认为1),则会分多个批处理。
在介绍消费具体过程以前先回顾客户端启动流程的Demo,接收消息的写法以下:
public class Consumer { public static void main (String[] args) throws InterruptedException, MQClientException { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("GroupTest"); // 设置NameServer的地址 consumer.setNamesrvAddr ("localhost:9876"); // 订阅一个或者多个Topic,以及Tag来过滤须要消费的消息 consumer.subscribe ("TopicTest", "*"); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener (new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs); // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start (); System.out.printf ("Consumer Started.%n"); } }
其中注册了一个MessageListenerConcurrently,该类将用于用户端处理消息。
回过来看ConsumeRequest,该类实现了Runnable接口,会在run方法完成主要的处理工做,主要动做为:
用户真正接收消息并执行处理动做的地方,须要返回ConsumeConcurrentlyStatus告知框架处理结果。这里在方法里最好不要作耗时长的任务,快速处理后返回给框架结果,避免消息堆积在线程池中。能够将消息内容复制一遍后再放到线程池中进行分发处理。
该方法主要在用户消费完数据后进行收尾动做,过程以下:
ConsumerRequest在run方法的开始处,实例化了一个ConsumeConcurrentlyContext对象,用于后续传递内容,该定义为:
public class ConsumeConcurrentlyContext { private final MessageQueue messageQueue; //重试的延迟级别,-1:不重试;0:由broker控制;>0由客户端控制 private int delayLevelWhenNextConsume = 0; //消息列表最后一个正常消费的消息索引号 private int ackIndex = Integer.MAX_VALUE; }
其中ackIndex表示最后一个正常消费的消息索引号(0从开始,0~ackIndex为正常消费),该位置后的消息表示无法正常消费。该值由用户端控制,能够经过ackIndex来控制须要重发的消息。
ackIndex默认值为Integer.MAX_VALUE,若是为该值则认为全部消息正常消费,不存在错误。上面流程中统计成功和失败也是根据ackIndex来判断的,对于ackIndex后的消息,若是是集群消费模式,则会先尝试发送回broker,由broker控制重试时机;若是重试失败,会收集这些失败的消息,延迟5秒后再调用一次ConsumeMessageService.submitConsumeRequest让用户端再次处理。最后会将处理成功的消息从ProcessQueue中移除,更新缓存,而后将q消费的偏移量记录下来,等待后台线程同步到broker或者本地。
综合上面的介绍,Push模式下的处理流程大体以下:
Push模式经过PullMessageService循环从监听的MessageQueue中以Pull模式拉取消息,并分发给用户注册的MesageListenerConsurrently对象处理,完了以后会自动处理消息的重试,offset更新等动做,从而模拟消息从Broker端主动推进过来。
同Push模式同样,Pull模式的触发也是经过Rebalance,以下:
同开头说起的同样,会回调DefaultMQPullConsumerImpl的MessageQueueListener有Queue发生改变。
系统提供了MQPullConsumerScheduleService,能够定时以Pull模式拉取消息,并将结果通知MessageQueueListener,内部的实现为:
class MessageQueueListenerImpl implements MessageQueueListener { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {//mqAll该topic下的全部q,mqDivided该实例分配到的q MessageModel messageModel = MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel(); switch (messageModel) { case BROADCASTING: MQPullConsumerScheduleService.this.putTask(topic, mqAll);//通知该topic下的监听器,最新的全部q break; case CLUSTERING: MQPullConsumerScheduleService.this.putTask(topic, mqDivided);//通知该topic下的监听器,该实例分配的q break; default: break; } } }
putTask会将分配到的新的MessageQueue包装为一个PullTaskImpl,PullTaskImpl实现了Runnable,会在后台一直执行;而将不属于本身处理的MessageQueue对应的PullTaskImpl停掉。PullTaskImpl会查找该MessageQueue所监听topic对应的处理类PullTaskCallback,调用doPullTask,将具体动做让用户处理。
MQPullConsumerScheduleService的例子为:
public class PullScheduleService { public static void main(String[] args) throws MQClientException { final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1"); scheduleService.setMessageModel(MessageModel.CLUSTERING); scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {//注册topic的监听器 @Override public void doPullTask(MessageQueue mq, PullTaskContext context) { MQPullConsumer consumer = context.getPullConsumer(); try { long offset = consumer.fetchConsumeOffset(mq, false); if (offset < 0) offset = 0; PullResult pullResult = consumer.pull(mq, "*", offset, 32); System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: case OFFSET_ILLEGAL: break; default: break; } consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());//上报消费的offset,消费完后要主动上报 context.setPullNextDelayTimeMillis(100);//设置下一次触发间隔 } catch (Exception e) { e.printStackTrace(); } } }); scheduleService.start(); } }
也能够本身手动执行pull,以下面的例子:
public class PullConsumer { private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if (offset != null) return offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSE_TABLE.put(mq, offset); } }
相较于Push模式,Pull模式则须要用户本身控制消息的重试,offset更新等动做。下面附上该部分当时源码阅读过程作的笔记简图:
更多原创内容请搜索微信公众号:啊驼(doubaotaizi)