首先看怎么构建一个Reporter:
/**
 * Exposes the span reporter as a bean.
 *
 * <p>The reporter is an {@code AsyncReporter} built on a {@code URLConnectionSender} that
 * targets {@code http://localhost:9411/api/v1/spans}.
 *
 * @return the reporter produced by {@code AsyncReporter.builder(...).build()}
 */
@Bean
public Reporter<Span> reporter() {
  return AsyncReporter
      .builder(URLConnectionSender.create("http://localhost:9411/api/v1/spans"))
      .build();
}
上面代码创建了一个HTTP类型的reporter,返回的是一个BoundedAsyncReporter类型的reporter。
下面是URLConnectionSender.create方法,主要设置编码类型、连接超时与读取超时、是否启用压缩以及消息的最大字节数。
public abstract class URLConnectionSender implements Sender {

  /** Creates a sender that posts {@link Encoding#THRIFT} messages. */
  public static URLConnectionSender create(String endpoint) {
    Builder preconfigured = builder();
    return preconfigured.endpoint(endpoint).build();
  }

  /**
   * Returns a builder pre-populated with this sender's defaults: THRIFT encoding, a connect
   * timeout of 10_000, a read timeout of 60_000 (presumably milliseconds -- confirm against
   * the builder's contract), compression enabled, and a 5 MiB message size cap.
   */
  public static Builder builder() {
    return new AutoValue_URLConnectionSender.Builder()
        .encoding(Encoding.THRIFT)
        .connectTimeout(10_000)
        .readTimeout(60_000)
        .compressionEnabled(true)
        .messageMaxBytes(5 * 1024 * 1024);
  }
接下来分析下AsyncReporter。
首先看下AsyncReporter的构建器Builder的build方法:
public AsyncReporter<Span> build() { switch (sender.encoding()) { case JSON: return build(Encoder.JSON); case THRIFT: return build(Encoder.THRIFT); default: throw new UnsupportedOperationException(sender.encoding().name()); } } //接下来会执行build(Encoder.THRIFT)方法: public <S> AsyncReporter<S> build(Encoder<S> encoder) { checkNotNull(encoder, "encoder"); //检查encode是否同样,不同ao抛出异常 checkArgument(encoder.encoding() == sender.encoding(), "Encoder.encoding() %s != Sender.encoding() %s", encoder.encoding(), sender.encoding()); //建立有限制的AsyncReporter,设置了上传消息的最大字节数、消息的超时时间 //以及初始化ByteBoundedQueue并设置最大长度以及最大字节数(默认jvm总内存的1%) final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder); if (messageTimeoutNanos > 0) { // Start a thread that flushes the queue in a loop. final BufferNextMessage consumer = new BufferNextMessage(sender, messageMaxBytes, messageTimeoutNanos); //启动一个线程处理队列中须要上报的消息 final Thread flushThread = new Thread(() -> { try { while (!result.closed.get()) { //上报消息 result.flush(consumer); } } finally { for (byte[] next : consumer.drain()) result.pending.offer(next); result.close.countDown(); } }, "AsyncReporter(" + sender + ")"); flushThread.setDaemon(true); flushThread.start(); } return result; } }
下面分析result.flush(consumer)方法:
void flush(BufferNextMessage bundler) { if (closed.get()) throw new IllegalStateException("closed"); //该方法阻塞直到有数据转移到consumer中,即bundler //bundler.remainingNanos()获取若是cout为0,即队列没有数据的时候须要阻塞多久直到有数据 //下面主要看下ByteBoundedQueue的doDrain(Consumer consumer)方法 -》 pending.drainTo(bundler, bundler.remainingNanos()); // record after flushing reduces the amount of gauge events vs on doing this on report metrics.updateQueuedSpans(pending.count); metrics.updateQueuedBytes(pending.sizeInBytes); if (!bundler.isReady()) return; // try to fill up the bundle // Signal that we are about to send a message of a known size in bytes metrics.incrementMessages(); metrics.incrementMessageBytes(bundler.sizeInBytes()); //获取bundler中的数据集合 List<byte[]> nextMessage = bundler.drain(); // In failure case, we increment messages and spans dropped. Callback failureCallback = sendSpansCallback(nextMessage.size()); try { //对spans编码并发送到collect sender.sendSpans(nextMessage, failureCallback); } catch (RuntimeException e) { failureCallback.onError(e); // Raise in case the sender was closed out-of-band. if (e instanceof IllegalStateException) throw e; } } //ByteBoundedQueue的doDrain int doDrain(Consumer consumer) { int drainedCount = 0; int drainedSizeInBytes = 0; while (drainedCount < count) { //读取须要处理的数据 byte[] next = elements[readPos]; if (next == null) break; //判断consumer可否接受数据,能的话存入consumer的list中 if (consumer.accept(next)) { drainedCount++; drainedSizeInBytes += next.length; elements[readPos] = null; if (++readPos == elements.length) readPos = 0; // circle back to the front of the array } else { break; } } //更新count跟sizeInBytes count -= drainedCount; sizeInBytes -= drainedSizeInBytes; return drainedCount; }
接下来分析BoundedAsyncReporter的report方法是如何把span相关数据上报到queue中的:
@Override public void report(S span) { checkNotNull(span, "span"); metrics.incrementSpans(1); //对span进行编码,这里使用thrift byte[] next = encoder.encode(span); //计算span的字节数 int messageSizeOfNextSpan = sender.messageSizeInBytes(Collections.singletonList(next)); metrics.incrementSpanBytes(next.length); //判断reporter是否关闭,以及span的字节数超过最大限制,没有的话则进行pending.offer操做 if (closed.get() || // don't enqueue something larger than we can drain messageSizeOfNextSpan > messageMaxBytes || !pending.offer(next)) { metrics.incrementSpansDropped(1); } } //ByteBoundedQueue的offer方法 boolean offer(byte[] next) { lock.lock(); try { //queue的长度已经满了 if (count == elements.length) return false; //queue的字节长度加上将要offer的字节数超过了最大的字节数 if (sizeInBytes + next.length > maxBytes) return false; //保存span数据 elements[writePos++] = next; if (writePos == elements.length) writePos = 0; // circle back to the front of the array count++; sizeInBytes += next.length; //唤醒消费线程 available.signal(); // alert any drainers return true; } finally { lock.unlock(); } }