Ribbon负载均衡策略DynamicServerListLoadBalancer的ServerListUpdater解读
程序员文章站
2022-03-15 22:32:59
...
一 DynamicServerListLoadBalancer在类图中的位置
二 DynamicServerListLoadBalancer源码解读
1 关键代码请见注释
2 源码位置:ribbon-master\ribbon-loadbalancer\src\main\java\com\netflix\loadbalancer\DynamicServerListLoadBalancer.java
//继承于BaseLoadBalancer类,它是对基础负载均衡器的扩展。实现了下面两个功能
//服务实例在运行期间的动态更新
//对服务器实例清单的过滤功能,可以通过过滤器来选择地获取一批服务实例清单
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);
boolean isSecure = false;
boolean useTunnel = false;
protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
//服务列表操作对象
//这里泛型T是一个Server的子类,代表了一个具体的服务实例扩展类
volatile ServerList<T> serverListImpl;
volatile ServerListFilter<T> filter;
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
//用于触发向Eureka Server去获取服务实例清单以及如何在获取到服务实例清单后更新本地的服务实例清单
//这个对象实现的是对Server的更新,又被称为服务更新器
protected volatile ServerListUpdater serverListUpdater;
public DynamicServerListLoadBalancer() {
super();
}
@Deprecated
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter) {
this(
clientConfig,
rule,
ping,
serverList,
filter,
new PollingServerListUpdater()
);
}
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
initWithNiwsConfig(clientConfig);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
try {
super.initWithNiwsConfig(clientConfig);
String niwsServerListClassName = clientConfig.getPropertyAsString(
CommonClientConfigKey.NIWSServerListClassName,
DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);
ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
.instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
this.serverListImpl = niwsServerListImpl;
if (niwsServerListImpl instanceof AbstractServerList) {
AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
.getFilterImpl(clientConfig);
niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
this.filter = niwsFilter;
}
String serverListUpdaterClassName = clientConfig.getPropertyAsString(
CommonClientConfigKey.ServerListUpdaterClassName,
DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS
);
this.serverListUpdater = (ServerListUpdater) ClientFactory
.instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);
restOfInit(clientConfig);
} catch (Exception e) {
throw new RuntimeException(
"Exception while initializing NIWSDiscoveryLoadBalancer:"
+ clientConfig.getClientName()
+ ", niwsClientConfig:" + clientConfig, e);
}
}
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
@Override
public void setServersList(List lsrv) {
super.setServersList(lsrv);
List<T> serverList = (List<T>) lsrv;
Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
for (Server server : serverList) {
// make sure ServerStats is created to avoid creating them on hot
// path
getLoadBalancerStats().getSingleServerStat(server);
String zone = server.getZone();
if (zone != null) {
zone = zone.toLowerCase();
List<Server> servers = serversInZones.get(zone);
if (servers == null) {
servers = new ArrayList<Server>();
serversInZones.put(zone, servers);
}
servers.add(server);
}
}
setServerListForZones(serversInZones);
}
protected void setServerListForZones(
Map<String, List<Server>> zoneServersMap) {
LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
}
public ServerList<T> getServerListImpl() {
return serverListImpl;
}
public void setServerListImpl(ServerList<T> niwsServerList) {
this.serverListImpl = niwsServerList;
}
public ServerListFilter<T> getFilter() {
return filter;
}
public void setFilter(ServerListFilter<T> filter) {
this.filter = filter;
}
public ServerListUpdater getServerListUpdater() {
return serverListUpdater;
}
public void setServerListUpdater(ServerListUpdater serverListUpdater) {
this.serverListUpdater = serverListUpdater;
}
@Override
public void forceQuickPing() {
// no-op
}
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
private String getIdentifier() {
return this.getClientConfig().getClientName();
}
public void stopServerListRefreshing() {
if (serverListUpdater != null) {
serverListUpdater.stop();
}
}
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
/**
* Update the AllServer list in the LoadBalancer if necessary and enabled
*
* @param ls
*/
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("DynamicServerListLoadBalancer:");
sb.append(super.toString());
sb.append("ServerList:" + String.valueOf(serverListImpl));
return sb.toString();
}
@Override
public void shutdown() {
super.shutdown();
stopServerListRefreshing();
}
@Monitor(name="LastUpdated", type=DataSourceType.INFORMATIONAL)
public String getLastUpdate() {
return serverListUpdater.getLastUpdate();
}
@Monitor(name="DurationSinceLastUpdateMs", type= DataSourceType.GAUGE)
public long getDurationSinceLastUpdateMs() {
return serverListUpdater.getDurationSinceLastUpdateMs();
}
@Monitor(name="NumUpdateCyclesMissed", type=DataSourceType.GAUGE)
public int getNumberMissedCycles() {
return serverListUpdater.getNumberMissedCycles();
}
@Monitor(name="NumThreads", type=DataSourceType.GAUGE)
public int getCoreThreads() {
return serverListUpdater.getCoreThreads();
}
}
三 ServerListUpdater源码解读
public interface ServerListUpdater {
//对ServerList的更新的具体更新操作
public interface UpdateAction {
void doUpdate();
}
//启动更新器,传入的updateAction对象为更新操作的具体实现
void start(UpdateAction updateAction);
//停止服务更新器
void stop();
//获取最近更新时间戳
String getLastUpdate();
//获取上一次更新到现在的时间间隔,单位为毫秒
long getDurationSinceLastUpdateMs();
//获取错过更新的周期数
int getNumberMissedCycles();
//获取核心线程数
int getCoreThreads();
}
四 ServerListUpdater类图
-
PollingServerListUpdater:动态更新服务列表的默认策略,DynamicServerListLoadBalancer负载均衡器默认实现就是它,它通过定时任务的方式进行服务列表的更新。
-
EurekanotificationServerListUpdater:也可服务于DynamicServerListLoadBalancer负载均衡器,它需要利用Eureka的事件监听器来触发服务列表的更新操作。
五 PollingServerListUpdater解读
public class PollingServerListUpdater implements ServerListUpdater {
private static final Logger logger = LoggerFactory.getLogger(PollingServerListUpdater.class);
//更新服务实例在初始化之后延迟1秒后开始执行
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
//以30秒为周期重复执行
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
private static class LazyHolder {
private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
private static Thread _shutdownThread;
static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;
static {
int coreSize = poolSizeProp.get();
ThreadFactory factory = (new ThreadFactoryBuilder())
.setNameFormat("PollingServerListUpdater-%d")
.setDaemon(true)
.build();
_serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
poolSizeProp.addCallback(new Runnable() {
@Override
public void run() {
_serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
}
});
_shutdownThread = new Thread(new Runnable() {
public void run() {
logger.info("Shutting down the Executor Pool for PollingServerListUpdater");
shutdownExecutorPool();
}
});
Runtime.getRuntime().addShutdownHook(_shutdownThread);
}
private static void shutdownExecutorPool() {
if (_serverListRefreshExecutor != null) {
_serverListRefreshExecutor.shutdown();
if (_shutdownThread != null) {
try {
Runtime.getRuntime().removeShutdownHook(_shutdownThread);
} catch (IllegalStateException ise) { // NOPMD
// this can happen if we're in the middle of a real
// shutdown,
// and that's 'ok'
}
}
}
}
}
private static ScheduledThreadPoolExecutor getRefreshExecutor() {
return LazyHolder._serverListRefreshExecutor;
}
private final AtomicBoolean isActive = new AtomicBoolean(false);
private volatile long lastUpdated = System.currentTimeMillis();
private final long initialDelayMs;
private final long refreshIntervalMs;
private volatile ScheduledFuture<?> scheduledFuture;
public PollingServerListUpdater() {
this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
}
public PollingServerListUpdater(IClientConfig clientConfig) {
this(LISTOFSERVERS_CACHE_UPDATE_DELAY, getRefreshIntervalMs(clientConfig));
}
public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
this.initialDelayMs = initialDelayMs;
this.refreshIntervalMs = refreshIntervalMs;
}
//以定时任务的方式进行服务列表的更新。
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
//创建一个Runnable的线程wrapperRunnable
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
//具体更新服务实例列表的方法
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
//为wrapperRunnable线程启动一个定时任务
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs, //1秒
refreshIntervalMs, //30秒
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
@Override
public synchronized void stop() {
if (isActive.compareAndSet(true, false)) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
} else {
logger.info("Not active, no-op");
}
}
//记录最后更新时间
@Override
public String getLastUpdate() {
return new Date(lastUpdated).toString();
}
@Override
public long getDurationSinceLastUpdateMs() {
return System.currentTimeMillis() - lastUpdated;
}
@Override
public int getNumberMissedCycles() {
if (!isActive.get()) {
return 0;
}
return (int) ((int) (System.currentTimeMillis() - lastUpdated) / refreshIntervalMs);
}
@Override
public int getCoreThreads() {
if (isActive.get()) {
if (getRefreshExecutor() != null) {
return getRefreshExecutor().getCorePoolSize();
}
}
return 0;
}
private static long getRefreshIntervalMs(IClientConfig clientConfig) {
return clientConfig.get(CommonClientConfigKey.ServerListRefreshInterval, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
}
}
上一篇: MVC模式的PHP实现
下一篇: C++ inline的详解
推荐阅读
-
撸一撸Spring Cloud Ribbon的原理-负载均衡策略
-
Ribbon的负载均衡策略及原理
-
springcloud~~配置ribbon的负载均衡策略
-
Ribbon负载均衡策略初步解读
-
撸一撸Spring Cloud Ribbon的原理-负载均衡策略
-
Ribbon负载均衡策略DynamicServerListLoadBalancer的ServerList解读
-
Ribbon负载均衡策略DynamicServerListLoadBalancer的ServerListFilter解读
-
Ribbon负载均衡策略DynamicServerListLoadBalancer的ServerListUpdater解读