brave是zipkin的java客户端,负责数据收集以及上传。 首先看下怎么构造一个brave对象:java
public Brave brave(Reporter<Span> reporter){ //建立Brave builder,并设置server name Brave.Builder builder = new Brave.Builder("server name"); //建立reporter,本例使用http方式上传到collector,也能够使用kafka等方式上传数据 Reporter<Span> reporter = AsyncReporter.builder(URLConnectionSender.create("collector url")).build(); builder.reporter(reporter); //设置采样率,最大为1,最小0.01 builder.traceSampler(Sampler.ALWAYS_SAMPLE); //下面看builder.build()建立brave -》 Brave brave = builder.build(); return brave; } private Brave(Builder builder) { //做为服务端接受外部系统请求的tracer serverTracer = ServerTracer.builder() .randomGenerator(builder.random) .reporter(builder.reporter) .state(builder.state) .traceSampler(builder.sampler) .clock(builder.clock) .traceId128Bit(builder.traceId128Bit) .build(); //做为客户端请求外部系统的tracer clientTracer = ClientTracer.builder() .randomGenerator(builder.random) .reporter(builder.reporter) .state(builder.state) .traceSampler(builder.sampler) .clock(builder.clock) .traceId128Bit(builder.traceId128Bit) .build(); //本地tracer,在进程内操做,好比一个file io操做或者一个代码块的执行 localTracer = LocalTracer.builder() .randomGenerator(builder.random) .reporter(builder.reporter) .allowNestedLocalSpans(builder.allowNestedLocalSpans) .spanAndEndpoint(SpanAndEndpoint.LocalSpanAndEndpoint.create(builder.state)) .traceSampler(builder.sampler) .clock(builder.clock) .traceId128Bit(builder.traceId128Bit) .build(); //建立ss拦截器 serverRequestInterceptor = new ServerRequestInterceptor(serverTracer); //建立sr拦截器 serverResponseInterceptor = new ServerResponseInterceptor(serverTracer); //建立cs拦截器 clientRequestInterceptor = new ClientRequestInterceptor(clientTracer); //建立cr拦截器 clientResponseInterceptor = new ClientResponseInterceptor(clientTracer); //专门用来提交应用定义的annotation,好比一些自定义的指标 serverSpanAnnotationSubmitter = AnnotationSubmitter.create(SpanAndEndpoint.ServerSpanAndEndpoint.create(builder.state)); //下面的三种binder支持将span绑定到新的线程上 serverSpanThreadBinder = new ServerSpanThreadBinder(builder.state); clientSpanThreadBinder = new ClientSpanThreadBinder(builder.state); localSpanThreadBinder = new LocalSpanThreadBinder(builder.state); }
下面分析brave怎么集成springmvc的:spring
//BraveApiConfig将Brave的部分属性转换成bean暴露出去 @Configuration public class BraveApiConfig { @Autowired Brave brave; @Bean @Scope(value = "singleton") public ClientTracer clientTracer() { return brave.clientTracer(); } @Bean @Scope(value = "singleton") public ServerTracer serverTracer() { return brave.serverTracer(); } @Bean @Scope(value = "singleton") public ClientRequestInterceptor clientRequestInterceptor() { return brave.clientRequestInterceptor(); } @Bean @Scope(value = "singleton") public ClientResponseInterceptor clientResponseInterceptor() { return brave.clientResponseInterceptor(); } @Bean @Scope(value = "singleton") public ServerRequestInterceptor serverRequestInterceptor() { return brave.serverRequestInterceptor(); } @Bean @Scope(value = "singleton") public ServerResponseInterceptor serverResponseInterceptor() { return brave.serverResponseInterceptor(); } @Bean(name = "serverSpanAnnotationSubmitter") @Scope(value = "singleton") public AnnotationSubmitter serverSpanAnnotationSubmitter() { return brave.serverSpanAnnotationSubmitter(); } @Bean @Scope(value = "singleton") public ServerSpanThreadBinder serverSpanThreadBinder() { return brave.serverSpanThreadBinder(); } } @Configuration @Import(BraveApiConfig.class) @EnableWebMvc public class BraveConfig extends WebMvcConfigurerAdapter { //ServerRequestInterceptor等都是Brave内部属性 @Autowired private ServerRequestInterceptor requestInterceptor; @Autowired private ServerResponseInterceptor responseInterceptor; @Autowired private ServerSpanThreadBinder serverThreadBinder; //添加ServletHandlerInterceptor拦截器,下面看怎么构建ServletHandlerInterceptor的 -》 @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new ServletHandlerInterceptor(requestInterceptor, responseInterceptor, new DefaultSpanNameProvider(), serverThreadBinder)); } } public class ServletHandlerInterceptor extends HandlerInterceptorAdapter { static final String HTTP_SERVER_SPAN_ATTRIBUTE = ServletHandlerInterceptor.class.getName() + ".server-span"; private final ServerRequestInterceptor requestInterceptor; private final SpanNameProvider spanNameProvider; private final ServerResponseInterceptor responseInterceptor; private final ServerSpanThreadBinder serverThreadBinder; @Autowired public ServletHandlerInterceptor(ServerRequestInterceptor requestInterceptor, ServerResponseInterceptor responseInterceptor, SpanNameProvider spanNameProvider, final ServerSpanThreadBinder serverThreadBinder) { this.requestInterceptor = requestInterceptor; this.spanNameProvider = spanNameProvider; this.responseInterceptor = responseInterceptor; this.serverThreadBinder = serverThreadBinder; } //前置处理 @Override public boolean preHandle(final HttpServletRequest request, final HttpServletResponse response, final Object handler) { //ServerRequestInterceptor前置处理 -》 requestInterceptor.handle(new HttpServerRequestAdapter(new HttpServerRequest() { @Override public String getHttpHeaderValue(String headerName) { return request.getHeader(headerName); } @Override public URI getUri() { try { return new URI(request.getRequestURI()); } catch (URISyntaxException e) { throw new RuntimeException(e); } } @Override public String getHttpMethod() { return request.getMethod(); } }, spanNameProvider)); return true; } //若是是异步请求则在开始前调用 @Override public void afterConcurrentHandlingStarted(final HttpServletRequest request, final HttpServletResponse response, final Object handler) { //在reqeust中保存span信息,由于异步请求的话会在新的线程中处理业务逻辑,致使在新的线程中获取以前线程中的span request.setAttribute(HTTP_SERVER_SPAN_ATTRIBUTE, serverThreadBinder.getCurrentServerSpan()); //清空当前线程中span信息 serverThreadBinder.setCurrentSpan(null); } //后置处理 @Override public void afterCompletion(final HttpServletRequest request, final HttpServletResponse response, final Object handler, final Exception ex) { //若是是异步请求的话,则须要到request中获取span final ServerSpan span = (ServerSpan) request.getAttribute(HTTP_SERVER_SPAN_ATTRIBUTE); if (span != null) { serverThreadBinder.setCurrentSpan(span); } //ServerResponseInterceptor后置处理 responseInterceptor.handle(new HttpServerResponseAdapter(new HttpResponse() { @Override public int getHttpStatusCode() { return response.getStatus(); } })); }
ServerRequestInterceptor处理逻辑:mvc
public void handle(ServerRequestAdapter adapter) { //清空当前线程中的span serverTracer.clearCurrentSpan(); //从request中获取trace相应信息,若是请求植入了zipkin //trace信息,则沿用request中的span信息,不然会返回一个空的traceData final TraceData traceData = adapter.getTraceData(); Boolean sample = traceData.getSample(); if (sample != null && Boolean.FALSE.equals(sample)) { //不须要采样,建立一个不采样的span serverTracer.setStateNoTracing(); LOGGER.fine("Received indication that we should NOT trace."); } else { boolean clientOriginatedTrace = traceData.getSpanId() != null; if (clientOriginatedTrace) { //该traceData是从请求中传递过来生成的 LOGGER.fine("Received span information as part of request."); //根据traceData建立span,并保存到当前线程中 serverTracer.setStateCurrentTrace(traceData.getSpanId(), adapter.getSpanName()); } else { //请求中没有完整的或者没有trace信息,则会根据采样率判断是否生成一个彻底新的traceId、span,并把span存入当前线程中 LOGGER.fine("Received no span state."); serverTracer.setStateUnknown(adapter.getSpanName()); } //在span中添加sr类型的annotation serverTracer.setServerReceived(); //若是是从请求中生成的span,则须要清空timestamp以及startTick //由于原始span是在client生成的 if (clientOriginatedTrace) { Span span = serverTracer.spanAndEndpoint().span(); synchronized (span) { span.setTimestamp(null); span.startTick = null; } } //根据url生成BinaryAnnotation,并存入span for(KeyValueAnnotation annotation : adapter.requestAnnotations()) { serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue()); } } }
ServerResponseInterceptor处理逻辑:app
public void handle(ServerResponseAdapter adapter) { // We can submit this in any case. When server state is not set or // we should not trace this request nothing will happen. LOGGER.fine("Sending server send."); try { //设置响应码到span的BinaryAnnotation for(KeyValueAnnotation annotation : adapter.responseAnnotations()) { serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue()); } //设置ss annotation,并上报数据 具体逻辑看AnnotationSubmitter的submitEndAnnotation方法-》 serverTracer.setServerSend(); } finally { serverTracer.clearCurrentSpan(); } }
下面分析AnnotationSubmitter的submitEndAnnotation方法:dom
boolean submitEndAnnotation(String annotationName, Reporter<zipkin.Span> reporter) { Span span = spanAndEndpoint().span(); if (span == null) { return false; } Long startTimestamp; Long startTick; synchronized (span) { startTimestamp = span.getTimestamp(); startTick = span.startTick; } long endTimestamp = currentTimeMicroseconds(startTimestamp, startTick); Annotation annotation = Annotation.create( endTimestamp, annotationName, spanAndEndpoint().endpoint() ); synchronized (span) { span.addToAnnotations(annotation); if (startTimestamp != null) { //计算rr->rs耗费的时间 span.setDuration(Math.max(1L, endTimestamp - startTimestamp)); } } //上报消息 reporter.report(span.toZipkin()); return true; }
springmvc相关代码分析结束。异步