本文沿着《RocketMQ消息轨迹-设计篇》的思路,从以下3个方面对其源码进行解读:java
首先咱们来看一下在消息发送端如何启用消息轨迹,示例代码以下:json
public class TraceProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true); // [@1](https://my.oschina.net/u/1198) producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 10; i++) try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
从上述代码能够看出其关键点是在建立DefaultMQProducer时指定开启消息轨迹跟踪。咱们不妨浏览一下DefaultMQProducer与启用消息轨迹相关的构造函数:服务器
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
参数以下:app
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) { // [@1](https://my.oschina.net/u/1198) this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); //if client open the message trace feature if (enableMsgTrace) { // @2 try { AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; this.getDefaultMQProducerImpl().registerSendMessageHook( new SendMessageTraceHookImpl(traceDispatcher)); // [@3](https://my.oschina.net/u/2648711) } catch (Throwable e) { log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); } } }
代码@1:首先介绍一下其局部变量。运维
代码@2:用来构建AsyncTraceDispatcher,看其名:异步转发消息轨迹数据,稍后重点关注。异步
代码@3:构建SendMessageTraceHookImpl对象,并使用AsyncTraceDispatcher用来异步转发。函数
public void sendMessageBefore(SendMessageContext context) { //if it is message trace data,then it doesn't recorded if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) { // @1 return; } //build the context content of TuxeTraceContext TraceContext tuxeContext = new TraceContext(); tuxeContext.setTraceBeans(new ArrayList<tracebean>(1)); context.setMqTraceContext(tuxeContext); tuxeContext.setTraceType(TraceType.Pub); tuxeContext.setGroupName(context.getProducerGroup()); // @2 //build the data bean object of message trace TraceBean traceBean = new TraceBean(); // @3 traceBean.setTopic(context.getMessage().getTopic()); traceBean.setTags(context.getMessage().getTags()); traceBean.setKeys(context.getMessage().getKeys()); traceBean.setStoreHost(context.getBrokerAddr()); traceBean.setBodyLength(context.getMessage().getBody().length); traceBean.setMsgType(context.getMsgType()); tuxeContext.getTraceBeans().add(traceBean); }
代码@1:若是topic主题为消息轨迹的Topic,直接返回。源码分析
代码@2:在消息发送上下文中,设置用来跟踪消息轨迹的上下环境,里面主要包含一个TraceBean集合、追踪类型(TraceType.Pub)与生产者所属的组。学习
代码@3:构建一条跟踪消息,用TraceBean来表示,记录原消息的topic、tags、keys、发送到broker地址、消息体长度等消息。ui
从上文看出,sendMessageBefore主要的用途就是在消息发送的时候,先准备一部分消息跟踪日志,存储在发送上下文环境中,此时并不会发送消息轨迹数据。
public void sendMessageAfter(SendMessageContext context) { //if it is message trace data,then it doesn't recorded if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName()) // @1 || context.getMqTraceContext() == null) { return; } if (context.getSendResult() == null) { return; } if (context.getSendResult().getRegionId() == null || !context.getSendResult().isTraceOn()) { // if switch is false,skip it return; } TraceContext tuxeContext = (TraceContext) context.getMqTraceContext(); TraceBean traceBean = tuxeContext.getTraceBeans().get(0); // @2 int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size()); // @3 tuxeContext.setCostTime(costTime); // @4 if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) { tuxeContext.setSuccess(true); } else { tuxeContext.setSuccess(false); } tuxeContext.setRegionId(context.getSendResult().getRegionId()); traceBean.setMsgId(context.getSendResult().getMsgId()); traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId()); traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2); localDispatcher.append(tuxeContext); // @5 }
代码@1:若是topic主题为消息轨迹的Topic,直接返回。
代码@2:从MqTraceContext中获取跟踪的TraceBean,虽然设计成List结构体,但在消息发送场景,这里的数据永远只有一条,及时是批量发送也不例外。
代码@3:获取消息发送到收到响应结果的耗时。
代码@4:设置costTime(耗时)、success(是否发送成功)、regionId(发送到broker所在的分区)、msgId(消息ID,全局惟一)、offsetMsgId(消息物理偏移量,若是是批量消息,则是最后一条消息的物理偏移量)、storeTime,这里使用的是(客户端发送时间 + 二分之一的耗时)来表示消息的存储时间,这里是一个估值。
代码@5:将须要跟踪的信息经过TraceDispatcher转发到Broker服务器。其代码以下:
public boolean append(final Object ctx) { boolean result = traceContextQueue.offer((TraceContext) ctx); if (!result) { log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx); } return result; }
这里一个很是关键的点是offer方法的使用,当队列没法容纳新的元素时会当即返回false,并不会阻塞。
接下来将目光转向TraceDispatcher的实现。
TraceDispatcher,用于客户端消息轨迹数据转发到Broker,其默认实现类:AsyncTraceDispatcher。
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { // queueSize is greater than or equal to the n power of 2 of value this.queueSize = 2048; this.batchSize = 100; this.maxMsgSize = 128000; this.discardCount = new AtomicLong(0L); this.traceContextQueue = new ArrayBlockingQueue<tracecontext>(1024); this.appenderQueue = new ArrayBlockingQueue<runnable>(queueSize); if (!UtilAll.isBlank(traceTopicName)) { this.traceTopicName = traceTopicName; } else { this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; } // @1 this.traceExecuter = new ThreadPoolExecutor(// : 10, // 20, // 1000 * 60, // TimeUnit.MILLISECONDS, // this.appenderQueue, // new ThreadFactoryImpl("MQTraceSendThread_")); traceProducer = getAndCreateTraceProducer(rpcHook); // @2 }
代码@1:初始化核心属性,该版本这些值都是“固化”的,用户没法修改。
代码@2:调用getAndCreateTraceProducer方法建立用于发送消息轨迹的Producer(消息发送者),下面详细介绍一下其实现。
private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) { DefaultMQProducer traceProducerInstance = this.traceProducer; if (traceProducerInstance == null) { //@1 traceProducerInstance = new DefaultMQProducer(rpcHook); traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME); traceProducerInstance.setSendMsgTimeout(5000); traceProducerInstance.setVipChannelEnabled(false); // The max size of message is 128K traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000); } return traceProducerInstance; }
代码@1:若是还未创建发送者,则建立用于发送消息轨迹的消息发送者,其GroupName为:_INNER_TRACE_PRODUCER,消息发送超时时间5s,最大容许发送消息大小118K。
public void start(String nameSrvAddr) throws MQClientException { if (isStarted.compareAndSet(false, true)) { // @1 traceProducer.setNamesrvAddr(nameSrvAddr); traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr); traceProducer.start(); } this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); // @2 this.worker.setDaemon(true); this.worker.start(); this.registerShutDownHook(); }
开始启动,其调用的时机为启动DefaultMQProducer时,若是启用跟踪消息轨迹,则调用之。
代码@1:若是用于发送消息轨迹的发送者没有启动,则设置nameserver地址,并启动着。
代码@2:启动一个线程,用于执行AsyncRunnable任务,接下来将重点介绍。
class AsyncRunnable implements Runnable { private boolean stopped; public void run() { while (!stopped) { List<tracecontext> contexts = new ArrayList<tracecontext>(batchSize); // @1 for (int i = 0; i < batchSize; i++) { TraceContext context = null; try { //get trace data element from blocking Queue — traceContextQueue context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); // @2 } catch (InterruptedException e) { } if (context != null) { contexts.add(context); } else { break; } } if (contexts.size() > 0) { : AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); // @3 traceExecuter.submit(request); } else if (AsyncTraceDispatcher.this.stopped) { this.stopped = true; } } } }
代码@1:构建待提交消息跟踪Bean,每次最多发送batchSize,默认为100条。
代码@2:从traceContextQueue中取出一个待提交的TraceContext,设置超时时间为5s,即如何该队列中没有待提交的TraceContext,则最多等待5s。
代码@3:向线程池中提交任务AsyncAppenderRequest。
public void sendTraceData(List<tracecontext> contextList) { Map<string, list<tracetransferbean>> transBeanMap = new HashMap<string, list<tracetransferbean>>(); for (TraceContext context : contextList) { //@1 if (context.getTraceBeans().isEmpty()) { continue; } // Topic value corresponding to original message entity content String topic = context.getTraceBeans().get(0).getTopic(); // @2 // Use original message entity's topic as key String key = topic; List<tracetransferbean> transBeanList = transBeanMap.get(key); if (transBeanList == null) { transBeanList = new ArrayList<tracetransferbean>(); transBeanMap.put(key, transBeanList); } TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context); // @3 transBeanList.add(traceData); } for (Map.Entry<string, list<tracetransferbean>> entry : transBeanMap.entrySet()) { // @4 flushData(entry.getValue()); } }
代码@1:遍历收集的消息轨迹数据。
代码@2:获取存储消息轨迹的Topic。
代码@3:对TraceContext进行编码,这里是消息轨迹的传输数据,稍后对其详细看一下,了解其上传的格式。
代码@4:将编码后的数据发送到Broker服务器。
根据消息轨迹跟踪类型,其格式会有一些不同,下面分别来介绍其合适。
case Pub: { TraceBean bean = ctx.getTraceBeans().get(0); //append the content of context and traceBean to transferBean's TransData sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR); }
消息轨迹数据的协议使用字符串拼接,字段的分隔符号为1,整个数据以2结尾,感受这个设计仍是有点“难以想象”,为何不直接使用json协议呢?
for (TraceBean bean : ctx.getTraceBeans()) { sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);// } }
轨迹就是按照上述顺序拼接而成,各个字段使用1分隔,每一条记录使用2结尾。
case SubAfter: { for (TraceBean bean : ctx.getTraceBeans()) { sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR); } } }
格式编码同样,就不重复多说。
通过上面的源码跟踪,消息发送端的消息轨迹跟踪流程、消息轨迹数据编码协议就清晰了,接下来咱们使用一张序列图来结束本部分的讲解。
其实行文至此,只关注了消息发送的消息轨迹跟踪,消息消费的轨迹跟踪又是如何呢?其实现原理实际上是同样的,就是在消息消费先后执行特定的钩子函数,其实现类为ConsumeMessageTraceHookImpl,因为其实现与消息发送的思路相似,故就不详细介绍了。
其实从上面的分析,咱们已经得知,RocketMQ的消息轨迹数据存储在到Broker上,那消息轨迹的主题名如何指定?其路由信息又怎么分配才好呢?是每台Broker上都建立仍是只在其中某台上建立呢?RocketMQ支持系统默认与自定义消息轨迹的主题。
RocketMQ默认的消息轨迹主题为:RMQ_SYS_TRACE_TOPIC,那该Topic须要手工建立吗?其路由信息呢?
{ if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) { // @1 String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName(); TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(1); // @2 topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } }
上述代码出自TopicConfigManager的构造函数,在Broker启动的时候会建立topicConfigManager对象,用来管理topic的路由信息。
代码@1:若是Broker开启了消息轨迹跟踪(traceTopicEnable=true)时,会自动建立默认消息轨迹的topic路由信息,注意其读写队列数为1。
在建立消息发送者、消息消费者时,能够显示的指定消息轨迹的Topic,例如:
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)
经过customizedTraceTopic来指定消息轨迹Topic。
舒适提示:一般在生产环境上,将不会开启自动建立主题,故须要RocketMQ运维管理人员提早建立好Topic。
好了,本文就介绍到这里了,本文详细介绍了RocktMQ消息轨迹的实现原理,下一篇,咱们将进入到多副本的学习中。
做者简介:《RocketMQ技术内幕》做者,RocketMQ 社区布道师,维护公众号:中间件兴趣圈,可扫描以下二维码与做者进行互动。