Dubbo Monitoring Center Source Code Analysis

程序员文章站 2022-07-13 10:55:48
MonitorFactory is the interface that creates a Monitor.
@SPI("dubbo")
public interface MonitorFactory {
 
    /**
     * Create monitor.
     *
     * @param url
     * @return monitor
     */
    @Adaptive("protocol")
    Monitor getMonitor(URL url);
 
}
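
To make the effect of @SPI("dubbo") and @Adaptive("protocol") more concrete, here is a minimal sketch of the selection idea: pick a factory by the protocol of the URL and fall back to the default name when the URL has none. It uses a plain map rather than Dubbo's extension loading, URLs are plain strings, and AdaptiveLookupSketch and all of its methods are hypothetical names invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for adaptive extension lookup; this is not Dubbo code.
class AdaptiveLookupSketch {
    // Mirrors the default name given in @SPI("dubbo").
    static final String DEFAULT_NAME = "dubbo";

    // Extension name -> factory. A "monitor" is just a String in this sketch.
    private final Map<String, Function<String, String>> factories = new HashMap<>();

    void register(String name, Function<String, String> factory) {
        factories.put(name, factory);
    }

    // Returns the scheme of "scheme://host:port/path", or null when there is none.
    static String protocolOf(String url) {
        int idx = url.indexOf("://");
        return idx > 0 ? url.substring(0, idx) : null;
    }

    // Picks the factory by URL protocol, falling back to the default name.
    String getMonitor(String url) {
        String name = protocolOf(url);
        if (name == null) {
            name = DEFAULT_NAME;
        }
        Function<String, String> factory = factories.get(name);
        if (factory == null) {
            throw new IllegalStateException("No MonitorFactory extension named " + name);
        }
        return factory.apply(url);
    }

    public static void main(String[] args) {
        AdaptiveLookupSketch lookup = new AdaptiveLookupSketch();
        lookup.register("dubbo", url -> "DubboMonitor for " + url);
        System.out.println(lookup.getMonitor("dubbo://127.0.0.1:7070"));
    }
}
```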
 
 
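The MonitorService interface defines how monitoring data is collected and queried.

Monitor represents the monitoring object. It extends both the MonitorService and Node interfaces.

MetricsService appears to be an interface reserved for extension.

AbstractMonitorFactory is an abstract class that implements most of the code for creating a Monitor and leaves createMonitor as the extension point.

MonitorFilter uses the chain-of-responsibility pattern to collect invocation information.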
 
class MonitorListener implements Listener {
 
        @Override
        public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }
 
        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }
 
        /**
         * The collector logic, it will be handled by the default monitor
         *
         * @param invoker
         * @param invocation
         * @param result     the invoke result
         * @param remoteHost the remote host address
         * @param start      the timestamp the invoke begin
         * @param error      if there is an error on the invoke
         */
        private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
            try {
                URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
                Monitor monitor = monitorFactory.getMonitor(monitorUrl);
                if (monitor == null) {
                    return;
                }
                URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
                monitor.collect(statisticsURL);
            } catch (Throwable t) {
                logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
 
        /**
         * Create statistics url
         *
         * @param invoker
         * @param invocation
         * @param result
         * @param remoteHost
         * @param start
         * @param error
         * @return
         */
        private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
            // ---- service statistics ----
            long elapsed = System.currentTimeMillis() - start; // invocation cost
            int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
            String application = invoker.getUrl().getParameter(APPLICATION_KEY);
            String service = invoker.getInterface().getName(); // service name
            String method = RpcUtils.getMethodName(invocation); // method name
            String group = invoker.getUrl().getParameter(GROUP_KEY);
            String version = invoker.getUrl().getParameter(VERSION_KEY);
 
            int localPort;
            String remoteKey, remoteValue;
            if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
                // ---- for service consumer ----
                localPort = 0;
                remoteKey = MonitorService.PROVIDER;
                remoteValue = invoker.getUrl().getAddress();
            } else {
                // ---- for service provider ----
                localPort = invoker.getUrl().getPort();
                remoteKey = MonitorService.CONSUMER;
                remoteValue = remoteHost;
            }
            String input = "", output = "";
            if (invocation.getAttachment(INPUT_KEY) != null) {
                input = invocation.getAttachment(INPUT_KEY);
            }
            if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
                output = result.getAttachment(OUTPUT_KEY);
            }
 
            return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method,
                    MonitorService.APPLICATION, application,
                    MonitorService.INTERFACE, service,
                    MonitorService.METHOD, method,
                    remoteKey, remoteValue,
                    error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",
                    MonitorService.ELAPSED, String.valueOf(elapsed),
                    MonitorService.CONCURRENT, String.valueOf(concurrent),
                    INPUT_KEY, input,
                    OUTPUT_KEY, output,
                    GROUP_KEY, group,
                    VERSION_KEY, version);
        }
 
    }
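
The listener above only shows the "count down" half of the bookkeeping. The sketch below illustrates the whole cycle under simplifying assumptions: a per-method counter is incremented before the call, a start timestamp is taken, and the counter is decremented once the call finishes, whether it succeeds or fails. ConcurrencyTrackerSketch and its methods are hypothetical names. The real filter works on Invoker and Invocation objects, not on a Supplier.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of per-method concurrency tracking; this is not Dubbo code.
class ConcurrencyTrackerSketch {
    private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<>();
    private volatile long lastElapsedMillis;

    // One counter per "service.method" key, created on first use.
    AtomicInteger getConcurrent(String service, String method) {
        return concurrents.computeIfAbsent(service + "." + method, k -> new AtomicInteger());
    }

    // Counts up before the call and down after it, whether it returns or throws.
    <T> T invoke(String service, String method, Supplier<T> call) {
        AtomicInteger counter = getConcurrent(service, method);
        counter.incrementAndGet();
        long start = System.currentTimeMillis();
        try {
            return call.get();
        } finally {
            // In the real filter the elapsed time ends up in the statistics URL.
            lastElapsedMillis = System.currentTimeMillis() - start;
            counter.decrementAndGet();
        }
    }

    long lastElapsedMillis() {
        return lastElapsedMillis;
    }

    public static void main(String[] args) {
        ConcurrencyTrackerSketch tracker = new ConcurrencyTrackerSketch();
        String reply = tracker.invoke("DemoService", "sayHello", () -> "hello");
        int inFlight = tracker.getConcurrent("DemoService", "sayHello").get();
        System.out.println(reply + ", in flight after the call: " + inFlight);
    }
}
```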
 
 
The Statistics class is essentially a POJO that holds the basic information about an invocation, for example:
 
    private URL url;
 
    private String application;
 
    private String service;
 
    private String method;
 
    private String group;
 
    private String version;
 
    private String client;
 
    private String server;
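
A class like this is typically useful as a map key, so that calls with the same identifying fields are aggregated into one entry. The sketch below shows that idea with a trimmed-down, hypothetical StatKeySketch class. Which fields take part in equals and hashCode is an assumption of this sketch, and group and version are left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical value class standing in for Statistics; this is not Dubbo code.
final class StatKeySketch {
    private final String application;
    private final String service;
    private final String method;
    private final String client;
    private final String server;

    StatKeySketch(String application, String service, String method, String client, String server) {
        this.application = application;
        this.service = service;
        this.method = method;
        this.client = client;
        this.server = server;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof StatKeySketch)) {
            return false;
        }
        StatKeySketch other = (StatKeySketch) o;
        return Objects.equals(application, other.application)
                && Objects.equals(service, other.service)
                && Objects.equals(method, other.method)
                && Objects.equals(client, other.client)
                && Objects.equals(server, other.server);
    }

    @Override
    public int hashCode() {
        return Objects.hash(application, service, method, client, server);
    }

    public static void main(String[] args) {
        // Two calls with identical fields land in the same map entry.
        Map<StatKeySketch, Integer> calls = new HashMap<>();
        calls.merge(new StatKeySketch("demo-app", "DemoService", "sayHello", "10.0.0.1", null), 1, Integer::sum);
        calls.merge(new StatKeySketch("demo-app", "DemoService", "sayHello", "10.0.0.1", null), 1, Integer::sum);
        System.out.println("distinct keys: " + calls.size());
    }
}
```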
 
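While reading the code, I noticed that DubboMonitorFactory has both a getMonitor method and a createMonitor method, which seemed odd at first. On closer inspection, getMonitor, which DubboMonitorFactory inherits from AbstractMonitorFactory, calls createMonitor asynchronously and caches the result. Note that it returns null while the monitor is still being created: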
 
public Monitor getMonitor(URL url) {
        url = url.setPath(MonitorService.class.getName()).addParameter(INTERFACE_KEY, MonitorService.class.getName());
        String key = url.toServiceStringWithoutResolving();
        Monitor monitor = MONITORS.get(key);
        Future<Monitor> future = FUTURES.get(key);
        if (monitor != null || future != null) {
            return monitor;
        }
 
        LOCK.lock();
        try {
            monitor = MONITORS.get(key);
            future = FUTURES.get(key);
            if (monitor != null || future != null) {
                return monitor;
            }
 
            final URL monitorUrl = url;
            final CompletableFuture<Monitor> completableFuture = CompletableFuture.supplyAsync(() -> AbstractMonitorFactory.this.createMonitor(monitorUrl));
            FUTURES.put(key, completableFuture);
            completableFuture.thenRunAsync(new MonitorListener(key), executor);
 
            return null;
        } finally {
            // unlock
            LOCK.unlock();
        }
    }
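
The pattern in getMonitor (check the cache, double-check under a lock, start asynchronous creation, and return null until the result is ready) can be sketched without any Dubbo types. The code below is only an illustration under that reading: AsyncCacheSketch, get, and awaitValue are hypothetical names, the creator function stands in for createMonitor, and the common ForkJoinPool replaces the dedicated executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical sketch of a cache with asynchronous creation; this is not Dubbo code.
class AsyncCacheSketch<V> {
    private final ConcurrentMap<String, V> values = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, CompletableFuture<V>> futures = new ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Function<String, V> creator;

    AsyncCacheSketch(Function<String, V> creator) {
        this.creator = creator;
    }

    // Returns the cached value, or null while creation is still in progress.
    V get(String key) {
        V value = values.get(key);
        if (value != null || futures.containsKey(key)) {
            return value;
        }
        lock.lock();
        try {
            // Double-check: another thread may have started or finished creation.
            value = values.get(key);
            if (value != null || futures.containsKey(key)) {
                return value;
            }
            CompletableFuture<V> future = CompletableFuture.supplyAsync(() -> creator.apply(key));
            futures.put(key, future);
            // Publish the value when ready, then drop the pending future either way.
            future.whenComplete((created, error) -> {
                if (created != null) {
                    values.put(key, created);
                }
                futures.remove(key);
            });
            return null;
        } finally {
            lock.unlock();
        }
    }

    // Demo helper: polls get() until a value appears or the timeout expires.
    V awaitValue(String key, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        V value = get(key);
        while (value == null && System.currentTimeMillis() < deadline) {
            LockSupport.parkNanos(1_000_000L);
            value = get(key);
        }
        return value;
    }

    public static void main(String[] args) {
        AsyncCacheSketch<String> cache = new AsyncCacheSketch<>(key -> "monitor for " + key);
        System.out.println("first call: " + cache.get("demo"));
        System.out.println("later call: " + cache.awaitValue("demo", 2000));
    }
}
```

One consequence of this design is that callers must tolerate a null result, which is exactly what the `monitor == null` check in MonitorFilter's collect method does.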
 
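While reading DubboMonitorFactory, I saw that it holds both a protocol and a proxyFactory, which I did not understand at first.

Looking more closely, doesn't this code look just like a service reference made through a protocol, for example HttpProtocol?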
protected Monitor createMonitor(URL url) {
        URLBuilder urlBuilder = URLBuilder.from(url);
        urlBuilder.setProtocol(url.getParameter(PROTOCOL_KEY, DUBBO_PROTOCOL));
        if (StringUtils.isEmpty(url.getPath())) {
            urlBuilder.setPath(MonitorService.class.getName());
        }
        String filter = url.getParameter(REFERENCE_FILTER_KEY);
        if (StringUtils.isEmpty(filter)) {
            filter = "";
        } else {
            filter = filter + ",";
        }
        urlBuilder.addParameters(CHECK_KEY, String.valueOf(false),
                REFERENCE_FILTER_KEY, filter + "-monitor");
        Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, urlBuilder.build());
        MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
        return new DubboMonitor(monitorInvoker, monitorService);
    }
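
One detail in createMonitor worth isolating is the filter string. Appending "-monitor" to the reference filter list excludes the monitor filter from the reference to MonitorService itself, so the reporting calls are not monitored in turn. A minimal sketch of just that string handling follows. FilterValueSketch and excludeMonitor are hypothetical names.

```java
// Hypothetical helper mirroring the filter handling in createMonitor; not Dubbo code.
class FilterValueSketch {
    // Appends "-monitor" to an optional comma-separated filter list.
    static String excludeMonitor(String filter) {
        String prefix = (filter == null || filter.isEmpty()) ? "" : filter + ",";
        return prefix + "-monitor";
    }

    public static void main(String[] args) {
        System.out.println(excludeMonitor(null));
        System.out.println(excludeMonitor("trace,auth"));
    }
}
```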
 
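Finally, let's look at the implementation of DubboMonitor.

Start with the constructor. It shows clearly that Dubbo reports once per minute by default: the interval parameter falls back to 60000 ms when it is not set to a positive value.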
public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) {
        this.monitorInvoker = monitorInvoker;
        this.monitorService = monitorService;
        this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000);
        // collect timer for collecting statistics data
        sendFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {
            try {
                // collect data
                send();
            } catch (Throwable t) {
                logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);
            }
        }, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
    }
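
The scheduling pattern in the constructor can be tried out in isolation. The sketch below assumes a hypothetical PeriodicSenderSketch class whose send() only counts invocations. It keeps the same shape: a positive interval or a 60000 ms default, scheduleWithFixedDelay, and a catch-all so that one failed send does not cancel later runs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch of a periodic reporter; this is not Dubbo code.
class PeriodicSenderSketch {
    static final long DEFAULT_INTERVAL_MILLIS = 60000L;

    // Daemon thread so the JVM can exit even if destroy() is never called.
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread thread = new Thread(r, "sketch-sender");
        thread.setDaemon(true);
        return thread;
    });
    private final AtomicInteger sendCount = new AtomicInteger();
    private final ScheduledFuture<?> sendFuture;

    PeriodicSenderSketch(long intervalMillis) {
        // Non-positive values fall back to the default interval.
        long interval = intervalMillis > 0 ? intervalMillis : DEFAULT_INTERVAL_MILLIS;
        sendFuture = scheduler.scheduleWithFixedDelay(() -> {
            try {
                send();
            } catch (Throwable t) {
                // An uncaught exception would suppress all subsequent executions.
                System.err.println("Unexpected error during send: " + t.getMessage());
            }
        }, interval, interval, TimeUnit.MILLISECONDS);
    }

    // Placeholder for the real reporting logic.
    void send() {
        sendCount.incrementAndGet();
    }

    // Demo helper: waits until send() has run at least `expected` times.
    boolean awaitSends(int expected, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (sendCount.get() < expected && System.currentTimeMillis() < deadline) {
            LockSupport.parkNanos(1_000_000L);
        }
        return sendCount.get() >= expected;
    }

    void destroy() {
        sendFuture.cancel(true);
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        PeriodicSenderSketch sender = new PeriodicSenderSketch(20);
        System.out.println("saw two sends: " + sender.awaitSends(2, 2000));
        sender.destroy();
    }
}
```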
 
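In the collect method, the statistics for each key are accumulated in a long array whose slots have the following meaning:

      update[0]: number of successful calls.
      update[1]: number of failed calls.
      update[2]: total request traffic (total size of request payloads).
      update[3]: total response traffic (total size of response payloads).
      update[4]: total response time (total cost of service invocations).
      update[5]: average concurrency over one collection cycle.
      update[6]: maximum request payload size.
      update[7]: maximum response payload size.
      update[8]: maximum response time.
      update[9]: maximum concurrency.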
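
A lock-free way to maintain such an array is a compare-and-set loop on an AtomicReference: build a fresh array from the current one and retry if another thread got there first. The sketch below illustrates that technique. StatsAccumulatorSketch is a hypothetical class, and feeding slots 5 and 9 from a concurrency value (running average and maximum) is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of CAS-based accumulation into a long[10]; this is not Dubbo code.
class StatsAccumulatorSketch {
    static final int LENGTH = 10;

    private final AtomicReference<long[]> reference = new AtomicReference<>();

    void collect(long success, long failure, long input, long output, long elapsed, long concurrent) {
        long[] current;
        long[] update = new long[LENGTH];
        do {
            current = reference.get();
            if (current == null) {
                // First sample: every slot starts from the incoming values.
                update[0] = success;
                update[1] = failure;
                update[2] = input;
                update[3] = output;
                update[4] = elapsed;
                update[5] = concurrent;
                update[6] = input;
                update[7] = output;
                update[8] = elapsed;
                update[9] = concurrent;
            } else {
                update[0] = current[0] + success;            // successes
                update[1] = current[1] + failure;            // failures
                update[2] = current[2] + input;              // total request size
                update[3] = current[3] + output;             // total response size
                update[4] = current[4] + elapsed;            // total elapsed time
                update[5] = (current[5] + concurrent) / 2;   // running average (assumption)
                update[6] = Math.max(current[6], input);     // max request size
                update[7] = Math.max(current[7], output);    // max response size
                update[8] = Math.max(current[8], elapsed);   // max elapsed time
                update[9] = Math.max(current[9], concurrent); // max concurrency (assumption)
            }
            // Retry when another thread replaced the array in the meantime.
        } while (!reference.compareAndSet(current, update));
    }

    // Returns a copy so callers cannot mutate the shared array.
    long[] snapshot() {
        long[] current = reference.get();
        return current == null ? new long[LENGTH] : current.clone();
    }

    public static void main(String[] args) {
        StatsAccumulatorSketch stats = new StatsAccumulatorSketch();
        stats.collect(1, 0, 100, 200, 30, 2);
        stats.collect(0, 1, 50, 400, 10, 4);
        System.out.println(Arrays.toString(stats.snapshot()));
    }
}
```

Because every retry rewrites all ten slots from the freshly read array, the same update array can be reused across loop iterations without carrying stale values.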
 
However, I did not see any code that sends the data to the monitoring center. Why not?

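It is there after all. As analyzed above, DubboMonitorFactory behaves like a protocol reference: the refer call creates a reference to the MonitorService exposed by the monitoring center, so calls made through the resulting monitorService proxy are remote calls that deliver the data.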