欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

Ribbon负载均衡策略DynamicServerListLoadBalancer的ServerList解读

程序员文章站 2022-03-15 22:37:30
...

一 DynamicServerListLoadBalancer在类图中的位置

Ribbon负载均衡策略DynamicServerListLoadBalancer的ServerList解读

二 DynamicServerListLoadBalancer源码解读

1 关键代码请见注释

2 源码位置:ribbon-master\ribbon-loadbalancer\src\main\java\com\netflix\loadbalancer\DynamicServerListLoadBalancer.java

/**
 * Load balancer whose server list is fetched from a {@link ServerList}, optionally
 * narrowed by a {@link ServerListFilter}, and refreshed through a
 * {@link ServerListUpdater}.
 *
 * @param <T> the {@link Server} subtype produced by the server list and consumed by the filter
 */
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);

    // NOTE(review): isSecure and useTunnel are not referenced anywhere else in this class.
    boolean isSecure = false;
    boolean useTunnel = false;

    // Guard flag used by updateAllServerList(): only the caller that flips it from
    // false to true applies its list; other concurrent callers skip the update.
    protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
    // Object used to obtain the list of servers.
    // The type parameter T is a subclass of Server and represents a concrete
    // service-instance extension class.
    volatile ServerList<T> serverListImpl;

    // Optional filter applied to the fetched list in updateListOfServers(); may be null.
    volatile ServerListFilter<T> filter;

    // Callback handed to the ServerListUpdater in enableAndInitLearnNewServersFeature();
    // each invocation re-runs updateListOfServers().
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

    // Component that is started with updateAction and stopped in stopServerListRefreshing().
    protected volatile ServerListUpdater serverListUpdater;

    /**
     * Delegates to the superclass default constructor only; {@code serverListImpl},
     * {@code filter} and {@code serverListUpdater} are left unset (null).
     */
    public DynamicServerListLoadBalancer() {
        super();
    }

    /**
     * Same as the six-argument constructor, supplying a new {@link PollingServerListUpdater}
     * as the updater.
     *
     * @param clientConfig client configuration passed to the superclass and to {@code restOfInit}
     * @param rule         load-balancing rule passed to the superclass
     * @param ping         ping implementation passed to the superclass
     * @param serverList   source of servers
     * @param filter       filter applied to the fetched servers; may be null
     */
    @Deprecated
    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
            ServerList<T> serverList, ServerListFilter<T> filter) {
        this(
                clientConfig,
                rule,
                ping,
                serverList,
                filter,
                new PollingServerListUpdater()
        );
    }

    /**
     * Stores the given collaborators, hands this balancer's stats to the filter when it is
     * an {@link AbstractServerListFilter}, and then runs {@code restOfInit}.
     *
     * @param clientConfig      client configuration passed to the superclass and to {@code restOfInit}
     * @param rule              load-balancing rule passed to the superclass
     * @param ping              ping implementation passed to the superclass
     * @param serverList        source of servers
     * @param filter            filter applied to the fetched servers; may be null
     * @param serverListUpdater updater that is started during initialization
     */
    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

    /**
     * Initializes the balancer entirely from configuration by calling
     * {@link #initWithNiwsConfig(IClientConfig)}.
     *
     * @param clientConfig configuration naming the server list and updater classes
     */
    public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
        initWithNiwsConfig(clientConfig);
    }
    
    /**
     * Runs the superclass initialization, then instantiates the server list and the
     * server-list updater from class names read from {@code clientConfig} (falling back
     * to the defaults in {@link DefaultClientConfigImpl}), and finally runs
     * {@code restOfInit}. When the server list is an {@link AbstractServerList}, its
     * filter is obtained via {@code getFilterImpl} and installed as well.
     *
     * @param clientConfig configuration to read the class names from
     * @throws RuntimeException wrapping any exception raised during initialization
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        try {
            super.initWithNiwsConfig(clientConfig);
            String niwsServerListClassName = clientConfig.getPropertyAsString(
                    CommonClientConfigKey.NIWSServerListClassName,
                    DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);

            ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
                    .instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
            this.serverListImpl = niwsServerListImpl;

            // Only AbstractServerList exposes getFilterImpl; other implementations leave the filter untouched.
            if (niwsServerListImpl instanceof AbstractServerList) {
                AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
                        .getFilterImpl(clientConfig);
                niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
                this.filter = niwsFilter;
            }

            String serverListUpdaterClassName = clientConfig.getPropertyAsString(
                    CommonClientConfigKey.ServerListUpdaterClassName,
                    DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS
            );

            this.serverListUpdater = (ServerListUpdater) ClientFactory
                    .instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);

            restOfInit(clientConfig);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Exception while initializing NIWSDiscoveryLoadBalancer:"
                            + clientConfig.getClientName()
                            + ", niwsClientConfig:" + clientConfig, e);
        }
    }

    /**
     * Common tail of initialization: starts the updater, performs a first refresh of the
     * server list, and primes connections to the reachable servers when priming was
     * enabled and a prime-connections object is available. The priming flag is switched
     * off for the duration of the first refresh and restored afterwards.
     *
     * @param clientConfig configuration whose client name is used in the log message
     */
    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }
    
    
    /**
     * Passes the list to the superclass, then groups the servers by lower-cased zone
     * and forwards that mapping to {@link #setServerListForZones(Map)}. Servers whose
     * zone is null are left out of the mapping.
     *
     * @param lsrv the new server list; cast to {@code List<T>} without a runtime check
     */
    @Override
    public void setServersList(List lsrv) {
        super.setServersList(lsrv);
        List<T> serverList = (List<T>) lsrv;
        Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
        for (Server server : serverList) {
            // make sure ServerStats is created to avoid creating them on hot
            // path
            getLoadBalancerStats().getSingleServerStat(server);
            String zone = server.getZone();
            if (zone != null) {
                zone = zone.toLowerCase();
                List<Server> servers = serversInZones.get(zone);
                if (servers == null) {
                    servers = new ArrayList<Server>();
                    serversInZones.put(zone, servers);
                }
                servers.add(server);
            }
        }
        setServerListForZones(serversInZones);
    }

    /**
     * Hands the zone-to-servers mapping to the load balancer stats.
     *
     * @param zoneServersMap servers keyed by zone name
     */
    protected void setServerListForZones(
            Map<String, List<Server>> zoneServersMap) {
        LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
        getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
    }

    /** @return the server list currently used as the source of servers; may be null */
    public ServerList<T> getServerListImpl() {
        return serverListImpl;
    }

    /** @param niwsServerList the server list to use as the source of servers from now on */
    public void setServerListImpl(ServerList<T> niwsServerList) {
        this.serverListImpl = niwsServerList;
    }

    /** @return the filter applied to fetched servers; may be null */
    public ServerListFilter<T> getFilter() {
        return filter;
    }

    /** @param filter the filter to apply to fetched servers; null disables filtering */
    public void setFilter(ServerListFilter<T> filter) {
        this.filter = filter;
    }

    /** @return the updater currently held by this balancer; may be null */
    public ServerListUpdater getServerListUpdater() {
        return serverListUpdater;
    }

    /**
     * Replaces the stored updater. This setter only assigns the field; it does not
     * stop the previous updater or start the new one.
     *
     * @param serverListUpdater the updater to store
     */
    public void setServerListUpdater(ServerListUpdater serverListUpdater) {
        this.serverListUpdater = serverListUpdater;
    }

    /**
     * Deliberately does nothing. {@code updateAllServerList} calls
     * {@code super.forceQuickPing()} directly and therefore bypasses this override.
     */
    @Override
    public void forceQuickPing() {
        // no-op
    }

    /**
     * Logs the updater's simple class name and starts it with {@code updateAction}.
     * No null check is performed on {@code serverListUpdater}.
     */
    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

    /** @return the client name from this balancer's client configuration, used in log messages */
    private String getIdentifier() {
        return this.getClientConfig().getClientName();
    }

    /** Stops the updater if one is set; does nothing otherwise. */
    public void stopServerListRefreshing() {
        if (serverListUpdater != null) {
            serverListUpdater.stop();
        }
    }

    /**
     * Fetches the updated server list from {@code serverListImpl}, runs it through
     * {@code filter} when one is set, and passes the result to
     * {@link #updateAllServerList(List)}. When {@code serverListImpl} is null an empty
     * list is passed on instead.
     */
    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

    /**
     * Marks every server in the list as alive, installs the list via
     * {@link #setServersList(List)} and triggers the superclass quick ping. The work is
     * done only by the caller that wins the {@code serverListUpdateInProgress}
     * compare-and-set; if an update is already in progress the call returns without
     * applying {@code ls}.
     *
     * @param ls the servers to install
     */
    protected void updateAllServerList(List<T> ls) {
        // other threads might be doing this - in which case, we pass
        if (serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                for (T s : ls) {
                    s.setAlive(true); // set so that clients can start using these
                                      // servers right away instead
                                      // of having to wait out the ping cycle.
                }
                setServersList(ls);
                super.forceQuickPing();
            } finally {
                // Always release the guard, even if installing the list failed.
                serverListUpdateInProgress.set(false);
            }
        }
    }

    /** @return the superclass description prefixed with this class name and followed by the server list */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("DynamicServerListLoadBalancer:");
        sb.append(super.toString());
        sb.append("ServerList:" + String.valueOf(serverListImpl));
        return sb.toString();
    }
    
    /** Runs the superclass shutdown and then stops the server-list refresh. */
    @Override
    public void shutdown() {
        super.shutdown();
        stopServerListRefreshing();
    }


    /** @return the value of {@code getLastUpdate()} on the updater; no null check on the updater */
    @Monitor(name="LastUpdated", type=DataSourceType.INFORMATIONAL)
    public String getLastUpdate() {
        return serverListUpdater.getLastUpdate();
    }

    /** @return the value of {@code getDurationSinceLastUpdateMs()} on the updater; no null check on the updater */
    @Monitor(name="DurationSinceLastUpdateMs", type= DataSourceType.GAUGE)
    public long getDurationSinceLastUpdateMs() {
        return serverListUpdater.getDurationSinceLastUpdateMs();
    }

    /** @return the value of {@code getNumberMissedCycles()} on the updater; no null check on the updater */
    @Monitor(name="NumUpdateCyclesMissed", type=DataSourceType.GAUGE)
    public int getNumberMissedCycles() {
        return serverListUpdater.getNumberMissedCycles();
    }

    /** @return the value of {@code getCoreThreads()} on the updater; no null check on the updater */
    @Monitor(name="NumThreads", type=DataSourceType.GAUGE)
    public int getCoreThreads() {
        return serverListUpdater.getCoreThreads();
    }
}

三 ServerList在类图中的位置

Ribbon负载均衡策略DynamicServerListLoadBalancer的ServerList解读

四 ServerList源码解读

/**
 * Contract for a source of service instances.
 *
 * @param <T> the {@link Server} subtype supplied by this list
 */
public interface ServerList<T extends Server> {

    /**
     * Supplies the initial list of service instances.
     *
     * @return the servers known at initialization time
     */
    List<T> getInitialListOfServers();

    /**
     * Supplies the refreshed list of service instances.
     *
     * @return the updated servers
     */
    List<T> getUpdatedListOfServers();

}

五 EurekaRibbonClientConfiguration类源码解读

/**
 * Spring configuration contributing the Eureka-related Ribbon beans for one client.
 * The {@link ServerList} bean built here is intended to serve as the
 * {@code serverListImpl} of {@code DynamicServerListLoadBalancer}.
 */
@Configuration
public class EurekaRibbonClientConfiguration {

    private static final Log log = LogFactory.getLog(EurekaRibbonClientConfiguration.class);

    @Value("${ribbon.eureka.approximateZoneFromHostname:false}")
    private boolean approximateZoneFromHostname = false;

    @RibbonClientName
    private String serviceId = "client";

    @Autowired(required = false)
    private EurekaClientConfig clientConfig;

    @Autowired(required = false)
    private EurekaInstanceConfig eurekaConfig;

    @Autowired
    private PropertiesFactory propertiesFactory;

    /** Default constructor; the fields are populated through the annotations above. */
    public EurekaRibbonClientConfiguration() {
    }

    /**
     * Builds the configuration from explicit values. {@code propertiesFactory} is not
     * assigned by this constructor.
     *
     * @param clientConfig                Eureka client configuration; may be null
     * @param serviceId                   name of the Ribbon client being configured
     * @param eurekaConfig                Eureka instance configuration; may be null
     * @param approximateZoneFromHostname whether the zone may be derived from the host name
     */
    public EurekaRibbonClientConfiguration(EurekaClientConfig clientConfig,
            String serviceId, EurekaInstanceConfig eurekaConfig,
            boolean approximateZoneFromHostname) {
        this.clientConfig = clientConfig;
        this.serviceId = serviceId;
        this.eurekaConfig = eurekaConfig;
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    /**
     * Provides the {@link IPing} bean: the one obtained from {@code propertiesFactory}
     * when it reports {@code IPing} as set for this service, otherwise a
     * {@link NIWSDiscoveryPing} initialized with {@code config}.
     *
     * @param config the Ribbon client configuration
     * @return the ping implementation to use
     */
    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (!this.propertiesFactory.isSet(IPing.class, serviceId)) {
            NIWSDiscoveryPing discoveryPing = new NIWSDiscoveryPing();
            discoveryPing.initWithNiwsConfig(config);
            return discoveryPing;
        }
        return this.propertiesFactory.get(IPing.class, config, serviceId);
    }

    /**
     * Provides the {@link ServerList} bean: the one obtained from
     * {@code propertiesFactory} when it reports {@code ServerList} as set for this
     * service, otherwise a {@link DomainExtractingServerList} wrapping a
     * {@link DiscoveryEnabledNIWSServerList}.
     *
     * @param config               the Ribbon client configuration
     * @param eurekaClientProvider provider of the Eureka client handed to the discovery list
     * @return the server list to use
     */
    @Bean
    @ConditionalOnMissingBean
    public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
        if (!this.propertiesFactory.isSet(ServerList.class, serviceId)) {
            DiscoveryEnabledNIWSServerList eurekaBackedList = new DiscoveryEnabledNIWSServerList(
                    config, eurekaClientProvider);
            // The Eureka-backed list is wrapped so that callers receive DomainExtractingServer instances.
            return new DomainExtractingServerList(
                    eurekaBackedList, config, this.approximateZoneFromHostname);
        }
        return this.propertiesFactory.get(ServerList.class, config, serviceId);
    }

    /** @return a new {@link EurekaServerIntrospector} */
    @Bean
    public ServerIntrospector serverIntrospector() {
        return new EurekaServerIntrospector();
    }

    /**
     * Runs after construction: when a Eureka client configuration is present and the
     * deployment context has no zone yet, a zone is chosen and stored in the deployment
     * context. Ribbon defaults for this service are initialized in every case.
     */
    @PostConstruct
    public void preprocess() {
        String currentZone = ConfigurationManager.getDeploymentContext()
                .getValue(ContextKey.zone);
        if (this.clientConfig != null && StringUtils.isEmpty(currentZone)) {
            if (this.approximateZoneFromHostname && this.eurekaConfig != null) {
                applyApproximateZone();
            }
            else {
                applyConfiguredZone();
            }
        }
        RibbonUtils.initializeRibbonDefaults(serviceId);
    }

    /** Stores the zone that {@link ZoneUtils} extracts from the Eureka instance host name. */
    private void applyApproximateZone() {
        String approxZone = ZoneUtils
                .extractApproximateZone(this.eurekaConfig.getHostName(false));
        log.debug("Setting Zone To " + approxZone);
        ConfigurationManager.getDeploymentContext().setValue(ContextKey.zone, approxZone);
    }

    /**
     * Stores the "zone" metadata entry of the Eureka instance, or failing that the first
     * availability zone of the client's region; leaves the context untouched when
     * neither yields a value.
     */
    private void applyConfiguredZone() {
        String availabilityZone = null;
        if (this.eurekaConfig != null) {
            availabilityZone = this.eurekaConfig.getMetadataMap().get("zone");
        }
        if (availabilityZone == null) {
            String[] zones = this.clientConfig
                    .getAvailabilityZones(this.clientConfig.getRegion());
            // Pick the first one from the regions we want to connect to
            if (zones != null && zones.length > 0) {
                availabilityZone = zones[0];
            }
        }
        if (availabilityZone == null) {
            return;
        }
        // You can set this with archaius.deployment.* (maybe requires
        // custom deployment context)?
        ConfigurationManager.getDeploymentContext().setValue(ContextKey.zone,
                availabilityZone);
    }

}

六 DomainExtractingServerList源码解读

/**
 * {@link ServerList} implementation that delegates to another server list and wraps
 * every server it returns in a {@link DomainExtractingServer}.
 */
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    /** Delegate that actually produces the servers. */
    private ServerList<DiscoveryEnabledServer> list;
    private final RibbonProperties ribbon;

    private boolean approximateZoneFromHostname;

    /**
     * @param list                        delegate server list (a
     *                                    {@code DiscoveryEnabledNIWSServerList} when created by
     *                                    {@code EurekaRibbonClientConfiguration})
     * @param clientConfig                configuration from which the {@link RibbonProperties} are derived
     * @param approximateZoneFromHostname flag forwarded to every {@link DomainExtractingServer}
     */
    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    /**
     * Delegates to the wrapped list's {@code getInitialListOfServers()} and converts the result.
     *
     * @return the delegate's initial servers, each wrapped in a {@link DomainExtractingServer}
     */
    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers() {
        return setZones(this.list.getInitialListOfServers());
    }

    /**
     * Delegates to the wrapped list's {@code getUpdatedListOfServers()} and converts the result.
     *
     * @return the delegate's updated servers, each wrapped in a {@link DomainExtractingServer}
     */
    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        return setZones(this.list.getUpdatedListOfServers());
    }

    /**
     * Converts each {@link DiscoveryEnabledServer} into its subclass
     * {@link DomainExtractingServer}, preserving order.
     */
    private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
        // NOTE(review): the `true` argument looks like a default value - confirm against RibbonProperties.
        final boolean securePort = this.ribbon.isSecure(true);
        final boolean preferIpAddress = this.ribbon.isUseIPAddrForServer();
        List<DiscoveryEnabledServer> converted = new ArrayList<>();
        for (DiscoveryEnabledServer original : servers) {
            DomainExtractingServer wrapper = new DomainExtractingServer(
                    original, securePort, preferIpAddress, this.approximateZoneFromHostname);
            converted.add(wrapper);
        }
        return converted;
    }

}

七 DiscoveryEnabledNIWSServerList解读(注:下方代码与第六节相同,仍是DomainExtractingServerList的源码,并非DiscoveryEnabledNIWSServerList的源码)

/**
 * {@link ServerList} implementation that delegates to another server list and wraps
 * every server it returns in a {@link DomainExtractingServer}.
 */
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    /** Delegate that actually produces the servers. */
    private ServerList<DiscoveryEnabledServer> list;
    private final RibbonProperties ribbon;

    private boolean approximateZoneFromHostname;

    /**
     * @param list                        delegate server list (a
     *                                    {@code DiscoveryEnabledNIWSServerList} when created by
     *                                    {@code EurekaRibbonClientConfiguration})
     * @param clientConfig                configuration from which the {@link RibbonProperties} are derived
     * @param approximateZoneFromHostname flag forwarded to every {@link DomainExtractingServer}
     */
    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    /**
     * Delegates to the wrapped list's {@code getInitialListOfServers()} and converts the result.
     *
     * @return the delegate's initial servers, each wrapped in a {@link DomainExtractingServer}
     */
    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers() {
        return setZones(this.list.getInitialListOfServers());
    }

    /**
     * Delegates to the wrapped list's {@code getUpdatedListOfServers()} and converts the result.
     *
     * @return the delegate's updated servers, each wrapped in a {@link DomainExtractingServer}
     */
    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        return setZones(this.list.getUpdatedListOfServers());
    }

    /**
     * Converts each {@link DiscoveryEnabledServer} into its subclass
     * {@link DomainExtractingServer}, preserving order.
     */
    private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
        // NOTE(review): the `true` argument looks like a default value - confirm against RibbonProperties.
        final boolean securePort = this.ribbon.isSecure(true);
        final boolean preferIpAddress = this.ribbon.isUseIPAddrForServer();
        List<DiscoveryEnabledServer> converted = new ArrayList<>();
        for (DiscoveryEnabledServer original : servers) {
            DomainExtractingServer wrapper = new DomainExtractingServer(
                    original, securePort, preferIpAddress, this.approximateZoneFromHostname);
            converted.add(wrapper);
        }
        return converted;
    }

}

八 DomainExtractingServer解读

/**
 * {@link DiscoveryEnabledServer} subclass used to convert a plain
 * {@code DiscoveryEnabledServer} into a {@code DomainExtractingServer}. It keeps its
 * own id and copies zone, alive and ready-to-serve state from the source server.
 */
class DomainExtractingServer extends DiscoveryEnabledServer {

    private String id;

    /**
     * Builds the wrapper and sets the properties a service instance needs: zone, id,
     * alive flag and ready-to-serve flag.
     *
     * @param server                      the server to convert
     * @param useSecurePort               forwarded to the superclass constructor
     * @param useIpAddr                   forwarded to the superclass constructor
     * @param approximateZoneFromHostname whether the zone may be derived from the host name
     *                                    when the instance metadata has no "zone" entry
     */
    public DomainExtractingServer(DiscoveryEnabledServer server, boolean useSecurePort,
            boolean useIpAddr, boolean approximateZoneFromHostname) {
        // host and port are set in super()
        super(server.getInstanceInfo(), useSecurePort, useIpAddr);
        setZone(chooseZone(server, approximateZoneFromHostname));
        setId(extractId(server));
        setAlive(server.isAlive());
        setReadyToServe(server.isReadyToServe());
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void setId(String id) {
        this.id = id;
    }

    /**
     * Picks the zone for the new server: the "zone" metadata entry when present,
     * otherwise the zone approximated from the host name when that is enabled,
     * otherwise the source server's own zone.
     */
    private static String chooseZone(DiscoveryEnabledServer server,
            boolean approximateZoneFromHostname) {
        if (server.getInstanceInfo().getMetadata().containsKey("zone")) {
            return server.getInstanceInfo().getMetadata().get("zone");
        }
        if (approximateZoneFromHostname) {
            return ZoneUtils.extractApproximateZone(server.getHost());
        }
        return server.getZone();
    }

    /**
     * Builds "hostName:instanceId" when the server is a {@link DiscoveryEnabledServer}
     * whose metadata contains "instanceId"; otherwise falls back to the superclass id.
     */
    private String extractId(Server server) {
        if (!(server instanceof DiscoveryEnabledServer)) {
            return super.getId();
        }
        InstanceInfo info = ((DiscoveryEnabledServer) server).getInstanceInfo();
        if (!info.getMetadata().containsKey("instanceId")) {
            return super.getId();
        }
        return info.getHostName()+":"+info.getMetadata().get("instanceId");
    }
}

 

相关标签: Ribbon