Spring Cloud 请求重试机制核心代码分析
场景
发布微服务的操作一般都是打完新代码的包,kill掉在跑的应用,替换新的包,启动。
spring cloud 中使用eureka为注册中心,它是允许服务列表数据的延迟性的,就是说即使应用已经不在服务列表了,客户端在一段时间内依然会请求这个地址。那么就会出现请求正在发布的地址,而导致失败。
我们会优化服务列表的刷新时间,以提高服务列表信息的时效性。但是无论怎样,都无法避免有那么一段时间是数据不一致的。
所以我们想到一个办法就是重试机制,当a机子在重启时,同个集群的b是可以正常提供服务的,如果有重试机制就可以在上面这个场景里进行重试到b而不影响正确响应。
操作
需要进行如下的操作:
ribbon: ReadTimeout: 10000 ConnectTimeout: 10000 MaxAutoRetries: 0 MaxAutoRetriesNextServer: 1 OkToRetryOnAllOperations: false
引入spring-retry包
<dependency> <groupId>org.springframework.retry</groupId> <artifactId>spring-retry</artifactId> </dependency>
以zuul为例子还需要配置开启重试:
zuul.retryable=true
遇到了问题
然而万事总没那么一帆风顺,通过测试重试机制生效了,但是并没有我想象的去请求另一台健康的机子,于是*去吧开源码看一看,最终发现是源码的bug,不过已经修复,升级版本即可。
代码分析
使用的版本是
spring-cloud-netflix-core:1.3.6.RELEASE
spring-retry:1.2.1.RELEASE
spring cloud 依赖版本:
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
因为启用了重试,所以请求应用时会执行RetryableRibbonLoadBalancingHttpClient.execute方法:
public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception { final RequestConfig.Builder builder = RequestConfig.custom(); IClientConfig config = configOverride != null ? configOverride : this.config; builder.setConnectTimeout(config.get( CommonClientConfigKey.ConnectTimeout, this.connectTimeout)); builder.setSocketTimeout(config.get( CommonClientConfigKey.ReadTimeout, this.readTimeout)); builder.setRedirectsEnabled(config.get( CommonClientConfigKey.FollowRedirects, this.followRedirects)); final RequestConfig requestConfig = builder.build(); final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this); RetryCallback retryCallback = new RetryCallback() { @Override public RibbonApacheHttpResponse doWithRetry(RetryContext context) throws Exception { //on retries the policy will choose the server and set it in the context //extract the server and update the request being made RibbonApacheHttpRequest newRequest = request; if(context instanceof LoadBalancedRetryContext) { ServiceInstance service = ((LoadBalancedRetryContext)context).getServiceInstance(); if(service != null) { //Reconstruct the request URI using the host and port set in the retry context newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(), newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(), newRequest.getURI().getPath(), newRequest.getURI().getQuery(), newRequest.getURI().getFragment())); } } newRequest = getSecureRequest(request, configOverride); HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig); final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest); if(retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) { if(CloseableHttpResponse.class.isInstance(httpResponse)) { ((CloseableHttpResponse)httpResponse).close(); } throw new RetryableStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName, httpResponse.getStatusLine().getStatusCode()); } return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI()); } }; return this.executeWithRetry(request, retryPolicy, retryCallback); }
我们发现先new 一个RetryCallback,然后执行this.executeWithRetry(request, retryPolicy, retryCallback);
而这个RetryCallback.doWithRetry的代码我们清楚看到是实际请求的代码,也就是说this.executeWithRetry方法最终还是会调用RetryCallback.doWithRetry
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException { RetryPolicy retryPolicy = this.retryPolicy; BackOffPolicy backOffPolicy = this.backOffPolicy; // Allow the retry policy to initialise itself... RetryContext context = open(retryPolicy, state); if (this.logger.isTraceEnabled()) { this.logger.trace("RetryContext retrieved: " + context); } // Make sure the context is available globally for clients who need // it... RetrySynchronizationManager.register(context); Throwable lastException = null; boolean exhausted = false; try { // Give clients a chance to enhance the context... boolean running = doOpenInterceptors(retryCallback, context); if (!running) { throw new TerminatedRetryException( "Retry terminated abnormally by interceptor before first attempt"); } // Get or Start the backoff context... BackOffContext backOffContext = null; Object resource = context.getAttribute("backOffContext"); if (resource instanceof BackOffContext) { backOffContext = (BackOffContext) resource; } if (backOffContext == null) { backOffContext = backOffPolicy.start(context); if (backOffContext != null) { context.setAttribute("backOffContext", backOffContext); } } /* * We allow the whole loop to be skipped if the policy or context already * forbid the first try. This is used in the case of external retry to allow a * recovery in handleRetryExhausted without the callback processing (which * would throw an exception). */ while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { try { if (this.logger.isDebugEnabled()) { this.logger.debug("Retry: count=" + context.getRetryCount()); } // Reset the last exception, so if we are successful // the close interceptors will not think we failed... lastException = null; return retryCallback.doWithRetry(context); } catch (Throwable e) { lastException = e; try { registerThrowable(retryPolicy, state, context, e); } catch (Exception ex) { throw new TerminatedRetryException("Could not register throwable", ex); } finally { doOnErrorInterceptors(retryCallback, context, e); } if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { try { backOffPolicy.backOff(backOffContext); } catch (BackOffInterruptedException ex) { lastException = e; // back off was prevented by another thread - fail the retry if (this.logger.isDebugEnabled()) { this.logger .debug("Abort retry because interrupted: count=" + context.getRetryCount()); } throw ex; } } if (this.logger.isDebugEnabled()) { this.logger.debug( "Checking for rethrow: count=" + context.getRetryCount()); } if (shouldRethrow(retryPolicy, context, state)) { if (this.logger.isDebugEnabled()) { this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount()); } throw RetryTemplate.<E>wrapIfNecessary(e); } } /* * A stateful attempt that can retry may rethrow the exception before now, * but if we get this far in a stateful retry there's a reason for it, * like a circuit breaker or a rollback classifier. */ if (state != null && context.hasAttribute(GLOBAL_STATE)) { break; } } if (state == null && this.logger.isDebugEnabled()) { this.logger.debug( "Retry failed last attempt: count=" + context.getRetryCount()); } exhausted = true; return handleRetryExhausted(recoveryCallback, context, state); } catch (Throwable e) { throw RetryTemplate.<E>wrapIfNecessary(e); } finally { close(retryPolicy, context, state, lastException == null || exhausted); doCloseInterceptors(retryCallback, context, lastException); RetrySynchronizationManager.clear(); } }
在一个while循环里实现重试机制,当执行retryCallback.doWithRetry(context)出现异常的时候,就会catch异常,然后用 retryPolicy判断是否进行重试,特别注意registerThrowable(retryPolicy, state, context, e);方法,不但判断了是否重试,在重试情况下会新选出一个机子放入context,然后再去执行retryCallback.doWithRetry(context)时带入,如此就实现了换机子重试了。
但是我的配置怎么会没有换机子呢?调试代码发现registerThrowable(retryPolicy, state, context, e);选出来的机子没问题,就是新的健康的机子,但是在执行retryCallback.doWithRetry(context)代码的时候依然请求的是那台挂掉的机子。
所以我们再仔细看一下retryCallback.doWithRetry(context)的代码:
我们发现了这行代码:
newRequest = getSecureRequest(request, configOverride); protected RibbonApacheHttpRequest getSecureRequest(RibbonApacheHttpRequest request, IClientConfig configOverride) { if (isSecure(configOverride)) { final URI secureUri = UriComponentsBuilder.fromUri(request.getUri()) .scheme("https").build(true).toUri(); return request.withNewUri(secureUri); } return request; }
newRequest在前面已经使用context构建完毕,request是上一次请求的数据,只要执行这个代码就会发现newRequest永远都会被request覆盖。看到这里我们才发现原来是一个源码bug。
issue地址:
https://github.com/spring-cloud/spring-cloud-netflix/issues/2667
总结
这是一次很普通的查问题过程,在这个过程中当我发现配置没有达到我的预期时,我先查看了配置的含义,尝试多次无果,于是进行断点调试发现异常中断点后,因为场景需要一台机子健康一台机子下线,我模拟了数百次,最终才定位到了这行代码。开源项目即使是优秀的项目必然也会有bug存在,不迷信,不盲目。另一方面,阅读源码能力也是一个解决问题的重要能力,像我在找源码入口,定位代码时耗费了很多的时间。
上一篇: 详解实现hyper-v虚拟机克隆的方法
下一篇: python抓取京东小米8手机配置信息