欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

dubbo+zipkin调用链监控(二)

程序员文章站 2023-12-25 20:11:03
去年的时候写过dubbo+zipkin调用链监控,最近看到zipkin2配合brave实现起来会比我之前的实现要简单很多,因为brave将很多交互的内容都封装起来了,不需要自己去写具体的实现,比如如何去构建span,如何去上报数据。 收集器抽象 由于zipkin支持http以及kafka两种方式上报 ......

去年的时候写过dubbo+zipkin调用链监控,最近看到zipkin2配合brave实现起来会比我之前的实现要简单很多,因为brave将很多交互的内容都封装起来了,不需要自己去写具体的实现,比如如何去构建span,如何去上报数据。

收集器抽象

由于zipkin支持http以及kafka两种方式上报数据,所以在配置上需要做下抽象。

abstractzipkincollectorconfiguration

主要是针对下面两种收集方式的一些配置上的定义,最核心的是sender接口的定义,http与kafka是两类完全不同的实现。

public abstract sender getsender();

其次是协助性的构造函数,主要是配合构建收集器所需要的一些参数。

  • zipkinurl

如果是http收集,那么对应的是zipkin api域名,如果是kafka,对应的是kafka集群的地址

  • topic

仅在收集方式为kafka是有效,http时传空值即可。

public abstractzipkincollectorconfiguration(string servicename,string zipkinurl,string topic){
    this.zipkinurl=zipkinurl;
    this.servicename=servicename;
    this.topic=topic;
    this.tracing=this.tracing();
}

配置上报方式,这里统一采用异常上传,并且配置上报的超时时间。

protected asyncreporter<span> spanreporter() {
    return asyncreporter
            .builder(getsender())
            .closetimeout(500, timeunit.milliseconds)
            .build(spanbytesencoder.json_v2);
}

下面这两方法,是配合应用构建span使用的。

注意那个sampler()方法,默认是什么也不做的意思,我们要想看到数据就需要配置成sampler.always_sample,这样才能真正将数据上报到zipkin服务器。

protected tracing tracing() {
    this.tracing= tracing
            .newbuilder()
            .localservicename(this.servicename)
            .sampler(sampler.always_sample)
            .spanreporter(spanreporter())
            .build();
    return this.tracing;
}

protected tracing gettracing(){
    return this.tracing;
}

httpzipkincollectorconfiguration

主要是实现getsender方法,可以借用okhttpsender这个对象来快速构建,api版本采用v2。

public class httpzipkincollectorconfiguration extends abstractzipkincollectorconfiguration {
    public httpzipkincollectorconfiguration(string servicename,string zipkinurl) {
        super(servicename,zipkinurl,null);
    }

    @override
    public sender getsender() {
        return okhttpsender.create(super.getzipkinurl()+"/api/v2/spans");
    }
}

okhttpsender这个类需要引用这个包

<dependency>
    <groupid>io.zipkin.reporter2</groupid>
    <artifactid>zipkin-sender-okhttp3</artifactid>
    <version>${zipkin-reporter2.version}</version>
</dependency>

kafkazipkincollectorconfiguration

同样也是实现getsender方法

public class kafkazipkincollectorconfiguration extends abstractzipkincollectorconfiguration {
    public kafkazipkincollectorconfiguration(string servicename,string zipkinurl,string topic) {
        super(servicename,zipkinurl,topic);
    }

    @override
    public sender getsender() {

        return kafkasender
                .newbuilder()
                .bootstrapservers(super.getzipkinurl())
                .topic(super.gettopic())
                .encoding(encoding.json)
                .build();
    }
}

kafkasender这个类需要引用这个包:

<dependency>
    <groupid>io.zipkin.reporter2</groupid>
    <artifactid>zipkin-sender-kafka11</artifactid>
    <version>${zipkin-reporter2.version}</version>
</dependency>

收集器工厂

由于上面创建了两个收集器配置类,使用时只能是其中之一,所以实际运行的实例需要根据配置来动态生成。zipkincollectorconfigurationfactory就是负责生成收集器实例的。

private final abstractzipkincollectorconfiguration zipkincollectorconfiguration;

@autowired
public zipkincollectorconfigurationfactory(traceconfig traceconfig){
    if(objects.equal("kafka", traceconfig.getzipkinsendtype())){
        zipkincollectorconfiguration=new kafkazipkincollectorconfiguration(
                traceconfig.getapplicationname(),
                traceconfig.getzipkinurl(),
                traceconfig.getzipkinkafkatopic());
    }
    else {
        zipkincollectorconfiguration = new httpzipkincollectorconfiguration(
                traceconfig.getapplicationname(),
                traceconfig.getzipkinurl());
    }
}

通过构建函数将我们的配置类traceconfig注入进来,然后根据发送方式来构建实例。另外提供一个辅助函数:

public tracing gettracing(){
    return this.zipkincollectorconfiguration.gettracing();
}

过滤器

在dubbo的过滤器中实现数据上传的功能逻辑相对简单,一般都在invoke方法执行前记录数据,然后方法执行完成后再次记录数据。这个逻辑不变,有变化的是数据上报的实现,上一个版本是通过发http请求实现需要编码,现在可以直接借用brave所提供的span来帮助我们完成,有两重要的方法:

  • finish

方法源码如下,在完成的时候会填写上完成的时间并上报数据,这一般应用于同步调用场景。

public void finish(tracecontext context, long finishtimestamp) {
    mutablespan span = this.spanmap.remove(context);
    if(span != null && !this.noop.get()) {
        synchronized(span) {
            span.finish(long.valueof(finishtimestamp));
            this.reporter.report(span.tospan());
        }
    }
}
  • flush 与上面finish方法的不同点在于,在报数据时没有完成时间,这应该是适用于一些异步调用但不关心结果的场景,比如dubbo所提供的oneway方式调用。
public void flush(tracecontext context) {
    mutablespan span = this.spanmap.remove(context);
    if(span != null && !this.noop.get()) {
        synchronized(span) {
            span.finish((long)null);
            this.reporter.report(span.tospan());
        }
    }
}

消费者

做为消费方,有一个核心功能就是将traceid以及spanid传递到服务提供方,这里还是通过dubbo提供的附加参数方式实现。

@override
public result invoke(invoker<?> invoker, invocation invocation) throws rpcexception {
    if(!rpctracecontext.gettraceconfig().isenabled()){
        return invoker.invoke(invocation);
    }

    zipkincollectorconfigurationfactory zipkincollectorconfigurationfactory=
            springcontextutils.getapplicationcontext().getbean(zipkincollectorconfigurationfactory.class);
    tracer tracer= zipkincollectorconfigurationfactory.gettracing().tracer();

    if(null==rpctracecontext.gettraceid()){
        rpctracecontext.start();
        rpctracecontext.settraceid(idutils.get());
        rpctracecontext.setparentid(null);
        rpctracecontext.setspanid(idutils.get());
    }
    else {
        rpctracecontext.setparentid(rpctracecontext.getspanid());
        rpctracecontext.setspanid(idutils.get());
    }
    tracecontext tracecontext= tracecontext.newbuilder()
            .traceid(rpctracecontext.gettraceid())
            .parentid(rpctracecontext.getparentid())
            .spanid(rpctracecontext.getspanid())
            .sampled(true)
            .build();

    span span=tracer.tospan(tracecontext).start();

    invocation.getattachments().put(rpctracecontext.trace_id_key, string.valueof(span.context().traceid()));
    invocation.getattachments().put(rpctracecontext.span_id_key, string.valueof(span.context().spanid()));

    result result = invoker.invoke(invocation);

    span.finish();

    return result;
}

提供者

@override
    public result invoke(invoker<?> invoker, invocation invocation) throws rpcexception {
        if(!rpctracecontext.gettraceconfig().isenabled()){
            return invoker.invoke(invocation);
        }

        map<string, string> attaches = invocation.getattachments();
        if (!attaches.containskey(rpctracecontext.trace_id_key)){
            return invoker.invoke(invocation);
        }

        long traceid = long.valueof(attaches.get(rpctracecontext.trace_id_key));
        long spanid = long.valueof(attaches.get(rpctracecontext.span_id_key));

        attaches.remove(rpctracecontext.trace_id_key);
        attaches.remove(rpctracecontext.span_id_key);
        rpctracecontext.start();
        rpctracecontext.settraceid(traceid);
        rpctracecontext.setparentid(spanid);
        rpctracecontext.setspanid(idutils.get());

        zipkincollectorconfigurationfactory zipkincollectorconfigurationfactory=
                springcontextutils.getapplicationcontext().getbean(zipkincollectorconfigurationfactory.class);
        tracer tracer= zipkincollectorconfigurationfactory.gettracing().tracer();

        tracecontext tracecontext= tracecontext.newbuilder()
                .traceid(rpctracecontext.gettraceid())
                .parentid(rpctracecontext.getparentid())
                .spanid(rpctracecontext.getspanid())
                .sampled(true)
                .build();
        span span = tracer.tospan(tracecontext).start();

        result result = invoker.invoke(invocation);

        span.finish();

        return result;

    }

异常流程

上面无论是消费者的过滤器还是服务提供者的过滤器,均未考虑服务在调用invoker.invoke时出错的场景,如果出错,后面的span.finish方法将不会按预期执行,也就记录不了信息。所以需要针对此问题做优化:可以在finally块中执行finish方法。

try {
    result = invoker.invoke(invocation);
}
finally {
    span.finish();
}

消费者在调用服务时,异步调用问题

上面过滤器中调用span.finish都是基于同步模式,而由于dubbo除了同步调用外还提供了两种调用方式

  • 异步调用 通过callback机制的异步

  • oneway

只发起请求并不等待结果的异步调用,无callback一说

针对上面两类异步再加上同步调用,我们要想准确记录服务真正的时间,需要在消费方的过滤器中做如下处理:

创建一个用于回调的处理类,它的主要目的是为了在回调成功时记录时间,这里无论是成功还是失败。

private class asyncspancallback implements responsecallback{

    private span span;

    public asyncspancallback(span span){
        this.span=span;
    }

    @override
    public void done(object o) {
        span.finish();
    }

    @override
    public void caught(throwable throwable) {
        span.finish();
    }
}

再在调用invoke方法时,如果是oneway方式,则调用flush方法结果,如果是同步则直接调用finish方法,如果是异步则在回调时调用finish方法。

result result = null;
boolean isoneway = rpcutils.isoneway(invoker.geturl(), invocation);
try {
    result = invoker.invoke(invocation);
}
finally {
    if(isoneway) {
        span.flush();
    }
    else if(!isasync) {
        span.finish();
    }
}

待完善问题

过滤器中生成span的方式应该有更好的方法,还没有对brave做过多研究,后续想办法再优化下。另外我测试的场景是consumer调用provider,provider内部再调用provider2,我测试时发现第三步调用传递的parentid好像有点小问题,后续需要再确认下。

代码下载

上一篇:

下一篇: