深度解析dubbo monitor(api)
本文基于dubbo v2.6.x
1.MonitorFactory
MonitorFactory 是monitor工厂抽象,里面提供了一个getMonitor的方法。我们来看下MonitorFactory定义:
我们看到MonitorFactory 接口是dubbo spi 的扩展点,而且 getMonitor方法能够根据url的protocol自适应选择对应的实现类,下面是它的继承关系图。
2.AbstractMonitorFactory
AbstractMonitorFactory是MonitorFactory 的抽象实现,它主要是对Monitor对象做了缓存操作,在获取Monitor的时候,会先从缓存中获取。
// lock for getting monitor center
private static final ReentrantLock LOCK = new ReentrantLock();
// monitor centers Map<RegistryAddress, Registry>
private static final Map<String, Monitor> MONITORS = new ConcurrentHashMap<String, Monitor>();
private static final Map<String, ListenableFuture<Monitor>> FUTURES = new ConcurrentHashMap<String, ListenableFuture<Monitor>>();
private static final ExecutorService executor = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("DubboMonitorCreator", true));
这是它的四个静态成员,LOCK是在需要创建monitor的时候用到,MONITORS是monitor的缓存,FUTURES 是任务的缓存,executor是执行任务的线程池,主要用来执行ListenableFuture 里面的创建Monitor的任务。
接下来我们来看下getMonitor 方法的实现:
@Override
public Monitor getMonitor(URL url) {// 设置path 为com.alibaba.dubbo.monitor.MonitorService ,添加参数interface
// 设置path com.alibaba.dubbo.monitor.MonitorService interface =com.alibaba.dubbo.monitor.MonitorService
url = url.setPath(MonitorService.class.getName()).addParameter(Constants.INTERFACE_KEY, MonitorService.class.getName());
// 生成一个key
String key = url.toServiceStringWithoutResolving();
Monitor monitor = MONITORS.get(key);// 获取monitor
Future<Monitor> future = FUTURES.get(key);
// 如果缓存了具体的monitor就 返回
if (monitor != null || future != null) {
return monitor;
}
LOCK.lock();
try {
monitor = MONITORS.get(key);
future = FUTURES.get(key);
if (monitor != null || future != null) {
return monitor;
}
final URL monitorUrl = url;
final ListenableFutureTask<Monitor> listenableFutureTask = ListenableFutureTask.create(new MonitorCreator(monitorUrl));
listenableFutureTask.addListener(new MonitorListener(key));
executor.execute(listenableFutureTask);
FUTURES.put(key, listenableFutureTask);
return null;
} finally {
// unlock
LOCK.unlock();
}
}
先是生成一个MonitorUrl,可以看到path设置成com.alibaba.dubbo.monitor.MonitorService,然后添加interface参数值也是com.alibaba.dubbo.monitor.MonitorService,接着就是通过这个url生成一个key,然后根据这个key从MONITORS缓存中获取Monitor对象
同时根据这个key从FUTURES任务缓存中获取任务,如果都存在就将获取的Monitor对象返回,否则,就加锁再获取判断一次,还是不符合的话就调用ListenableFutureTask.create()方法创建一个ListenableFutureTask 对象,其中参数是new了一个MonitorCreator对象(一会我们再来看下这个类),这就就是向listenableFutureTask 添加一个listener(MonitorListener 对象),最后使用线程池,执行这个任务,将任务缓存起来,释放锁资源。
先来看下MonitorCreator对象
class MonitorCreator implements Callable<Monitor> {
private URL url;
public MonitorCreator(URL url) {
this.url = url;
}
@Override
public Monitor call() throws Exception {
Monitor monitor = AbstractMonitorFactory.this.createMonitor(url);
return monitor;
}
}
我们可以看到实现Callable接口,在call方法中调用了AbstractMonitorFactory的createMonitor(url)方法来创建Monitor返回,这个createMonitor方法是个抽象方法,需要AbstractMonitorFactory子类来具体实现。
接下来再来看下MonitorListener:
class MonitorListener implements Runnable {
private String key;
public MonitorListener(String key) {
this.key = key;
}
@Override
public void run() {
try {
// 根据key 从缓存中获取ListenableFuture 任务
ListenableFuture<Monitor> listenableFuture = AbstractMonitorFactory.FUTURES.get(key);
// 从ListenableFuture获取创建的那个Monitor对象,然后放到Monitor缓存中
AbstractMonitorFactory.MONITORS.put(key, listenableFuture.get());
//从任务缓存中移除这个任务
AbstractMonitorFactory.FUTURES.remove(key);
} catch (InterruptedException e) {
logger.warn("Thread was interrupted unexpectedly, monitor will never be got.");
AbstractMonitorFactory.FUTURES.remove(key);
} catch (ExecutionException e) {
logger.warn("Create monitor failed, monitor data will not be collected until you fix this problem. ", e);
}
}
}
这个MonitorListener会在Future执行完run方法的时候调用执行,也就是创建完成Monitor对象后执行。其实这里dubbo就是用了异步创建Monitor。
3.Monitor与MonitorService
Monitor 是监控的一个接口,然后继承Mode与MonitorService,在Monitor 接口中没有自己的抽象方法(就是里面是空的),它的作用就是聚合抽象了监控这个概念。
MonitorService 这个接口里面定义了一堆监控指标常量。
有application,interface,method,group,version,consumer,provider,timestamp 这种用于区分调用的。
有success,failure,input,output,elapsed,concurrent,max.input,max.output,max.elapsed,max.concurrent这种用于统计调用的。
其中还有两个抽象方法:
// 收集
void collect(URL statistics);
//查找
List<URL> lookup(URL query);
上一篇: ThreadPoolExecutor线程池原理解读
下一篇: Java线程池的使用