Nacos源码之服务注册源码解析
前言
本篇就开始探索Nacos的服务注册逻辑,建议大家准备两个项目,一个是Nacos的源码项目,一个是整合Nacos注册中心的业务项目。探索过程也是分为客户端和服务端的代码解析,我觉得这样应该更利于代码跟进和理解,篇幅过长,还请耐心观看。
主要类图
客户端逻辑
首先来看客户端,也就是我们的项目在启动时,Nacos依赖偷偷干了哪些见不得人的事情。打开项目依赖,找到引入的注册中心依赖,我们知道SpringBoot项目启动时会读取spring.factories文件进行自动装配
你们猜一猜哪个是首先要看的类,我猜是NacosDiscoveryAutoConfiguration
这个类,你问我为什么,因为它写在第一位。你觉得不是?那我只能说是男人的直觉,源码看多掉头发的男人的直觉,没有毛病~
点开这个类,反手就是注入三个Bean,分别是NacosServiceRegistry
,NacosRegistration
和NacosAutoServiceRegistration
@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class })
public class NacosDiscoveryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}
NacosDiscoveryProperties
这个类就是提供Nacos各种参数配置的类,很多集成框架都提供了这么一个类,比如RedisProperties
,只需要在配置参数类上加上@ConfigurationProperties注解,然后在配置类或者启动类上加上@EnableConfigurationProperties注解,就可以将配置参数类注入到容器中,这种高逼格的做法大家学废了吗。
NacosRegistration
这个配置类的功能主要就是处理我们配置的metadata元数据,既然支持灵活配置,那就肯定需要代码来支持。
NacosServiceRegistry
这个配置类就比较重要了,它里面包含了一个NamingService
命名服务类:
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
private final NacosDiscoveryProperties nacosDiscoveryProperties;
private final NamingService namingService;
public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
this.namingService = nacosDiscoveryProperties.namingServiceInstance();
}
......
}
实例化的时候调用了NacosDiscoveryProperties
的**namingServiceInstance()**方法
public NamingService namingServiceInstance() {
if (null != namingService) {
return namingService;
}
try {
namingService = NacosFactory.createNamingService(getNacosProperties());
}
catch (Exception e) {
log.error("create naming service error!properties={},e=,", this, e);
return null;
}
return namingService;
}
跟进去最后调用了NamingFactory
工厂的**createNamingService()**静态方法
public class NamingFactory {
public static NamingService createNamingService(String serverList) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(String.class);
NamingService vendorImpl = (NamingService)constructor.newInstance(serverList);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
public static NamingService createNamingService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
NamingService vendorImpl = (NamingService)constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
}
可以看到通过获取带有Properties
参数的构造器反射实例化NacosNamingService
对象,实例化时就会调用带有Properties
参数的构造方法
public class NacosNamingService implements NamingService {
/**
* 命名空间【public、prod、dev】
*
* Each Naming service should have different namespace.
*/
private String namespace;
private String endpoint;
/**
* Nacos 服务地址【127.0.0.1:8848,127.0.0.1:8848】
**/
private String serverList;
/**
* 本地注册表缓存存放目录【/Users/xxx/nacos/naming/public】
**/
private String cacheDir;
/**
* 日志文件名【naming.log】
**/
private String logName;
/**
* 本机响应器
**/
private HostReactor hostReactor;
/**
* 心跳响应器
**/
private BeatReactor beatReactor;
/**
* 事件调度器
**/
private EventDispatcher eventDispatcher;
/**
* 命名代理
**/
private NamingProxy serverProxy;
public NacosNamingService(String serverList) {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
init(properties);
}
public NacosNamingService(Properties properties) {
init(properties);
}
private void init(Properties properties) {
ValidatorUtils.checkInitParam(properties);
// 初始化命名空间
namespace = InitUtils.initNamespaceForNaming(properties);
// 初始化服务地址
initServerAddr(properties);
// 初始化根上下文
InitUtils.initWebRootContext();
// 初始化缓存文件地址
initCacheDir();
// 初始化日志文件名称
initLogName(properties);
// 实例化时间调度器
eventDispatcher = new EventDispatcher();
// 实例化命名代理程序
serverProxy = new NamingProxy(namespace, endpoint, serverList, properties);
// 实例化心跳响应器
beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
// 实例化本机响应器
hostReactor = new HostReactor(
eventDispatcher,
serverProxy,
cacheDir,
// 启动时就加载缓存【False】
isLoadCacheAtStart(properties),
// 初始化线程池的核心线程数
initPollingThreadCount(properties)
);
}
......
}
**init()**方法进行一系列的初始化工作,BeatReactor
用来处理心跳上报,HostReactor
处理故障转移和接收服务端推送的服务列表数据。看一下实例化HostReactor
时都做了哪些事情
public class HostReactor {
/**
* 从服务器拉取最新数据延时时间
**/
private static final long DEFAULT_DELAY = 1000L;
/**
* 等待更新完成时间
**/
private static final long UPDATE_HOLD_INTERVAL = 5000L;
private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<>();
/**
* 本地的服务注册表
**/
private Map<String, ServiceInfo> serviceInfoMap;
/**
* 需要更新的集合
**/
private Map<String, Object> updatingMap;
/**
* 推送接收器
**/
private PushReceiver pushReceiver;
/**
* 事件调度程序
**/
private EventDispatcher eventDispatcher;
/**
* 命名代理
**/
private NamingProxy serverProxy;
/**
* 故障转移响应器
**/
private FailoverReactor failoverReactor;
private String cacheDir;
private ScheduledExecutorService executor;
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) {
this(eventDispatcher, serverProxy, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
}
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
boolean loadCacheAtStart, int pollingThreadCount) {
executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.client.naming.updater");
return thread;
}
});
this.eventDispatcher = eventDispatcher;
this.serverProxy = serverProxy;
this.cacheDir = cacheDir;
if (loadCacheAtStart) {
this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
}
else {
this.serviceInfoMap = new ConcurrentHashMap<>(16);
}
this.updatingMap = new ConcurrentHashMap<>();
// 实例化故障转移响应器
this.failoverReactor = new FailoverReactor(this, cacheDir);
/**
* 实例化推送接收器【重要】
**/
this.pushReceiver = new PushReceiver(this);
}
}
FailoverReactor
用来处理故障转移,PushReceiver
用来和服务端保持通信,接收推送的服务列表数据
public class PushReceiver implements Runnable {
private ScheduledExecutorService executorService;
private static final int UDP_MSS = 64 * 1024;
private DatagramSocket udpSocket;
private HostReactor hostReactor;
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
// 初始化套接字
udpSocket = new DatagramSocket();
// 初始化执行器
executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
executorService.execute(this);
}
catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}
@Override
public void run() {
/**
* 死循环,一直接收服务端推送的最新服务数据
**/
while (true) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// 接收服务端的服务更新数据
udpSocket.receive(packet);
// 尝试解压缩
String json = new String(IoUtils.tryDecompress(packet.getData()), StandardCharsets.UTF_8).trim();
NAMING_LOGGER.info("received push datHostReactora: " + json + " from " + packet.getAddress().toString());
// 解析数据
PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
/**
* 更新本地注册表缓存
**/
hostReactor.processServiceJSON(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}";
}
else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":" + "\""
+ StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap())) + "\"}";
}
else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}";
}
/**
* 发送确认数据给服务端
*
* 请求会被{@link com.alibaba.nacos.naming.push.PushService.Receiver#run()}接收并处理
**/
udpSocket.send(new DatagramPacket(
ack.getBytes(Charset.forName("UTF-8")),
ack.getBytes(Charset.forName("UTF-8")).length,
packet.getSocketAddress())
);
}
catch (Exception e) {
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
}
创建一个线程和服务端建立通信,异步接收服务端推送的服务数据,然后更新本地的注册表缓存数据,并发送回执。
NacosAutoServiceRegistration
是最重要的配置类了,它担负了服务注册的使命
public class NacosAutoServiceRegistration
extends AbstractAutoServiceRegistration<Registration> {
private NacosRegistration registration;
public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
super(serviceRegistry, autoServiceRegistrationProperties);
this.registration = registration;
}
}
把上面注入的NacosServiceRegistry
作为参数传入到父类AbstractAutoServiceRegistration
,由上面的类图可以看到父类是一个监听器,监听了WebServerInitializedEvent
事件,服务准备好后会回调它的**onApplicationEvent()**方法
public abstract class AbstractAutoServiceRegistration<R extends Registration>
implements AutoServiceRegistration, ApplicationContextAware,
ApplicationListener<WebServerInitializedEvent> {
......
protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry,
AutoServiceRegistrationProperties properties) {
this.serviceRegistry = serviceRegistry;
this.properties = properties;
}
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
protected void register() {
this.serviceRegistry.register(getRegistration());
}
......
}
这里**getRegistration()方法获取的就是之前注入的NacosRegistration
对象,调用register()方法,其实调用的就是NacosServiceRegistry
的register()**方法
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
}
}
private Instance getNacosInstanceFromRegistration(Registration registration) {
Instance instance = new Instance();
instance.setIp(registration.getHost());
instance.setPort(registration.getPort());
instance.setWeight(nacosDiscoveryProperties.getWeight());
instance.setClusterName(nacosDiscoveryProperties.getClusterName());
instance.setMetadata(registration.getMetadata());
return instance;
}
NamingService
就是之前通过反射实例化的NacosNamingService
public void registerInstance(String serviceName,
String groupName,
Instance instance) throws NacosException {
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
// 服务名称【DEFAULT_GROUP@@order-service】
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
// 服务地址【172.20.10.2】
beatInfo.setIp(instance.getIp());
// 服务端口号【8082】
beatInfo.setPort(instance.getPort());
// 所属的集群【DEFAULT】
beatInfo.setCluster(instance.getClusterName());
// 权重【1.0】
beatInfo.setWeight(instance.getWeight());
// 元数据【"version" -> "v1"】
beatInfo.setMetadata(instance.getMetadata());
// 周期性
beatInfo.setScheduled(false);
// 心跳周期【5s】
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
/**
* 添加心跳信息,即心跳任务
**/
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
/**
* 服务注册
**/
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
BeatReactor
和NamingProxy
之前都通过调用NacosNamingService
的构造方法进行实例化时顺带实例化了。
Nacos有两种模式的实例,一种是基于内存的临时实例,一种是基于磁盘的永久实例,分别对应CAP模式中的AP和CP。这里如果是临时实例会组装实例数据为BeatInfo
对象,调用BeatReactor
的**addBeatInfo()**方法添加上报心跳的任务,其实就是往之前实例化的ScheduledThreadPoolExecutor
中添加一个任务
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
// DEFAULT_GROUP@@order-service#172.20.10.2#8082
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
// 保存心跳信息
dom2Beat.put(key, beatInfo);
/**
* 添加一个一次性心跳任务,延迟 5s 执行
**/
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
// 停止则跳过
if (beatInfo.isStopped()) {
return;
}
// 获取下次上报的间隔的时间
long nextTime = beatInfo.getPeriod();
try {
// 向服务端发送心跳
JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
boolean lightBeatEnabled = false;
if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED);
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
long interval = result.getIntValue("clientBeatInterval");
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.containsKey(CommonParams.CODE)) {
code = result.getIntValue(CommonParams.CODE);
}
// 找不到请求的资源
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
// 再尝试发送
serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
}
catch (Exception ignore) {
}
}
}
catch (NacosException ne) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg());
}
// 循环添加心跳任务,每隔 5s 发送一次心跳
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
主要还是看服务的注册逻辑,NamingProxy
的**registerService()**方法
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
final Map<String, String> params = new HashMap<>(9);
// public
params.put(CommonParams.NAMESPACE_ID, namespaceId);
// DEFAULT_GROUP@@order-service
params.put(CommonParams.SERVICE_NAME, serviceName);
// DEFAULT_GROUP
params.put(CommonParams.GROUP_NAME, groupName);
// DEFAULT
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
// 172.20.10.2
params.put("ip", instance.getIp());
// 8082
params.put("port", String.valueOf(instance.getPort()));
// 1.0
params.put("weight", String.valueOf(instance.getWeight()));
// true
params.put("enable", String.valueOf(instance.isEnabled()));
// true
params.put("healthy", String.valueOf(instance.isHealthy()));
// true
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
// "version":"v1"
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
public String reqAPI(String api, Map<String, String> params, String body,
List<String> servers, String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
// 服务列表不为空
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
// 随机挑选一个服务
String server = servers.get(index);
try {
/**
* 对 Nacos 服务进行调用
**/
return callServer(api, params, body, server, method);
}
catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
/**
* 失败则尝试其他的服务
**/
index = (index + 1) % servers.size();
}
}
/**
* 如果对已知的 Nacos 服务调用全部失败,则执行备选方案
**/
if (StringUtils.isNotBlank(nacosDomain)) {
// 最多重试三次
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
try {
/**
* 对 Nacos 服务进行调用
**/
return callServer(api, params, body, nacosDomain, method);
}
catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
}
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg());
throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
public String callServer(String api, Map<String, String> params,
String body, String curServer, String method) throws NacosException {
long start = System.currentTimeMillis();
long end;
// 添加安全信息数据
injectSecurityInfo(params);
// 构造请求头数据
List<String> headers = builderHeaders();
// http://127.0.0.1:8848/nacos/v1/ns/instance
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
}
else {
if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
}
url = HttpClient.getPrefix() + curServer + api;
}
/**
* 注册请求会被服务端的控制器处理
*
* {@see com.alibaba.nacos.naming.controllers.InstanceController#register(javax.servlet.http.HttpServletRequest)}
**/
HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code)).observe(end - start);
if (HttpURLConnection.HTTP_OK == result.code) {
return result.content;
}
if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
return StringUtils.EMPTY;
}
throw new NacosException(result.code, result.content);
}
注释和代码都简洁易懂,直接调用服务端暴露的接口进行服务注册。
服务端逻辑
服务端的启动流程就先不看了,后面探讨多节点共识算法进行选举的时候再从头看,有兴趣的可以先看一下,直接看InstanceController
的**register()**方法是怎么处理注册的
public String register(HttpServletRequest request) throws Exception {
// DEFAULT_GROUP@@order-service
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
// public
final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 解析成实例数据
final Instance instance = parseInstance(request);
/**
* 注册实例
**/
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
ServiceManager
类有一个双层Map的私有属性,那就是注册中心的数据结构,Eureka也是用的这种结构来存储服务列表
/**
1. 双层 Map 结构,很多注册中心都使用了这种结构
2.
3. Map<namespace, Map<group::serviceName, Service>>
*/
private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
图示的话大概就是这样的
调用**registerInstance()**方法进行注册,以AP模式将服务实例注册到服务列表中
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
/**
* 创建空服务,服务实例还未添加
**/
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
/**
* 添加服务实例
**/
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
先调用**createEmptyService()**方法从注册表中获取对应的服务,如果不存在则将当前需要注册的服务数据进行封装并添加到注册表中,然后对其进行初始化,主要有这么三步:
- 添加定时检查的心跳任务,这个任务对于超过15秒没有收到客户端心跳的实例会将其置为不健康的状态,这样就处于不可被发现的状态;如果某个实例超过30秒还没有收到心跳,那么会直接从注册表中剔除该实例,被剔除的实例如果恢复心跳发送则会重新注册
- 将对应的集群与该服务进行关联,而且没有对注册表进行遍历找到对应的集群再进行关联,而是直接关联,因为Nacos采用的做法是只对没有关联的集群做服务的关联,这样就能保证关联的一定是当前正在注册的服务实例所在的集群
- 初始化集群,主要是对集群服务实例做健康检查
然后再调用addInstance()方法来注册服务实例
public void addInstance(String namespaceId, String serviceName,
boolean ephemeral, Instance... instances) throws NacosException {
/**
* com.alibaba.nacos.naming.iplist.ephemeral. + namespaceId + ## + serviceName
* com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service
**/
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, instances);
// 初始化实例集合
Instances instancesList = new Instances();
instancesList.setInstanceList(instanceList);
// 保存实例
consistencyService.put(key, instancesList);
}
}
**addIpAddresses()**方法主要就是将当前服务的所有健康的服务实例拿出来,然后存储到ConsistencyService
服务中,Nacos针对AP模式和CP模式,设计了两个子服务,AP模式对应的是DistroConsistencyServiceImpl
服务,CP模式对应的是RaftConsistencyServiceImpl
服务,为了更好的进行数据同步,两种模式还分别存储了对应的服务实例数据。
看一下AP模式是怎么保存实例的,即DistroConsistencyServiceImpl
的**put()**方法
public void put(String key, Record value) throws NacosException {
/**
* 1、将注册实例更新到内存注册表
*
* 添加节点改变任务,异步执行节点改变任务
**/
onPut(key, value);
/**
* 2、同步实例信息到服务集群的其他节点
*
* 将新节点交由数据同步任务调度程序进行同步处理
**/
taskDispatcher.addTask(key);
}
这里就是比较复杂了,Nacos服务不仅需要跟客户端进行交互,还需要跟Nacos集群中的其他节点进行数据同步,所以第一个方法是将新注册的服务数据基于Socket通信推送到客户端,还记得上面我们看到客户端启动时会和服务端建立Socket通信吗,客户端会接收服务端推送的最新服务数据,第一个方法就是负责这项工作的。第二个方法就是将新注册的服务数据和集群中的其他节点进行同步
public void onPut(String key, Record value) {
// 是临时实例
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
// 设置实例
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
/**
* 保存数据
**/
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
/**
* 添加数据变更任务,将服务数据推送给客户端
**/
notifier.addTask(key, ApplyAction.CHANGE);
}
调用了Notifier
类的**addTask()方法,这个类实现了Runnable
接口,并且其中一个私有属性是阻塞队列,添加任务的方法就是往阻塞队列中添加一个元素,它重写的run()**方法是一个一直监听队列元素的死循环,如果是数据变更的任务,则会先进行内存中的服务数据的更新操作,用了CopyOnWrite的思想来避免读写冲突,不直接对原有数据进行操作,而是拷贝一份,更新完成后再替换原有的内存数据。
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
/**
* CopyOnWrite 设计思想
*
* 不对原有的集合数据作修改,而是新创建一个副本拷贝后作修改
**/
// 重新构造一个集合
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
// 将原有的数据拷贝到新的集合中
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
for (Instance instance : instances) {
try {
if (instance == null) {
continue;
}
// 处理实例的默认集群名称
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
// 原有的集群集合中不包含新的实例的集群名称
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJSON());
// 初始化一个实例集群对象
Cluster cluster = new Cluster(instance.getClusterName(), this);
/**
* 初始化,主要是添加健康检查任务
**/
cluster.init();
/**
* 添加到原有的实例集群集合中
**/
getClusterMap().put(instance.getClusterName(), cluster);
}
// 从新的实例集群集合中获取新实例所在的实例集合
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
// 不存在则添加到实例集合中
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
clusterIPs.add(instance);
}
catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
// 遍历全部的实例集合
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
List<Instance> entryIPs = entry.getValue();
/**
* 更新服务注册表数据
**/
clusterMap.get(entry.getKey()).updateIPs(entryIPs, ephemeral);
}
// 设置最后一次修改的时间戳
setLastModifiedMillis(System.currentTimeMillis());
// 推送数据变更事件
getPushService().serviceChanged(this);
......
}
本地缓存更新完成后,会发布一个服务变更事件,处理服务变更事件的代码逻辑就是基于UDP协议将新的服务数据推送给客户端
public void onApplicationEvent(ServiceChangeEvent event) {
// 获取实例所在的服务
Service service = event.getService();
// 服务名
String serviceName = service.getName();
// 命名空间
String namespaceId = service.getNamespaceId();
/**
* 基于 UDP 协议推送更新给服务消费者
*
* 某个服务可能既是服务提供者也是服务消费者
**/
Future future = udpSender.schedule(() -> {
try {
// 获取需要推送的客户端,即服务消费者
ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
// 遍历所有的客户端
for (PushClient client : clients.values()) {
// 是僵尸节点
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
}
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
// 默认推送的缓存时间是 10s
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}
// 封装数据
Receiver.AckEntry ackEntry;
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
}
else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key));
udpPush(ackEntry);
}
}
catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
}
finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}, 1000, TimeUnit.MILLISECONDS);
// 标记处理完成
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
udpPush()就是实际推送数据的方法
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
return null;
}
// 超过最大尝试发送次数
if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}
try {
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++;
}
ackMap.put(ackEntry.key, ackEntry);
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
Loggers.PUSH.info("send udp packet: " + ackEntry.key);
/**
* 发送数据给服务消费者
*
* 数据会被对应客户端服务器的{@link com.alibaba.nacos.client.naming.core.PushReceiver#run()}方法接收处理
**/
udpSocket.send(ackEntry.origin);
// 更新尝试发送次数
ackEntry.increaseRetryTime();
// 添加一个重试任务,延时 10s 执行
executorService.schedule(
new Retransmitter(ackEntry),
TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),
TimeUnit.MILLISECONDS
);
return ackEntry;
}
catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return null;
}
}
客户端的接收流程在服务发现的源码分析博客一起看吧,这里就先看到这里。这个方法的功能主要就是更新完本地的内存服务数据,然后异步将最新的数据推送到客户端,客户端会刷新本地的注册表缓存,以便服务的调用。
Nacos服务集群之间的数据同步,主要就是通过调用TaskDispatcher
任务分发器的**addTask()方法将任务添加到实现了TaskScheduler
类的阻塞队列属性中,其重写的run()**方法也是一个一直监听队列元素的死循环,遍历所有的集群节点,然后通过DataSyncer
数据同步复制器来进行集群间的数据同步的
public void submit(SyncTask task, long delay) {
// If it's a new task:
if (task.getRetryCount() == 0) {
// 遍历所有需要同步的实例
Iterator<String> iterator = task.getKeys().iterator();
while (iterator.hasNext()) {
// 需要同步的实例
String key = iterator.next();
// 相关的实例已经在同步进程中了
if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
// associated key already exist:
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync already in process, key: {}", key);
}
// 移除该实例
iterator.remove();
}
}
}
// 没有需要同步的实例了
if (task.getKeys().isEmpty()) {
// all keys are removed:
return;
}
// 异步进行实例同步任务
GlobalExecutor.submitDataSync(() -> {
/**
* 第一步、检查服务器列表
*
* 1. check the server
**/
if (getServers() == null || getServers().isEmpty()) {
Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
return;
}
// 获取需要同步的实例
List<String> keys = task.getKeys();
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
}
/**
* 第二步、获取{@link Datum}并判空,里面包含需要同步的实例数据
*
* @see com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onPut(java.lang.String, com.alibaba.nacos.naming.pojo.Record)
*
* 2. get the datums by keys and check the datum is empty or not
**/
Map<String, Datum> datumMap = dataStore.batchGet(keys);
if (datumMap == null || datumMap.isEmpty()) {
// clear all flags of this task:
for (String key : keys) {
taskMap.remove(buildKey(key, task.getTargetServer()));
}
return;
}
byte[] data = serializer.serialize(datumMap);
long timestamp = System.currentTimeMillis();
/**
* 第三步、发送请求同步数据
*
* @see DistroController#onSyncDatum(java.util.Map)
**/
boolean success = NamingProxy.syncData(data, task.getTargetServer());
if (!success) {
SyncTask syncTask = new SyncTask();
syncTask.setKeys(task.getKeys());
syncTask.setRetryCount(task.getRetryCount() + 1);
syncTask.setLastExecuteTime(timestamp);
syncTask.setTargetServer(task.getTargetServer());
/**
* 如果同步失败了,可能是网络原因或者是节点原因,那么会进行重试机制
**/
retrySync(syncTask);
}
else {
/**
* 清除此任务的所有标志
*
* clear all flags of this task
**/
for (String key : task.getKeys()) {
taskMap.remove(buildKey(key, task.getTargetServer()));
}
}
}, delay);
}
NamingProxy
的**syncData()**方法就是通过HTTP请求调用集群服务暴露的接口,接口的处理逻辑和注册逻辑差不多,创建或者更新服务数据,然后发布数据变更的事件,将新的服务数据推送给客户端。
Nacos是可以支撑百万注册实例的高性能注册中心框架,所以在设计和编码方面是有很多值得借鉴和学习的地方的,比如用了很多的异步操作,任务类一直监听阻塞队列中的元素,实现相应的异步功能。再比如CP模式下,主服务节点和其他服务节点之间的数据同步,一般分布式系统节点之间的数据同步采用的是两阶段提交,Nacos则是根据多数节点成功则更新成功的思想,创建了一个CountDownLatch
来计数,发送HTTP请求进行数据同步,计数器为零则表示多数节点同步成功等等,这些场景和解决方法也是我们看源码能学到的东西。
服务端代码就先看到到这里吧,篇幅已经比较长了,但是主要的方法都过了一遍,后面应该会轻松不少。Nacos专题写完我会把带有注释的源码放到同性社区或者公众号上供大家下载,下一篇继续探索服务发现的流程。
本文地址:https://blog.csdn.net/CX610602108/article/details/110099591
推荐阅读
-
Android图片加载利器之Picasso源码解析
-
微信小程序之侧边栏滑动实现过程解析(附完整源码)
-
Vue源码解析之数据响应系统的使用
-
CountDownLatch源码解析之await()
-
CountDownLatch源码解析之countDown()
-
Vue源码解读之Component组件注册的实现
-
Spring5源码解析4-refresh方法之invokeBeanFactoryPostProcessors
-
Laravel框架源码解析之模型Model原理与用法解析
-
Spring源码解析之ConfigurableApplicationContext
-
angularjs 源码解析之injector