solrCloud中的分布式请求响应超时解决方案
之前一篇博客中写道solrCloud对查询的请求是在服务端进行的组装,是对所有的shard的所有的replica进行的轮训的。这两天看了下在服务端solr是如何进行操作的,这里涉及到的代码超级多,我就只贴一部分,用来说明大意即可。
在将查询请求发往到某个replica之后,先根据path找到某个requestHandler(我们这里用select举例),然后再用这个requestHandler中所有的searchComponent进行查询操作,他的分布式的操作就是体现在多个searchComponent中,每一个searchComponent不只是要完成它所存在的shard中的工作,还有其他shard中的工作,想当然这里不会使用同步,也就是不会在当前的shard的任务完成之后才会将请求转发到其他的shard,最好是采取异步执行的方式,将某个任务交给线程池,然后继续执行自己的任务,在执行完成后再处理其他的shard返回的数据。solr正是采取了后者——使用一个shardHandler用来转发请求到其他的shard,然后异步的等待其他的shard的执行结果。在solrCloud中使用的shardHandler的实现类是HttpShardHandler,顾名思义,他是采用http的协议与其他的shard进行交互的,其他shard的操作结果返回到当前的shard中,然后再组装最后的结果。在solrhome下我们可以发现有个solr.xml,里面就有关于HttpShardHandler的配置,
<shardHandlerFactory name="shardHandlerFactory" class="HttpShardHandlerFactory"> <!--用于产生一个HttpShardHandler--> <int name="socketTimeout">${socketTimeout:600000}</int> <!--httpClient的socketTimeout--> <int name="connTimeout">${connTimeout:60000}</int> <!--httpClient的connectionTimeout--> </shardHandlerFactory>
HttpShardHandler发起http请求使用的是apache的httpClient,上面的两个配置就是配置的httpClient的两个超时时间(httpCLient有三个超时时间,详情参看我的另一个博客)。想到这就会有很多的疑问,如果访问的时候某个shard死掉了呢(zk中的session还没有过期的情况),又或者他没有死掉但是他的操作非常慢一直到超过上面配置的socketTimeout呢,这种情况下怎么操作?但凡遇到这种情况,看源码是最好的办法,在httpShardHandler中,有个submit方法,他就是某个searchComponent添加任务到httpShardHandler的线程池中,我们看一下这个方法:
/** * 第一个参数表示要发起的请求 * 第二个参数表示要发送到的shard的所有的replica的url,用|分隔 * 第三个参数表示请求的参数 */ @Override public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) { // do this outside of the callable for thread safety reasons final List<String> urls = getURLs(sreq, shard);//获得本次访问的所有的url,一个shard有多个replica Callable<ShardResponse> task = new Callable<ShardResponse>() {//将请求封装为一个可以异步执行的callable,最后返回的是一个ShardResponse @Override public ShardResponse call() throws Exception { ShardResponse srsp = new ShardResponse(); if (sreq.nodeName != null) { srsp.setNodeName(sreq.nodeName); } srsp.setShardRequest(sreq); srsp.setShard(shard); SimpleSolrResponse ssr = new SimpleSolrResponse(); srsp.setSolrResponse(ssr); long startTime = System.nanoTime(); try { params.remove(CommonParams.WT); // use default (currently javabin) params.remove(CommonParams.VERSION); QueryRequest req = makeQueryRequest(sreq, params, shard); req.setMethod(SolrRequest.METHOD.POST); // no need to set the response parser as binary is the default // req.setResponseParser(new BinaryResponseParser()); // if there are no shards available for a slice, urls.size()==0 if (urls.size() == 0) { // TODO: what's the right error code here? We should use the same thing when // all of the servers for a shard are down. throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard); } if (urls.size() <= 1) { String url = urls.get(0); srsp.setShardAddress(url); try (SolrClient client = new HttpSolrClient(url, httpClient)) {//如果只有一个url,也就是只有一个replica,则直接用这个url发起http请求 ssr.nl = client.request(req); } } else { LBHttpSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);//如果有多个replica,则会进行负载均衡。 ssr.nl = rsp.getResponse(); srsp.setShardAddress(rsp.getServer()); } } catch (ConnectException cex) { srsp.setException(cex); } catch (Exception th) { // 从这两个catch可以发现,如果在执行的时候发生了任何的异常都会将异常封装到srsp也就是最后的结果中,而不会抛出异常。 srsp.setException(th); if (th instanceof SolrException) { srsp.setResponseCode(((SolrException) th).code()); } else { srsp.setResponseCode(-1); } } ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); return transfomResponse(sreq, srsp, shard); } }; try { if (shard != null) { MDC.put("ShardRequest.shards", shard); } if (urls != null && !urls.isEmpty()) { MDC.put("ShardRequest.urlList", urls.toString()); } pending.add(completionService.submit(task));//将封装的任务添提交到completionService,由其他线程执行这个任务,等待执行结果,然后将最后的结果放到一个集合中(pending就是一个泛型是Future的集合) } finally { MDC.remove("ShardRequest.shards"); MDC.remove("ShardRequest.urlList"); } }
看完上面的代码就知道了原来果然是采用的异步执行,并且在执行过程中不会抛出任何的错误,如果有错误的也会封装在结果中。然后我们再看一下取结果的时候的操作,下面的代码摘抄于org.apache.solr.handler.component.SearchHandler.handleRequestBody(SolrQueryRequest, SolrQueryResponse)这个方法,
boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false); //从请求中得到shards.tolerant参数,默认是false ShardResponse srsp = tolerant ? shardHandler1.takeCompletedIncludingErrors():shardHandler1.takeCompletedOrError();//得到线程池执行的结果 if (srsp == null) break; // no more requests to wait for // Was there an exception? if (srsp.getException() != null) { //如果有异常 // If things are not tolerant, abort everything and rethrow if(!tolerant) {//如果没有在参数中写shards.tolerant=true,则报错 shardHandler1.cancelAll();//取消所有的操作, if (srsp.getException() instanceof SolrException) { throw (SolrException)srsp.getException(); } else { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException()); } } else { if(rsp.getResponseHeader().get("partialResults") == null) {//如果是容错的,也就是shards.tolerant=true,则不报错,允许部分成功,然后再响应头中添加一个值partialResults=true,表示这词的请求是部分成功。 rsp.getResponseHeader().add("partialResults", Boolean.TRUE); } } }
现在知道了如果在请求的时候害怕因为某个shard响应太慢而耽误太多的时间,则可以将httpShardHandler的两个timeout配置的小一点,然后再请求中设置shards.tolerant=true,这样就可以了。我测试的java代码(我这次使用的是solr5.5.3):
static CloudSolrClient getServer(){ CloudSolrClient server = new CloudSolrClient("10.90.26.115:2181/solr5"); server.setZkConnectTimeout(10000*3); return server; } static void queryTest() throws SolrServerException, IOException{ CloudSolrClient server = getServer(); SolrQuery query = new SolrQuery("id:?6");//我搜一下id是两位数,并且是以6结尾的。 query.set("shards.tolerant", true);//设置允许出错 QueryResponse response = server.query("你的集合的名字", query);// System.out.println(response.getResults().getNumFound()); System.out.println(response.getResponseHeader().get("partialResults")); }
执行上面的代码,分三个阶段,
第一个阶段是将所有的shard都存活,可以发现打印的partialResults是null,
第二个是将某一个shard停掉,设置不容错,即shards.tolerant=false,结果是报异常,提示某个shard没有节点处理。
第三个是维持某一个shard停掉,设置shards.tolerant=true 可以发现不报错了,但是numFound变少了,而且打印的是true。
至此,已经掌握容错请求的实现。在实际生产中可以根据响应头的partialResults来记录日志,而不影响前台的展示。