欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

dubbo中是如何使用netty的

程序员文章站 2022-05-22 20:34:34
...

由于公司一直用的是dubbo,就想看看dubbo底层是怎么使用netty的。所有的分析过程基本都写在代码的注释里

 

dubbo是通过spi的方式获取到各个对象的处理器的。看META-INF下的spring.handlers配置

dubbo中是如何使用netty的

使用的是DubboNamespaceHandler这个处理器。看下这个类:

public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    @Override
    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); // dubbo:application
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); // dubbo:provider
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); // dubbo:consumer
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); 
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); // dubbo:service
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); // dubbo:reference
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }

}

dubbo xml文件每个对象都定义对应的处理器。

dubbo通信的时候使用的是provider和consumer。provider在对应的标签是dubbo:service, consumer对应的标签是reference,对应的类分别是ServiceBean和ReferenceBean我们分别研究。

Provider

类似xml配置如下:

<dubbo:service interface="com.dfire.soa.consumer.service.IVersionService" ref="versionService"
                   version="${consumer.dubbo.version}" timeout="${timeout_100ms}"/>

对应的处理器类为ServiceBean。看它的类声明,实现了ApplicationListener接口,关注了ContextRefreshedEvent事件

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
        ApplicationEventPublisherAware

看这个类对ContextRefreshedEvent的处理

@Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export(); // 根据dubbo的架构图,这一步应该就是用来暴露协议的
        }
    }

只可能是export方法里处理了什么,看export方法

    // ServiceBean.java
    public void export() {
        super.export();
        // Publish ServiceBeanExportedEvent
        publishExportEvent();
    }
    // ServiceConfig.java
    public synchronized void export() {
        // ...

        if (delay != null && delay > 0) {
            //...
        } else {
            doExport();
        }
    }
    protected synchronized void doExport() {
        // ... 各种各样的判断
        if (ref instanceof GenericService) { // 如果实现是泛化服务
            interfaceClass = GenericService.class;
            if (StringUtils.isEmpty(generic)) {
                generic = Boolean.TRUE.toString();
            }
        } else { // 如果是具体服务,反射获取接口名
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            checkInterfaceAndMethods(interfaceClass, methods);
            checkRef();
            generic = Boolean.FALSE.toString();
        }
        // ... 又是一堆判断
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        checkStub(interfaceClass);
        checkMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
        doExportUrls(); // 重点!!! 暴露出url
        ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
        ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
    }

    // 暴露url
    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true); // 获取服务提供者注册的url
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs); // 从url解析出协议和地址!!!
        }
    }

接下来看它doExportUrlsFor1Protocol方法,具体解析协议和url的过程,这段代码就比较复杂了,可读性也不是很好,就直接进入到协议解析。本例子用的是dubbo:protocol使用的是dubbo协议,进入DubboProtocol的export方法。

    // DubboProtocol.java
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // 。。。一些处理
        openServer(url); // 在这里打开服务器
        optimizeSerialization(url);
        return exporter;
    }
    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url)); // 开始创建服务器
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

createServer顾名思义,创建服务器,好像离答案越来越近了。接下来是一系列的调用,直到真正进入Netty相关的代码:

    // DubboProtocol.java
    private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        //...
        // enable heartbeat by default
        // ...
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler); // 绑定ExchangeServer
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        // ...
        return server;
    }    
    // Exchangers.java
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }
    // HeaderExchanger.java
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
    // Transporters.java
    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }
    // NettyTransporter.java
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

终于,进入到NettyTransporter的bind方法。

    // NettyServer.java的构造函数
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); // 进入他的父类AbstractServer的构造函数
    }
    // AbstractServer.java的构造函数
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen(); // doOpen方法真正建立netty服务端
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }

看doOpen方法,真相大白,完全是我们熟悉的netty代码。

    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

Server使用了NettyCodecAdapter提供的解码器和编码器,并且使用了NettyServerHandler这个处理器,这里不再详细分析。下面开始看消费者

Consumer

消费者整体调用过程和提供者查不到,但是入口有不同。消费者直接进入Spring获取Bean所调用的getObject方法

    // ReferenceBean.java
    public Object getObject() throws Exception {
        return get();
    }
    // ReferenceConfig.java
    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init(); // 初始化
        }
        return ref;
    }
    
    private void init() {
        // ... 一系列前置处理       
        ref = createProxy(map);  // 创建代理
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }

    private T createProxy(Map<String, String> map) {
       if (isJvmRefer) {
            // ...
        } else {
            // ...
            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0)); // 引用一个远程服务
            } else {
                // ...多个urls的处理
            }
        }

        // ...
    }

调用refprotocol.refer方法,SPI里配置的是RegistryProtocol。

// RegistryProtocol.java
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // ...
        return doRefer(cluster, registry, type, url);
    }

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
            registry.register(registeredConsumerUrl);
            directory.setRegisteredConsumerUrl(registeredConsumerUrl);
        }
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY)); // 通过Register订阅服务

        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

这里用了RegistryDirectory的subscribe方法订阅服务,后面比较复杂,不再一一赘述,最终和Provider一样,进入到NettyTransporter这个类,不同于Provider,Provider使用了bind方法,而Consumer使用了connect方法。

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener); // Provider使用
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener); // Consumer使用
    }

}

看看NettyClient做了什么。

    // NettyClient.java
    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler)); // 同样是交给基类处理
    }

    // AbstractClient.java 
    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);

        // ...
        try {
            doOpen(); // 建立连接
        } catch (Throwable t) {
            // ...
        }
        try {
            // connect.
            connect(); 
            // ...
        } catch (Exception t) {
            // ...
        }

        // ...
    }

前面看Provider也有个doOpen方法,是NettyServer启动的过程。那么这个Client的doOpen方法猜想也应该是Client建立连接的过程。看下是不是呢?果然如此

 

protected void doOpen() throws Throwable {
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(NioSocketChannel.class);

        if (getConnectTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
        }

        bootstrap.handler(new ChannelInitializer() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("handler", nettyClientHandler);
            }
        });
    }

这就是Dubbo Consumer使用dubbo的过程

贴一张dubbo的官方架构图,理一下调用的路线,我的理解是蓝色箭头代表上述Provider分析过程,红色箭头代表上述Consumer分析过程

dubbo中是如何使用netty的