大众点评Cat源码阅读(六)——MessageTree编码、解码字节流

1、MessageTree的数据结构

结构以下图: 输入图片说明sql

2、客户端将MessageTree编码成字节流

3、服务端将字节流反编码成MessageTree

2.1 功能介绍

服务端接收到ByteBuf以后,交给PlainTextMessage解码,解码分两步走:session

  1. 先解析MessageTree的header,包含domain,hostname、ipAddress、threadGroupName、threadId、threadName、messageId、parentMessageId、rootMessageId、sessionToken。这部份内容示例效果以下:

输入图片说明

  1. 解码MessageTree中的Message,这部分是一个嵌套的Transaction。

2.2代码分析

先看MesageTree的header部分功能解析。先看一个工具类BufferHelper,read方法代码以下:数据结构

/**
		 * 从ctx的buffer中读取字符串  到 separator 位置位置
		 * 如 read("abcde",'c')="ab"
		 * @param ctx
		 * @param separator
		 * @return
		 */
		public String read(Context ctx, byte separator) {
			ByteBuf buf = ctx.getBuffer();
			char[] data = ctx.getData();
			int from = buf.readerIndex();
			int to = buf.writerIndex();
			int index = 0;
			boolean flag = false;

			for (int i = from; i < to; i++) {
				byte b = buf.readByte();

				if (b == separator) {
					break;
				}
				//将所有的data
				if (index >= data.length) {
					char[] data2 = new char[to - from];

					System.arraycopy(data, 0, data2, 0, index);
					data = data2;
				}

				char c = (char) (b & 0xFF);

				if (c > 127) {
					flag = true;
				}

				if (c == '\\' && i + 1 < to) {
					byte b2 = buf.readByte();

					if (b2 == 't') {
						c = '\t';
						i++;
					} else if (b2 == 'r') {
						c = '\r';
						i++;
					} else if (b2 == 'n') {
						c = '\n';
						i++;
					} else if (b2 == '\\') {
						c = '\\';
						i++;
					} else {
						// move back
						buf.readerIndex(i + 1);
					}
				}

				data[index] = c;
				index++;
			}

			if (!flag) {
				return new String(data, 0, index);
			} else {
				byte[] ba = new byte[index];

				for (int i = 0; i < index; i++) {
					ba[i] = (byte) (data[i] & 0xFF);
				}

				try {
					return new String(ba, 0, index, "utf-8");
				} catch (UnsupportedEncodingException e) {
					return new String(ba, 0, index);
				}
			}
		}

解码header部分就是调用上面的方法,逐步将context中的bytebuf里面的header部分解析出来,并将readindex移动,后面直接解码Message。dom

protected void decodeHeader(Context ctx, MessageTree tree) {
		BufferHelper helper = m_bufferHelper;
		String id = helper.read(ctx, TAB);
		String domain = helper.read(ctx, TAB);
		String hostName = helper.read(ctx, TAB);
		String ipAddress = helper.read(ctx, TAB);
		String threadGroupName = helper.read(ctx, TAB);
		String threadId = helper.read(ctx, TAB);
		String threadName = helper.read(ctx, TAB);
		String messageId = helper.read(ctx, TAB);
		String parentMessageId = helper.read(ctx, TAB);
		String rootMessageId = helper.read(ctx, TAB);
		String sessionToken = helper.read(ctx, LF);

		if (VERSION.equals(id)) {
			tree.setDomain(domain);
			tree.setHostName(hostName);
			tree.setIpAddress(ipAddress);
			tree.setThreadGroupName(threadGroupName);
			tree.setThreadId(threadId);
			tree.setThreadName(threadName);
			tree.setMessageId(messageId);
			tree.setParentMessageId(parentMessageId);
			tree.setRootMessageId(rootMessageId);
			tree.setSessionToken(sessionToken);
		} else {
			throw new RuntimeException(String.format("Unrecognized id(%s) for plain text message codec!", id));
		}
	}

2.2 解码Message

仍是以上面的MessageTree为例,以下: 输入图片说明ide

解码步骤以下:工具

  1. 新建stack,解析上图第一行,将数据塞进新建的transaction1中,将null入stack,将transaction1返回,stack内的数据以下:编码

  2. 将上一部返回的transaction1做为parent和stack传进decodeLine方法,解析第2行,新建一个event,将event做为child挂到parent(transaction1)下,返回parent(transaction1)code

  3. 将上一步返回的parent(transaction1)作为parent,连带stack,一块儿传进decodeLine方法,解析第3行,将event做为child挂到parent(transaction1)下,返回parent(transaction1)orm

  4. 相似的解析第四、5行图片

  5. 将上一步返回的parent(transaction1)作为parent,连带stack,一块儿传进decodeLine方法,解析第6行,新建一个transaction2,将transaction2挂到parent(transaction1)下面的child中,将parent(transaction1)入栈,此时stack内有null,transaction1两个元素。将transaction2返回出去,做为下一步的parent,栈内数据以下:

  6. 相似上一步,将新建的transaction3挂到上一步产生的parent(transaction2)下,将parent(transaction2)入栈,此时stack内有null、transaction一、transaction2三个元素。接下来把transaction3返回出去,做为下一步的parent。栈内元素以下:

  7. 第8行相似第2行

  8. 第9行,将解析的数据注入到parent(transaction3)中,这时候transaction3处理完结了。从栈顶拿到一个最近的元素(transaction2),做为下一步处理的parent。栈内元素以下:

  9. 第10行,发现是t开头的,又新建一个transaction4,将parent(transaction2)入栈,将transaction4做为下一步的parent。栈内元素以下:

  10. 第11行,相似第2行处理,挂到parent(transaction4)下面,将parent(transaction4)做为下一步的parent继续处理。

  11. 第12行,将数据注入到parent(transaction4)中,完结这个parent(transaction4),将栈顶的transaction2做为下一步的parent,继续处理。

  12. 第13行,相似上一步,完结transaction2,将栈顶transaction1做为下一步的parent,继续处理。

  13. 第14行,相似上一步,完结transaction1,将栈顶的null做为下一步的parent,继续处理。此时栈内元素为空。 发现没有数据能够处理了,此时,整个MessageTree解码完成。处理每一行数据时,对应的栈内数据以下:

输入图片说明

解析MessageTree代码以下:

//将原消息解析成一棵消息树
	protected void decodeMessage(Context ctx, MessageTree tree) {
		Stack<DefaultTransaction> stack = new Stack<DefaultTransaction>();
		Message parent = decodeLine(ctx, null, stack);

		tree.setMessage(parent);
		//逐行读取,最终构建消息树
		while (ctx.getBuffer().readableBytes() > 0) {
		    /***************  将bytebuf转换成defaultTransaction *********************/
			Message message = decodeLine(ctx, (DefaultTransaction) parent, stack);

			if (message instanceof DefaultTransaction) {
				parent = message;
			} else {
				break;
			}
		}
	}

逐行解析的decodeLine 代码以下:

protected Message decodeLine(Context ctx, DefaultTransaction parent, Stack<DefaultTransaction> stack) {
		BufferHelper helper = m_bufferHelper;
		byte identifier = ctx.getBuffer().readByte();
		String timestamp = helper.read(ctx, TAB);
		String type = helper.read(ctx, TAB);
		String name = helper.read(ctx, TAB);

		switch (identifier) {
		case 't':
			/**
			 * 这种类型,才会新建一个transaction,
			 *    若是这是第一个,则将这个做为最顶级的parent,挂到MessageTree的message下,将parent(null)入栈
			 *    若是不是第一个,则将当前建立的transaction挂到parent(也是一个transaction)的下面,做为一个child,同时将parent入栈,等待当前
			 *    作完这些事,将当前的transaction做为下一次循环的parent,直到遇到'T',将当前这个transaction数据补充完整,再从栈清楚这个transaction,代表当前这个transaction完结
			 */
			DefaultTransaction transaction = new DefaultTransaction(type, name, null);

			helper.read(ctx, LF); // get rid of line feed
			transaction.setTimestamp(m_dateHelper.parse(timestamp));

			if (parent != null) {
				parent.addChild(transaction);
			}

			stack.push(parent);
			return transaction;
		case 'A':
			DefaultTransaction tran = new DefaultTransaction(type, name, null);
			String status = helper.read(ctx, TAB);
			String duration = helper.read(ctx, TAB);
			String data = helper.read(ctx, TAB);

			helper.read(ctx, LF); // get rid of line feed
			tran.setTimestamp(m_dateHelper.parse(timestamp));
			tran.setStatus(status);
			tran.addData(data);

			long d = Long.parseLong(duration.substring(0, duration.length() - 2));
			tran.setDurationInMicros(d);

			if (parent != null) {
				parent.addChild(tran);
				return parent;//返回的是传进来的parent,这里parent是一个transaction,transaction把A类型的数据做为child收集了
			} else {
				return tran;
			}
		case 'T':
			String transactionStatus = helper.read(ctx, TAB);
			String transactionDuration = helper.read(ctx, TAB);
			String transactionData = helper.read(ctx, TAB);

			helper.read(ctx, LF); // get rid of line feed
			parent.setStatus(transactionStatus);
			parent.addData(transactionData);

			long transactionD = Long.parseLong(transactionDuration.substring(0, transactionDuration.length() - 2));

			parent.setDurationInMicros(transactionD);

			return stack.pop();//这里不新建transaction,只是将数据插入到parent里面,将stock顶的transaction返回,
		case 'E':
			DefaultEvent event = new DefaultEvent(type, name);
			String eventStatus = helper.read(ctx, TAB);
			String eventData = helper.read(ctx, TAB);

			helper.read(ctx, LF); // get rid of line feed
			event.setTimestamp(m_dateHelper.parse(timestamp));
			event.setStatus(eventStatus);
			event.addData(eventData);

			//sql类型特殊处理
			processSQLEvent(event);
			
			if (parent != null) {
				parent.addChild(event);
				return parent;
			} else {
				return event;
			}
		case 'M':
			DefaultMetric metric = new DefaultMetric(type, name);
			String metricStatus = helper.read(ctx, TAB);
			String metricData = helper.read(ctx, TAB);

			helper.read(ctx, LF); // get rid of line feed
			metric.setTimestamp(m_dateHelper.parse(timestamp));
			metric.setStatus(metricStatus);
			metric.addData(metricData);

			if (parent != null) {
				parent.addChild(metric);
				return parent;
			} else {
				return metric;
			}
		case 'L':
			DefaultTrace trace = new DefaultTrace(type, name);
			String traceStatus = helper.read(ctx, TAB);
			String traceData = helper.read(ctx, TAB);

			helper.read(ctx, LF); // get rid of line feed
			trace.setTimestamp(m_dateHelper.parse(timestamp));
			trace.setStatus(traceStatus);
			trace.addData(traceData);

			if (parent != null) {
				parent.addChild(trace);
				return parent;
			} else {
				return trace;
			}
		case 'H':
			DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name);
			String heartbeatStatus = helper.read(ctx, TAB);
			String heartbeatData = helper.read(ctx, TAB);

			helper.read(ctx, LF); // get rid of line feed
			heartbeat.setTimestamp(m_dateHelper.parse(timestamp));
			heartbeat.setStatus(heartbeatStatus);
			heartbeat.addData(heartbeatData);

			if (parent != null) {
				parent.addChild(heartbeat);
				return parent;
			} else {
				return heartbeat;
			}
		default:
			m_logger.warn("Unknown identifier(" + (char) identifier + ") of message: "
			      + ctx.getBuffer().toString(Charset.forName("utf-8")));
			throw new RuntimeException("Unknown identifier int name");
		}
	}
相关文章
相关标签/搜索