Dubbo Monitor Center Source Code Analysis
程序员文章站
2022-07-13 10:55:48
MonitorFactory is the interface for creating a Monitor.
@SPI("dubbo")
public interface MonitorFactory {

    /**
     * Create monitor.
     *
     * @param url
     * @return monitor
     */
    @Adaptive("protocol")
    Monitor getMonitor(URL url);

}
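To make the role of @Adaptive("protocol") concrete, here is a minimal sketch of the idea: the factory implementation is picked at call time from the protocol of the URL, falling back to the default name given in @SPI("dubbo"). The types below (SimpleMonitorFactory, AdaptiveSketch) are hypothetical stand-ins built on java.net.URI; they are not Dubbo's ExtensionLoader and only illustrate the dispatch.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a monitor factory; returns a label instead of a Monitor.
interface SimpleMonitorFactory {
    String getMonitor(URI url);
}

class AdaptiveSketch {
    // Mirrors the default extension name declared by @SPI("dubbo").
    private static final String DEFAULT_NAME = "dubbo";

    private final Map<String, SimpleMonitorFactory> extensions = new HashMap<>();

    void register(String name, SimpleMonitorFactory factory) {
        extensions.put(name, factory);
    }

    // Mirrors the idea behind @Adaptive("protocol"): choose the factory by URL protocol.
    String getMonitor(URI url) {
        String name = url.getScheme() == null ? DEFAULT_NAME : url.getScheme();
        SimpleMonitorFactory factory = extensions.get(name);
        if (factory == null) {
            throw new IllegalStateException("No monitor factory registered for: " + name);
        }
        return factory.getMonitor(url);
    }
}
```

With a factory registered under "dubbo", a URL such as dubbo://127.0.0.1:7070 is routed to it, while an unregistered protocol fails fast.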
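The MonitorService interface defines how monitoring data is collected and queried.
Monitor represents the monitoring object; it extends both the MonitorService and Node interfaces.
The MetricsService interface appears to be reserved for future extension.
AbstractMonitorFactory is an abstract class that implements most of the Monitor-creation logic (caching and asynchronous creation) and leaves createMonitor as the extension point for subclasses.
MonitorFilter sits in the filter chain (chain-of-responsibility pattern) and collects information about each invocation.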
class MonitorListener implements Listener {

    @Override
    public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    }

    @Override
    public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    }

    /**
     * The collector logic, it will be handled by the default monitor
     *
     * @param invoker
     * @param invocation
     * @param result     the invoke result
     * @param remoteHost the remote host address
     * @param start      the timestamp the invoke begin
     * @param error      if there is an error on the invoke
     */
    private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
        try {
            URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
            Monitor monitor = monitorFactory.getMonitor(monitorUrl);
            if (monitor == null) {
                return;
            }
            URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
            monitor.collect(statisticsURL);
        } catch (Throwable t) {
            logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
        }
    }

    /**
     * Create statistics url
     *
     * @param invoker
     * @param invocation
     * @param result
     * @param remoteHost
     * @param start
     * @param error
     * @return
     */
    private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
        // ---- service statistics ----
        long elapsed = System.currentTimeMillis() - start; // invocation cost
        int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
        String application = invoker.getUrl().getParameter(APPLICATION_KEY);
        String service = invoker.getInterface().getName(); // service name
        String method = RpcUtils.getMethodName(invocation); // method name
        String group = invoker.getUrl().getParameter(GROUP_KEY);
        String version = invoker.getUrl().getParameter(VERSION_KEY);

        int localPort;
        String remoteKey, remoteValue;
        if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
            // ---- for service consumer ----
            localPort = 0;
            remoteKey = MonitorService.PROVIDER;
            remoteValue = invoker.getUrl().getAddress();
        } else {
            // ---- for service provider ----
            localPort = invoker.getUrl().getPort();
            remoteKey = MonitorService.CONSUMER;
            remoteValue = remoteHost;
        }

        String input = "", output = "";
        if (invocation.getAttachment(INPUT_KEY) != null) {
            input = invocation.getAttachment(INPUT_KEY);
        }
        if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
            output = result.getAttachment(OUTPUT_KEY);
        }

        return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method,
                MonitorService.APPLICATION, application,
                MonitorService.INTERFACE, service,
                MonitorService.METHOD, method,
                remoteKey, remoteValue,
                error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",
                MonitorService.ELAPSED, String.valueOf(elapsed),
                MonitorService.CONCURRENT, String.valueOf(concurrent),
                INPUT_KEY, input,
                OUTPUT_KEY, output,
                GROUP_KEY, group,
                VERSION_KEY, version);
    }

}
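The listener above depends on two pieces of bookkeeping set up before the call: a start timestamp (read back from MONITOR_FILTER_START_TIME) and a per-method concurrency counter that is counted down when the call completes. The following is a minimal sketch of that pattern. It assumes the counter is incremented on entry, and every name in it (InvocationStats, begin, end, the "service.method" key format) is hypothetical rather than taken from Dubbo.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class InvocationStats {
    // One counter per "service.method" key (hypothetical key format).
    private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<>();

    private AtomicInteger counter(String key) {
        return concurrents.computeIfAbsent(key, k -> new AtomicInteger());
    }

    // Called before the invocation: count up and return the start timestamp.
    long begin(String key) {
        counter(key).incrementAndGet();
        return System.currentTimeMillis();
    }

    // Called on response or on error: compute the elapsed time, then count down.
    long end(String key, long start) {
        long elapsed = System.currentTimeMillis() - start;
        counter(key).decrementAndGet();
        return elapsed;
    }

    // Number of invocations of this key that are currently in flight.
    int concurrent(String key) {
        return counter(key).get();
    }
}
```

Counting down in both the response and the error path, as the listener does, is what keeps the concurrency figure from drifting upward when calls fail.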
The Statistics class is essentially a POJO that holds the basic information about an invocation, for example:
private URL url;
private String application;
private String service;
private String method;
private String group;
private String version;
private String client;
private String server;
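While reading the code I noticed that DubboMonitorFactory has both a getMonitor method and a createMonitor method, which seemed odd at first. On closer reading, getMonitor (inherited from AbstractMonitorFactory) calls createMonitor asynchronously, as shown below: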
public Monitor getMonitor(URL url) {
    url = url.setPath(MonitorService.class.getName()).addParameter(INTERFACE_KEY, MonitorService.class.getName());
    String key = url.toServiceStringWithoutResolving();
    Monitor monitor = MONITORS.get(key);
    Future<Monitor> future = FUTURES.get(key);
    if (monitor != null || future != null) {
        return monitor;
    }

    LOCK.lock();
    try {
        monitor = MONITORS.get(key);
        future = FUTURES.get(key);
        if (monitor != null || future != null) {
            return monitor;
        }

        final URL monitorUrl = url;
        final CompletableFuture<Monitor> completableFuture = CompletableFuture.supplyAsync(() -> AbstractMonitorFactory.this.createMonitor(monitorUrl));
        FUTURES.put(key, completableFuture);
        completableFuture.thenRunAsync(new MonitorListener(key), executor);

        return null;
    } finally {
        // unlock
        LOCK.unlock();
    }
}
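The notable behavior in getMonitor is that the first call for a key only schedules the creation and returns null; a later call returns the cached instance once the background task has stored it. Below is a simplified sketch of that pattern. The names (AsyncCache, get, creator) are hypothetical, and a single whenComplete callback stands in for the MonitorListener used above, so treat it as an illustration of the idea rather than Dubbo's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

class AsyncCache<V> {
    private final ReentrantLock lock = new ReentrantLock();
    private final ConcurrentMap<String, V> values = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, CompletableFuture<V>> futures = new ConcurrentHashMap<>();
    private final Function<String, V> creator; // plays the role of createMonitor

    AsyncCache(Function<String, V> creator) {
        this.creator = creator;
    }

    // Returns the cached value, or null while creation is pending or has just been scheduled.
    V get(String key) {
        V value = values.get(key);
        if (value != null || futures.containsKey(key)) {
            return value;
        }
        lock.lock();
        try {
            // Re-check under the lock so that only one creation task is scheduled per key.
            value = values.get(key);
            if (value != null || futures.containsKey(key)) {
                return value;
            }
            CompletableFuture<V> future = CompletableFuture.supplyAsync(() -> creator.apply(key));
            futures.put(key, future);
            // Publish the value on success; clear the pending marker either way.
            future.whenComplete((created, error) -> {
                if (error == null && created != null) {
                    values.put(key, created);
                }
                futures.remove(key);
            });
            return null;
        } finally {
            lock.unlock();
        }
    }
}
```

The consequence for callers is the null check seen in MonitorFilter's collect method: statistics for the first few invocations are simply dropped until the monitor is ready, instead of blocking the business call.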
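While reading DubboMonitorFactory I also noticed that it holds both a protocol and a proxyFactory, which puzzled me at first.
On reflection, this code looks very much like the way a protocol refers to a remote service, as in HttpProtocol for example.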
protected Monitor createMonitor(URL url) {
    URLBuilder urlBuilder = URLBuilder.from(url);
    urlBuilder.setProtocol(url.getParameter(PROTOCOL_KEY, DUBBO_PROTOCOL));
    if (StringUtils.isEmpty(url.getPath())) {
        urlBuilder.setPath(MonitorService.class.getName());
    }
    String filter = url.getParameter(REFERENCE_FILTER_KEY);
    if (StringUtils.isEmpty(filter)) {
        filter = "";
    } else {
        filter = filter + ",";
    }
    urlBuilder.addParameters(CHECK_KEY, String.valueOf(false),
            REFERENCE_FILTER_KEY, filter + "-monitor");
    Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, urlBuilder.build());
    MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
    return new DubboMonitor(monitorInvoker, monitorService);
}
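The filter handling in createMonitor is worth isolating. Whatever reference filters the monitor URL already declares, "-monitor" is appended, and in Dubbo's filter configuration a leading "-" excludes a filter, so calls to the monitor center are not themselves passed through MonitorFilter. A small sketch of just that string logic, using a hypothetical helper name:

```java
class MonitorFilterConfig {
    // Appends "-monitor" to an existing comma-separated filter list (which may be null or empty).
    static String withoutMonitorFilter(String existing) {
        String prefix = (existing == null || existing.isEmpty()) ? "" : existing + ",";
        return prefix + "-monitor";
    }
}
```

Without this exclusion, every report sent to the monitor center would itself generate new statistics to report.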
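Finally, let's look at the source code of DubboMonitor.
Start with the constructor: it shows that Dubbo reports once per interval, which defaults to 60000 ms (one minute) and can be overridden with the interval URL parameter.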
public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) {
    this.monitorInvoker = monitorInvoker;
    this.monitorService = monitorService;
    this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000);
    // collect timer for collecting statistics data
    sendFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {
        try {
            // collect data
            send();
        } catch (Throwable t) {
            logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);
        }
    }, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
}
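The try/catch inside the scheduled task matters: with scheduleWithFixedDelay, an exception that escapes one execution suppresses all later executions, so a single failed report would silently stop reporting for good. A minimal sketch of the same scheduling pattern follows; the class and member names (PeriodicReporter, sendCount, destroy) are hypothetical and send() only counts calls instead of reporting anything.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicReporter {
    // Daemon thread so a forgotten reporter does not keep the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "reporter-send-timer");
                t.setDaemon(true);
                return t;
            });
    private final ScheduledFuture<?> sendFuture;
    final AtomicInteger sendCount = new AtomicInteger();

    PeriodicReporter(long intervalMillis) {
        sendFuture = scheduler.scheduleWithFixedDelay(() -> {
            try {
                send();
            } catch (Throwable t) {
                // Swallow and log: an escaping exception would cancel all future runs.
                System.err.println("Unexpected error while sending statistics: " + t.getMessage());
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    // Placeholder for the real reporting work.
    void send() {
        sendCount.incrementAndGet();
    }

    // Stops the timer; call this when the reporter is no longer needed.
    void destroy() {
        sendFuture.cancel(true);
        scheduler.shutdownNow();
    }
}
```

Fixed delay (rather than fixed rate) means the next run is timed from the end of the previous one, so a slow report never causes runs to pile up.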
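Within DubboMonitor, the statistics for each key are accumulated into a long array named update, whose slots have the following meaning:
update[0]: number of successful invocations
update[1]: number of failed invocations
update[2]: total request traffic (total size of request payloads)
update[3]: total response traffic (total size of response payloads)
update[4]: total response time (total invocation cost)
update[5]: average concurrency over one collection cycle
update[6]: maximum request payload size
update[7]: maximum response payload size
update[8]: maximum response time
update[9]: maximum concurrency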
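A sketch of how such a ten-slot array can be merged without locks, one invocation at a time. It assumes the update rules implied by the slot descriptions (sums for slots 0 to 4, a running average for slot 5, maxima for slots 6 to 9), feeds slots 5 and 9 from the concurrent value carried in the statistics URL, and uses a hypothetical class name; it is not a copy of DubboMonitor.

```java
import java.util.concurrent.atomic.AtomicReference;

class StatsAccumulator {
    private final AtomicReference<long[]> ref = new AtomicReference<>();

    void collect(long success, long failure, long input, long output, long elapsed, long concurrent) {
        long[] current;
        long[] update = new long[10];
        do {
            current = ref.get();
            if (current == null) {
                // First sample: sums, average and maxima all start from this invocation.
                update[0] = success;
                update[1] = failure;
                update[2] = input;
                update[3] = output;
                update[4] = elapsed;
                update[5] = concurrent;
                update[6] = input;
                update[7] = output;
                update[8] = elapsed;
                update[9] = concurrent;
            } else {
                update[0] = current[0] + success;
                update[1] = current[1] + failure;
                update[2] = current[2] + input;
                update[3] = current[3] + output;
                update[4] = current[4] + elapsed;
                update[5] = (current[5] + concurrent) / 2; // running average
                update[6] = Math.max(current[6], input);
                update[7] = Math.max(current[7], output);
                update[8] = Math.max(current[8], elapsed);
                update[9] = Math.max(current[9], concurrent);
            }
            // Retry if another thread replaced the array in the meantime.
        } while (!ref.compareAndSet(current, update));
    }

    // Copy of the current totals; all zeros if nothing has been collected yet.
    long[] snapshot() {
        long[] current = ref.get();
        return current == null ? new long[10] : current.clone();
    }
}
```

Replacing the whole array with compareAndSet keeps the ten values mutually consistent, which would not hold if each slot were updated independently.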
But I could not find any code that sends the data to the monitor center. Why is that?
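In fact, it is there. As analyzed above, DubboMonitorFactory behaves like a protocol reference: the refer call obtains a remote reference to the MonitorService exposed by the monitor center, so calling collect on the resulting proxy is itself an RPC call that delivers the data.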
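The reason no explicit "send over the network" code shows up is that it hides behind the proxy: a call that looks local is forwarded to an invoker. A minimal sketch of that mechanism with the JDK's java.lang.reflect.Proxy is shown below. CollectService and RemoteProxySketch are hypothetical names, and an in-memory list stands in for the network, so nothing here is Dubbo's own proxy code.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified counterpart of a monitoring service interface.
interface CollectService {
    void collect(String statistics);
}

class RemoteProxySketch {
    // Stands in for the network: records what would be sent to the monitor center.
    static final List<String> sentOverWire = new ArrayList<>();

    // Plays the role of refer + getProxy: returns an object that forwards calls.
    static CollectService refer() {
        return (CollectService) Proxy.newProxyInstance(
                CollectService.class.getClassLoader(),
                new Class<?>[] {CollectService.class},
                (proxy, method, args) -> {
                    if ("collect".equals(method.getName()) && args != null) {
                        // A real invoker would serialize the call and send it to the remote side.
                        sentOverWire.add(method.getName() + ":" + args[0]);
                    }
                    return null;
                });
    }
}
```

From the caller's point of view, service.collect(...) is an ordinary method call; the forwarding happens entirely inside the handler.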