How Dubbo Uses Netty
My company has been using Dubbo for a long time, so I wanted to see how Dubbo uses Netty under the hood. Most of the analysis is written as comments inside the code listings.
Dubbo wires up a handler for each of its config elements in an SPI-like fashion, through the spring.handlers file under META-INF.
The handler registered there is DubboNamespaceHandler. Let's look at this class:
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
@Override
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); // dubbo:application
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); // dubbo:provider
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); // dubbo:consumer
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); // dubbo:service
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); // dubbo:reference
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
}
So every element in Dubbo's XML configuration has a corresponding parser registered for it.
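For reference, the spring.handlers file mentioned above maps the Dubbo XML namespace to this handler class. In the 2.6.x line the entries look roughly like this (the exact namespace URIs vary across versions, so treat this as a sketch):

```properties
http\://dubbo.apache.org/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler
```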
Dubbo communication happens between a provider and a consumer. The provider corresponds to the dubbo:service tag and the consumer to the dubbo:reference tag; the classes behind them are ServiceBean and ReferenceBean respectively. We'll study each in turn.
Provider
A typical XML configuration looks like this:
<dubbo:service interface="com.dfire.soa.consumer.service.IVersionService" ref="versionService"
version="${consumer.dubbo.version}" timeout="${timeout_100ms}"/>
The handler class here is ServiceBean. Its declaration shows that it implements the ApplicationListener interface, listening for the ContextRefreshedEvent:
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
ApplicationEventPublisherAware
Here is how the class handles the ContextRefreshedEvent:
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export(); // judging by dubbo's architecture diagram, this step should expose the service
}
}
The real work must happen inside the export method, so let's look at it:
// ServiceBean.java
public void export() {
super.export();
// Publish ServiceBeanExportedEvent
publishExportEvent();
}
// ServiceConfig.java
public synchronized void export() {
// ...
if (delay != null && delay > 0) {
//...
} else {
doExport();
}
}
protected synchronized void doExport() {
// ... various checks
if (ref instanceof GenericService) { // if the implementation is a generic service
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else { // a concrete service: load the interface class via reflection
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
checkRef();
generic = Boolean.FALSE.toString();
}
// ... more checks
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStub(interfaceClass);
checkMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
doExportUrls(); // the key step: export the URLs
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
// export the URLs
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true); // load the registry URLs for this provider
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs); // resolve protocol and address from the URL
}
}
Next comes its doExportUrlsFor1Protocol method, which does the actual protocol and URL resolution. That code is fairly long and not very readable, so let's jump straight to the protocol export. This example configures dubbo:protocol with the dubbo protocol, so we land in the export method of DubboProtocol.
// DubboProtocol.java
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// ... some preprocessing
openServer(url); // open the server here
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url)); // create the server
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
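The URL that openServer receives is Dubbo's internal general-purpose URL, carrying the protocol, address and parameters. As a hedged illustration only (the service name and address below are made up, and Dubbo uses its own URL class rather than java.net.URI), pulling the protocol and address out of such a URL looks like:

```java
import java.net.URI;

public class UrlParts {
    public static void main(String[] args) {
        // A made-up provider URL in the shape Dubbo uses: protocol://host:port/service?params
        URI u = URI.create("dubbo://192.168.1.10:20880/com.foo.BarService?version=1.0.0");
        System.out.println(u.getScheme());                   // the protocol, here "dubbo"
        System.out.println(u.getHost() + ":" + u.getPort()); // the address used as the serverMap key
        System.out.println(u.getPath().substring(1));        // the exported service interface
    }
}
```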
As its name suggests, createServer creates the server, so we seem to be getting close to the answer. What follows is a chain of calls that finally leads into Netty-specific code:
// DubboProtocol.java
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
//...
// enable heartbeat by default
// ...
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler); // bind the ExchangeServer
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// ...
return server;
}
// Exchangers.java
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
// HeaderExchanger.java
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
// Transporters.java
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
// NettyTransporter.java
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
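Notice the layering in the chain above: HeaderExchanger wraps the business handler as new DecodeHandler(new HeaderExchangeHandler(handler)), a plain decorator chain where each layer adds one concern before delegating inward. A minimal sketch of that pattern (the handler names and the trim/lowercase "concerns" below are invented for illustration, not Dubbo's actual behavior):

```java
// Each decorator does its own bit of work, then delegates to the next handler,
// just as DecodeHandler decodes before HeaderExchangeHandler sees the message.
interface Handler {
    String handle(String msg);
}

class EchoHandler implements Handler {
    public String handle(String msg) { return "echo:" + msg; }
}

class DecodeDecorator implements Handler {
    private final Handler next;
    DecodeDecorator(Handler next) { this.next = next; }
    public String handle(String msg) { return next.handle(msg.trim()); } // "decoding" = trimming here
}

class HeaderDecorator implements Handler {
    private final Handler next;
    HeaderDecorator(Handler next) { this.next = next; }
    public String handle(String msg) { return next.handle(msg.toLowerCase()); } // "header handling" = lowercasing here
}

public class DecoratorChain {
    public static void main(String[] args) {
        Handler h = new DecodeDecorator(new HeaderDecorator(new EchoHandler()));
        System.out.println(h.handle("  PING "));
    }
}
```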
Finally, we reach NettyTransporter's bind method, which constructs a NettyServer.
// NettyServer.java constructor
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); // delegates to its parent class AbstractServer's constructor
}
// AbstractServer.java constructor
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
doOpen(); // doOpen actually starts the netty server
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
Looking at doOpen, the mystery is solved: this is exactly the Netty bootstrap code we know well.
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
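The decoder and encoder installed in the pipeline above come from NettyCodecAdapter. Dubbo's actual wire format is much richer (magic number, flags, request id, serialized body), but the core job of such a codec pair is framing. A minimal length-prefixed framing sketch in plain Java, purely for illustration:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class Framing {
    // Encoder: write a 4-byte big-endian length prefix, then the payload bytes.
    static byte[] encode(String msg) {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    // Decoder: read the length prefix, then exactly that many payload bytes.
    static String decode(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        byte[] body = new byte[buf.getInt()];
        buf.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decode(encode("hello dubbo")));
    }
}
```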
The server uses the decoder and encoder provided by NettyCodecAdapter, together with the NettyServerHandler; I won't analyze those in detail here. Next, let's look at the consumer.
Consumer
The consumer's overall call chain is much the same as the provider's, but the entry point differs: the consumer starts from getObject, which Spring invokes when retrieving the bean.
// ReferenceBean.java
public Object getObject() throws Exception {
return get();
}
// ReferenceConfig.java
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init(); // initialize
}
return ref;
}
private void init() {
// ... a series of preprocessing steps
ref = createProxy(map); // create the proxy
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
private T createProxy(Map<String, String> map) {
if (isJvmRefer) {
// ...
} else {
// ...
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0)); // refer to a remote service
} else {
// ... handling for multiple urls
}
}
// ...
}
The refprotocol.refer call is dispatched, via SPI, to RegistryProtocol.
// RegistryProtocol.java
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// ...
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY)); // subscribe to the service through the registry
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
Here the RegistryDirectory's subscribe method subscribes to the service. What follows is fairly involved, so I won't walk through every step; in the end, just as for the provider, the call chain reaches NettyTransporter. The difference is that the provider used its bind method, while the consumer uses connect.
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener); // used by the Provider
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener); // used by the Consumer
}
}
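How does getTransporter() end up at this NettyTransporter? Through Dubbo's extension loader, which resolves an implementation by name ("netty" being the default). The real ExtensionLoader reads extension names from resource files under META-INF/dubbo; below is a toy sketch of the lookup idea only, with a plain map standing in for those files (all names here are ours):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class MiniSpi {
    interface Transporter { String name(); }

    // Stand-in for the extension files: maps an extension name to a factory.
    static final Map<String, Supplier<Transporter>> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("netty", () -> new Transporter() {
            public String name() { return "netty"; }
        });
    }

    // Resolve an implementation by its extension name, like getTransporter() does.
    static Transporter getTransporter(String key) {
        Supplier<Transporter> s = EXTENSIONS.get(key);
        if (s == null) throw new IllegalStateException("no extension: " + key);
        return s.get();
    }

    public static void main(String[] args) {
        System.out.println(getTransporter("netty").name());
    }
}
```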
Let's see what NettyClient does.
// NettyClient.java
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler)); // likewise handed off to the base class
}
// AbstractClient.java
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// ...
try {
doOpen(); // set up the client bootstrap; connect() below makes the actual connection
} catch (Throwable t) {
// ...
}
try {
// connect.
connect();
// ...
} catch (Exception t) {
// ...
}
// ...
}
Earlier, the provider also had a doOpen method, which started the NettyServer. By analogy, the client's doOpen should be where the connecting side gets set up. Is it? Indeed it is:
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyClientHandler);
}
});
}
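One detail in doOpen worth calling out: the connect timeout is floored at 3000 ms, so a smaller configured value is silently raised. Extracted as a standalone helper (the method name is ours, not Dubbo's):

```java
public class ConnectTimeout {
    // Mirrors the if/else in NettyClient.doOpen: never connect with less than 3000 ms.
    static int effectiveConnectTimeout(int configured) {
        return configured < 3000 ? 3000 : configured;
    }

    public static void main(String[] args) {
        System.out.println(effectiveConnectTimeout(1000)); // floored to 3000
        System.out.println(effectiveConnectTimeout(5000)); // kept as configured
    }
}
```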
And that is how the Dubbo Consumer uses Netty.
To wrap up, here is Dubbo's official architecture diagram to trace the call paths: as I understand it, the blue arrows correspond to the Provider analysis above, and the red arrows to the Consumer analysis.