欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

solrCloud中的分布式请求响应超时解决方案

程序员文章站 2022-07-13 09:56:38
...

    之前一篇博客中写道solrCloud对查询的请求是在服务端进行的组装,是对所有的shard的所有的replica进行的轮训的。这两天看了下在服务端solr是如何进行操作的,这里涉及到的代码超级多,我就只贴一部分,用来说明大意即可。

    在将查询请求发往到某个replica之后,先根据path找到某个requestHandler(我们这里用select举例),然后再用这个requestHandler中所有的searchComponent进行查询操作,他的分布式的操作就是体现在多个searchComponent中,每一个searchComponent不只是要完成它所存在的shard中的工作,还有其他shard中的工作,想当然这里不会使用同步,也就是不会在当前的shard的任务完成之后才会将请求转发到其他的shard,最好是采取异步执行的方式,将某个任务交给线程池,然后继续执行自己的任务,在执行完成后再处理其他的shard返回的数据。solr正是采取了后者——使用一个shardHandler用来转发请求到其他的shard,然后异步的等待其他的shard的执行结果。在solrCloud中使用的shardHandler的实现类是HttpShardHandler,顾名思义,他是采用http的协议与其他的shard进行交互的,其他shard的操作结果返回到当前的shard中,然后再组装最后的结果。在solrhome下我们可以发现有个solr.xml,里面就有关于HttpShardHandler的配置,

  <shardHandlerFactory name="shardHandlerFactory"   class="HttpShardHandlerFactory"> <!--用于产生一个HttpShardHandler-->
    <int name="socketTimeout">${socketTimeout:600000}</int> <!--httpClient的socketTimeout-->  
    <int name="connTimeout">${connTimeout:60000}</int>  <!--httpClient的connectionTimeout-->
  </shardHandlerFactory>

HttpShardHandler发起http请求使用的是apache的httpClient,上面的两个配置就是配置的httpClient的两个超时时间(httpCLient有三个超时时间,详情参看我的另一个博客)。想到这就会有很多的疑问,如果访问的时候某个shard死掉了呢(zk中的session还没有过期的情况),又或者他没有死掉但是他的操作非常慢一直到超过上面配置的socketTimeout呢,这种情况下怎么操作?但凡遇到这种情况,看源码是最好的办法,在httpShardHandler中,有个submit方法,他就是某个searchComponent添加任务到httpShardHandler的线程池中,我们看一下这个方法:

/**
 * 第一个参数表示要发起的请求
 * 第二个参数表示要发送到的shard的所有的replica的url,用|分隔
 * 第三个参数表示请求的参数
 */
@Override
public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
    // do this outside of the callable for thread safety reasons
    final List<String> urls = getURLs(sreq, shard);//获得本次访问的所有的url,一个shard有多个replica
    
    Callable<ShardResponse> task = new Callable<ShardResponse>() {//将请求封装为一个可以异步执行的callable,最后返回的是一个ShardResponse
      
      @Override
      public ShardResponse call() throws Exception {
        
        ShardResponse srsp = new ShardResponse();
        if (sreq.nodeName != null) {
          srsp.setNodeName(sreq.nodeName);
        }
        srsp.setShardRequest(sreq);
        srsp.setShard(shard);
        SimpleSolrResponse ssr = new SimpleSolrResponse();
        srsp.setSolrResponse(ssr);
        long startTime = System.nanoTime();
        
        try {
          params.remove(CommonParams.WT); // use default (currently javabin)
          params.remove(CommonParams.VERSION);
          
          QueryRequest req = makeQueryRequest(sreq, params, shard);
          req.setMethod(SolrRequest.METHOD.POST);
          
          // no need to set the response parser as binary is the default
          // req.setResponseParser(new BinaryResponseParser());
          
          // if there are no shards available for a slice, urls.size()==0
          if (urls.size() == 0) {
            // TODO: what's the right error code here? We should use the same thing when
            // all of the servers for a shard are down.
            throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
          }
          
          if (urls.size() <= 1) {
            String url = urls.get(0);
            srsp.setShardAddress(url);
            try (SolrClient client = new HttpSolrClient(url, httpClient)) {//如果只有一个url,也就是只有一个replica,则直接用这个url发起http请求
              ssr.nl = client.request(req);
            }
          } else {
            LBHttpSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);//如果有多个replica,则会进行负载均衡。
            ssr.nl = rsp.getResponse();
            srsp.setShardAddress(rsp.getServer());
          }
        } catch (ConnectException cex) { 
          srsp.setException(cex);
        } catch (Exception th) { // 从这两个catch可以发现,如果在执行的时候发生了任何的异常都会将异常封装到srsp也就是最后的结果中,而不会抛出异常。
          srsp.setException(th);
          if (th instanceof SolrException) {
            srsp.setResponseCode(((SolrException) th).code());
          } else {
            srsp.setResponseCode(-1);
          }
        }
        ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
        return transfomResponse(sreq, srsp, shard);
      }
    };
    
    try {
      if (shard != null) {
        MDC.put("ShardRequest.shards", shard);
      }
      if (urls != null && !urls.isEmpty()) {
        MDC.put("ShardRequest.urlList", urls.toString());
      }
      pending.add(completionService.submit(task));//将封装的任务添提交到completionService,由其他线程执行这个任务,等待执行结果,然后将最后的结果放到一个集合中(pending就是一个泛型是Future的集合)
    } finally {
      MDC.remove("ShardRequest.shards");
      MDC.remove("ShardRequest.urlList");
    }
  }

 

看完上面的代码就知道了原来果然是采用的异步执行,并且在执行过程中不会抛出任何的错误,如果有错误的也会封装在结果中。然后我们再看一下取结果的时候的操作,下面的代码摘抄于org.apache.solr.handler.component.SearchHandler.handleRequestBody(SolrQueryRequest, SolrQueryResponse)这个方法,

boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false); //从请求中得到shards.tolerant参数,默认是false
ShardResponse srsp = tolerant ?  shardHandler1.takeCompletedIncludingErrors():shardHandler1.takeCompletedOrError();//得到线程池执行的结果
 if (srsp == null) break;  // no more requests to wait for

// Was there an exception?  
if (srsp.getException() != null) { //如果有异常
     // If things are not tolerant, abort everything and rethrow
     if(!tolerant) {//如果没有在参数中写shards.tolerant=true,则报错
          shardHandler1.cancelAll();//取消所有的操作,
          if (srsp.getException() instanceof SolrException) {
              throw (SolrException)srsp.getException();
          } else {
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
          }
     } else {
          if(rsp.getResponseHeader().get("partialResults") == null) {//如果是容错的,也就是shards.tolerant=true,则不报错,允许部分成功,然后再响应头中添加一个值partialResults=true,表示这词的请求是部分成功。
               rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
          }
     }
}

 

现在知道了如果在请求的时候害怕因为某个shard响应太慢而耽误太多的时间,则可以将httpShardHandler的两个timeout配置的小一点,然后再请求中设置shards.tolerant=true,这样就可以了。我测试的java代码(我这次使用的是solr5.5.3):

static CloudSolrClient getServer(){
	CloudSolrClient server = new CloudSolrClient("10.90.26.115:2181/solr5");
	server.setZkConnectTimeout(10000*3);
	return server;
}

static void queryTest() throws SolrServerException, IOException{
        CloudSolrClient server = getServer();
	SolrQuery query = new SolrQuery("id:?6");//我搜一下id是两位数,并且是以6结尾的。
	query.set("shards.tolerant", true);//设置允许出错
	QueryResponse response = server.query("你的集合的名字", query);//
	System.out.println(response.getResults().getNumFound());
	System.out.println(response.getResponseHeader().get("partialResults"));		
}

 

执行上面的代码,分三个阶段,

第一个阶段是将所有的shard都存活,可以发现打印的partialResults是null,

第二个是将某一个shard停掉,设置不容错,即shards.tolerant=false,结果是报异常,提示某个shard没有节点处理。

第三个是维持某一个shard停掉,设置shards.tolerant=true 可以发现不报错了,但是numFound变少了,而且打印的是true。

 

至此,已经掌握容错请求的实现。在实际生产中可以根据响应头的partialResults来记录日志,而不影响前台的展示。