dubbo+zipkin调用链监控

 

 

 

 

收集器抽象sql

因为zipkin支持http以及kafka两种方式上报数据,因此在配置上须要作下抽象。bootstrap

AbstractZipkinCollectorConfigurationapi

主要是针对下面两种收集方式的一些配置上的定义,最核心的是Sender接口的定义,http与kafka是两类彻底不一样的实现。服务器

public abstract Sender getSender();

其次是协助性的构造函数,主要是配合构建收集器所须要的一些参数。架构

  • zipkinUrl

若是是http收集,那么对应的是zipkin api域名,若是是kafka,对应的是kafka集群的地址并发

  • topic

仅在收集方式为kafka是有效,http时传空值便可。异步

public AbstractZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic){
    this.zipkinUrl=zipkinUrl;
    this.serviceName=serviceName;
    this.topic=topic;
    this.tracing=this.tracing();
}

配置上报方式,这里统一采用异常上传,而且配置上报的超时时间。分布式

protected AsyncReporter<Span> spanReporter() {
    return AsyncReporter
            .builder(getSender())
            .closeTimeout(500, TimeUnit.MILLISECONDS)
            .build(SpanBytesEncoder.JSON_V2);
}

下面这两方法,是配合应用构建span使用的。ide

注意那个sampler()方法,默认是什么也不作的意思,咱们要想看到数据就须要配置成Sampler.ALWAYS_SAMPLE,这样才能真正将数据上报到zipkin服务器。
protected Tracing tracing() {
    this.tracing= Tracing
            .newBuilder()
            .localServiceName(this.serviceName)
            .sampler(Sampler.ALWAYS_SAMPLE)
            .spanReporter(spanReporter())
            .build();
    return this.tracing;
}
protected Tracing getTracing(){
    return this.tracing;
}

HttpZipkinCollectorConfiguration函数

主要是实现getSender方法,能够借用OkHttpSender这个对象来快速构建,api版本采用v2。

public class HttpZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration {
    public HttpZipkinCollectorConfiguration(String serviceName,String zipkinUrl) {
        super(serviceName,zipkinUrl,null);
    }
    @Override
    public Sender getSender() {
        return OkHttpSender.create(super.getZipkinUrl()+"/api/v2/spans");
    }
}

OkHttpSender这个类须要引用这个包

<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-okhttp3</artifactId>
    <version>${zipkin-reporter2.version}</version>
</dependency>

KafkaZipkinCollectorConfiguration

一样也是实现getSender方法

public class KafkaZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration {
    public KafkaZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic) {
        super(serviceName,zipkinUrl,topic);
    }
    @Override
    public Sender getSender() {
        return KafkaSender
                .newBuilder()
                .bootstrapServers(super.getZipkinUrl())
                .topic(super.getTopic())
                .encoding(Encoding.JSON)
                .build();
    }
}

KafkaSender这个类须要引用这个包:

<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-kafka11</artifactId>
    <version>${zipkin-reporter2.version}</version>
</dependency>

收集器工厂

因为上面建立了两个收集器配置类,使用时只能是其中之一,因此实际运行的实例须要根据配置来动态生成。ZipkinCollectorConfigurationFactory就是负责生成收集器实例的。

private final AbstractZipkinCollectorConfiguration zipkinCollectorConfiguration;
@Autowired
public ZipkinCollectorConfigurationFactory(TraceConfig traceConfig){
    if(Objects.equal("kafka", traceConfig.getZipkinSendType())){
        zipkinCollectorConfiguration=new KafkaZipkinCollectorConfiguration(
                traceConfig.getApplicationName(),
                traceConfig.getZipkinUrl(),
                traceConfig.getZipkinKafkaTopic());
    }
    else {
        zipkinCollectorConfiguration = new HttpZipkinCollectorConfiguration(
                traceConfig.getApplicationName(),
                traceConfig.getZipkinUrl());
    }
}

经过构建函数将咱们的配置类TraceConfig注入进来,而后根据发送方式来构建实例。另外提供一个辅助函数:

public Tracing getTracing(){
    return this.zipkinCollectorConfiguration.getTracing();
}

过滤器

在dubbo的过滤器中实现数据上传的功能逻辑相对简单,通常都在invoke方法执行前记录数据,而后方法执行完成后再次记录数据。这个逻辑不变,有变化的是数据上报的实现,上一个版本是经过发http请求实现须要编码,如今能够直接借用brave所提供的span来帮助咱们完成,有两重要的方法:

  • finish

方法源码以下,在完成的时候会填写上完成的时间并上报数据,这通常应用于同步调用场景。

public void finish(TraceContext context, long finishTimestamp) {
    MutableSpan span = this.spanMap.remove(context);
    if(span != null && !this.noop.get()) {
        synchronized(span) {
            span.finish(Long.valueOf(finishTimestamp));
            this.reporter.report(span.toSpan());
        }
    }
}
  • flush 与上面finish方法的不一样点在于,在报数据时没有完成时间,这应该是适用于一些异步调用但不关心结果的场景,好比dubbo所提供的oneway方式调用。
public void flush(TraceContext context) {
    MutableSpan span = this.spanMap.remove(context);
    if(span != null && !this.noop.get()) {
        synchronized(span) {
            span.finish((Long)null);
            this.reporter.report(span.toSpan());
        }
    }
}

消费者

作为消费方,有一个核心功能就是将traceId以及spanId传递到服务提供方,这里仍是经过dubbo提供的附加参数方式实现。

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    if(!RpcTraceContext.getTraceConfig().isEnabled()){
        return invoker.invoke(invocation);
    }
    ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory=
            SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
    Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();
    if(null==RpcTraceContext.getTraceId()){
        RpcTraceContext.start();
        RpcTraceContext.setTraceId(IdUtils.get());
        RpcTraceContext.setParentId(null);
        RpcTraceContext.setSpanId(IdUtils.get());
    }
    else {
        RpcTraceContext.setParentId(RpcTraceContext.getSpanId());
        RpcTraceContext.setSpanId(IdUtils.get());
    }
    TraceContext traceContext= TraceContext.newBuilder()
            .traceId(RpcTraceContext.getTraceId())
            .parentId(RpcTraceContext.getParentId())
            .spanId(RpcTraceContext.getSpanId())
            .sampled(true)
            .build();
    Span span=tracer.toSpan(traceContext).start();
    invocation.getAttachments().put(RpcTraceContext.TRACE_ID_KEY, String.valueOf(span.context().traceId()));
    invocation.getAttachments().put(RpcTraceContext.SPAN_ID_KEY, String.valueOf(span.context().spanId()));
    Result result = invoker.invoke(invocation);
    span.finish();
    return result;
}

提供者

@Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if(!RpcTraceContext.getTraceConfig().isEnabled()){
            return invoker.invoke(invocation);
        }
        Map<String, String> attaches = invocation.getAttachments();
        if (!attaches.containsKey(RpcTraceContext.TRACE_ID_KEY)){
            return invoker.invoke(invocation);
        }
        Long traceId = Long.valueOf(attaches.get(RpcTraceContext.TRACE_ID_KEY));
        Long spanId = Long.valueOf(attaches.get(RpcTraceContext.SPAN_ID_KEY));
        attaches.remove(RpcTraceContext.TRACE_ID_KEY);
        attaches.remove(RpcTraceContext.SPAN_ID_KEY);
        RpcTraceContext.start();
        RpcTraceContext.setTraceId(traceId);
        RpcTraceContext.setParentId(spanId);
        RpcTraceContext.setSpanId(IdUtils.get());
        ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory=
                SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
        Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();
        TraceContext traceContext= TraceContext.newBuilder()
                .traceId(RpcTraceContext.getTraceId())
                .parentId(RpcTraceContext.getParentId())
                .spanId(RpcTraceContext.getSpanId())
                .sampled(true)
                .build();
        Span span = tracer.toSpan(traceContext).start();
        Result result = invoker.invoke(invocation);
        span.finish();
        return result;
    }

异常流程

上面不管是消费者的过滤器仍是服务提供者的过滤器,均未考虑服务在调用invoker.invoke时出错的场景,若是出错,后面的span.finish方法将不会按预期执行,也就记录不了信息。因此须要针对此问题作优化:能够在finally块中执行finish方法。

try {
    result = invoker.invoke(invocation);
}
finally {
    span.finish();
}

消费者在调用服务时,异步调用问题

上面过滤器中调用span.finish都是基于同步模式,而因为dubbo除了同步调用外还提供了两种调用方式

  • 异步调用 经过callback机制的异步
  • oneway

只发起请求并不等待结果的异步调用,无callback一说

针对上面两类异步再加上同步调用,咱们要想准确记录服务真正的时间,须要在消费方的过滤器中作以下处理:

建立一个用于回调的处理类,它的主要目的是为了在回调成功时记录时间,这里不管是成功仍是失败。

private class AsyncSpanCallback implements ResponseCallback{
    private Span span;
    public AsyncSpanCallback(Span span){
        this.span=span;
    }
    @Override
    public void done(Object o) {
        span.finish();
    }
    @Override
    public void caught(Throwable throwable) {
        span.finish();
    }
}

再在调用invoke方法时,若是是oneway方式,则调用flush方法结果,若是是同步则直接调用finish方法,若是是异步则在回调时调用finish方法。

Result result = null;
boolean isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation);
try {
    result = invoker.invoke(invocation);
}
finally {
    if(isOneway) {
        span.flush();
    }
    else if(!isAsync) {
        span.finish();
    }
}

欢迎工做一到五年的Java工程师朋友们加入Java架构开发: 855835163 群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用本身每一分每一秒的时间来学习提高本身,不要再用"没有时间“来掩饰本身思想上的懒惰!趁年轻,使劲拼,给将来的本身一个交代!

相关文章
相关标签/搜索