dubbo+zipkin调用链监控(二)
去年的时候写过dubbo+zipkin调用链监控,最近看到zipkin2配合brave实现起来会比我之前的实现要简单很多,因为brave将很多交互的内容都封装起来了,不需要自己去写具体的实现,比如如何去构建span,如何去上报数据。
收集器抽象
由于zipkin支持http以及kafka两种方式上报数据,所以在配置上需要做下抽象。
abstractzipkincollectorconfiguration
主要是针对下面两种收集方式的一些配置上的定义,最核心的是sender接口的定义,http与kafka是两类完全不同的实现。
public abstract sender getsender();
其次是协助性的构造函数,主要是配合构建收集器所需要的一些参数。
- zipkinurl
如果是http收集,那么对应的是zipkin api域名,如果是kafka,对应的是kafka集群的地址
- topic
仅在收集方式为kafka是有效,http时传空值即可。
public abstractzipkincollectorconfiguration(string servicename,string zipkinurl,string topic){ this.zipkinurl=zipkinurl; this.servicename=servicename; this.topic=topic; this.tracing=this.tracing(); }
配置上报方式,这里统一采用异常上传,并且配置上报的超时时间。
protected asyncreporter<span> spanreporter() { return asyncreporter .builder(getsender()) .closetimeout(500, timeunit.milliseconds) .build(spanbytesencoder.json_v2); }
下面这两方法,是配合应用构建span使用的。
注意那个sampler()方法,默认是什么也不做的意思,我们要想看到数据就需要配置成sampler.always_sample,这样才能真正将数据上报到zipkin服务器。
protected tracing tracing() { this.tracing= tracing .newbuilder() .localservicename(this.servicename) .sampler(sampler.always_sample) .spanreporter(spanreporter()) .build(); return this.tracing; } protected tracing gettracing(){ return this.tracing; }
httpzipkincollectorconfiguration
主要是实现getsender方法,可以借用okhttpsender这个对象来快速构建,api版本采用v2。
public class httpzipkincollectorconfiguration extends abstractzipkincollectorconfiguration { public httpzipkincollectorconfiguration(string servicename,string zipkinurl) { super(servicename,zipkinurl,null); } @override public sender getsender() { return okhttpsender.create(super.getzipkinurl()+"/api/v2/spans"); } }
okhttpsender这个类需要引用这个包
<dependency> <groupid>io.zipkin.reporter2</groupid> <artifactid>zipkin-sender-okhttp3</artifactid> <version>${zipkin-reporter2.version}</version> </dependency>
kafkazipkincollectorconfiguration
同样也是实现getsender方法
public class kafkazipkincollectorconfiguration extends abstractzipkincollectorconfiguration { public kafkazipkincollectorconfiguration(string servicename,string zipkinurl,string topic) { super(servicename,zipkinurl,topic); } @override public sender getsender() { return kafkasender .newbuilder() .bootstrapservers(super.getzipkinurl()) .topic(super.gettopic()) .encoding(encoding.json) .build(); } }
kafkasender这个类需要引用这个包:
<dependency> <groupid>io.zipkin.reporter2</groupid> <artifactid>zipkin-sender-kafka11</artifactid> <version>${zipkin-reporter2.version}</version> </dependency>
收集器工厂
由于上面创建了两个收集器配置类,使用时只能是其中之一,所以实际运行的实例需要根据配置来动态生成。zipkincollectorconfigurationfactory就是负责生成收集器实例的。
private final abstractzipkincollectorconfiguration zipkincollectorconfiguration; @autowired public zipkincollectorconfigurationfactory(traceconfig traceconfig){ if(objects.equal("kafka", traceconfig.getzipkinsendtype())){ zipkincollectorconfiguration=new kafkazipkincollectorconfiguration( traceconfig.getapplicationname(), traceconfig.getzipkinurl(), traceconfig.getzipkinkafkatopic()); } else { zipkincollectorconfiguration = new httpzipkincollectorconfiguration( traceconfig.getapplicationname(), traceconfig.getzipkinurl()); } }
通过构建函数将我们的配置类traceconfig注入进来,然后根据发送方式来构建实例。另外提供一个辅助函数:
public tracing gettracing(){ return this.zipkincollectorconfiguration.gettracing(); }
过滤器
在dubbo的过滤器中实现数据上传的功能逻辑相对简单,一般都在invoke方法执行前记录数据,然后方法执行完成后再次记录数据。这个逻辑不变,有变化的是数据上报的实现,上一个版本是通过发http请求实现需要编码,现在可以直接借用brave所提供的span来帮助我们完成,有两重要的方法:
- finish
方法源码如下,在完成的时候会填写上完成的时间并上报数据,这一般应用于同步调用场景。
public void finish(tracecontext context, long finishtimestamp) { mutablespan span = this.spanmap.remove(context); if(span != null && !this.noop.get()) { synchronized(span) { span.finish(long.valueof(finishtimestamp)); this.reporter.report(span.tospan()); } } }
- flush 与上面finish方法的不同点在于,在报数据时没有完成时间,这应该是适用于一些异步调用但不关心结果的场景,比如dubbo所提供的oneway方式调用。
public void flush(tracecontext context) { mutablespan span = this.spanmap.remove(context); if(span != null && !this.noop.get()) { synchronized(span) { span.finish((long)null); this.reporter.report(span.tospan()); } } }
消费者
做为消费方,有一个核心功能就是将traceid以及spanid传递到服务提供方,这里还是通过dubbo提供的附加参数方式实现。
@override public result invoke(invoker<?> invoker, invocation invocation) throws rpcexception { if(!rpctracecontext.gettraceconfig().isenabled()){ return invoker.invoke(invocation); } zipkincollectorconfigurationfactory zipkincollectorconfigurationfactory= springcontextutils.getapplicationcontext().getbean(zipkincollectorconfigurationfactory.class); tracer tracer= zipkincollectorconfigurationfactory.gettracing().tracer(); if(null==rpctracecontext.gettraceid()){ rpctracecontext.start(); rpctracecontext.settraceid(idutils.get()); rpctracecontext.setparentid(null); rpctracecontext.setspanid(idutils.get()); } else { rpctracecontext.setparentid(rpctracecontext.getspanid()); rpctracecontext.setspanid(idutils.get()); } tracecontext tracecontext= tracecontext.newbuilder() .traceid(rpctracecontext.gettraceid()) .parentid(rpctracecontext.getparentid()) .spanid(rpctracecontext.getspanid()) .sampled(true) .build(); span span=tracer.tospan(tracecontext).start(); invocation.getattachments().put(rpctracecontext.trace_id_key, string.valueof(span.context().traceid())); invocation.getattachments().put(rpctracecontext.span_id_key, string.valueof(span.context().spanid())); result result = invoker.invoke(invocation); span.finish(); return result; }
提供者
@override public result invoke(invoker<?> invoker, invocation invocation) throws rpcexception { if(!rpctracecontext.gettraceconfig().isenabled()){ return invoker.invoke(invocation); } map<string, string> attaches = invocation.getattachments(); if (!attaches.containskey(rpctracecontext.trace_id_key)){ return invoker.invoke(invocation); } long traceid = long.valueof(attaches.get(rpctracecontext.trace_id_key)); long spanid = long.valueof(attaches.get(rpctracecontext.span_id_key)); attaches.remove(rpctracecontext.trace_id_key); attaches.remove(rpctracecontext.span_id_key); rpctracecontext.start(); rpctracecontext.settraceid(traceid); rpctracecontext.setparentid(spanid); rpctracecontext.setspanid(idutils.get()); zipkincollectorconfigurationfactory zipkincollectorconfigurationfactory= springcontextutils.getapplicationcontext().getbean(zipkincollectorconfigurationfactory.class); tracer tracer= zipkincollectorconfigurationfactory.gettracing().tracer(); tracecontext tracecontext= tracecontext.newbuilder() .traceid(rpctracecontext.gettraceid()) .parentid(rpctracecontext.getparentid()) .spanid(rpctracecontext.getspanid()) .sampled(true) .build(); span span = tracer.tospan(tracecontext).start(); result result = invoker.invoke(invocation); span.finish(); return result; }
异常流程
上面无论是消费者的过滤器还是服务提供者的过滤器,均未考虑服务在调用invoker.invoke时出错的场景,如果出错,后面的span.finish方法将不会按预期执行,也就记录不了信息。所以需要针对此问题做优化:可以在finally块中执行finish方法。
try { result = invoker.invoke(invocation); } finally { span.finish(); }
消费者在调用服务时,异步调用问题
上面过滤器中调用span.finish都是基于同步模式,而由于dubbo除了同步调用外还提供了两种调用方式
-
异步调用 通过callback机制的异步
-
oneway
只发起请求并不等待结果的异步调用,无callback一说
针对上面两类异步再加上同步调用,我们要想准确记录服务真正的时间,需要在消费方的过滤器中做如下处理:
创建一个用于回调的处理类,它的主要目的是为了在回调成功时记录时间,这里无论是成功还是失败。
private class asyncspancallback implements responsecallback{ private span span; public asyncspancallback(span span){ this.span=span; } @override public void done(object o) { span.finish(); } @override public void caught(throwable throwable) { span.finish(); } }
再在调用invoke方法时,如果是oneway方式,则调用flush方法结果,如果是同步则直接调用finish方法,如果是异步则在回调时调用finish方法。
result result = null; boolean isoneway = rpcutils.isoneway(invoker.geturl(), invocation); try { result = invoker.invoke(invocation); } finally { if(isoneway) { span.flush(); } else if(!isasync) { span.finish(); } }
待完善问题
过滤器中生成span的方式应该有更好的方法,还没有对brave做过多研究,后续想办法再优化下。另外我测试的场景是consumer调用provider,provider内部再调用provider2,我测试时发现第三步调用传递的parentid好像有点小问题,后续需要再确认下。
代码下载
上一篇: phpstorm编辑器乱码问题解决