欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

HBASE 代码阅读笔记-1 - PUT-2-定位RS和R-1(0.96-HADOOP2)

程序员文章站 2022-05-31 10:27:43
...
按照94的阅读进度,这里该看如何定位RS和Region了
先回顾下94,原来的做法是遍历操作,然后根据每个操作来定位region,按后加入region的任务队列,没有则创建。定位region的操作由HConnectionManager.HConnectionImplementation.locateRegion方法完成,这里由AsyncProcessor.findDestLocation完成

这里依然是循环,不过循环体有些变化

private HRegionLocation findDestLocation(Row row, int numAttempt, int posInList) {
    if (row == null) throw new IllegalArgumentException("row cannot be null");
    HRegionLocation loc = null;
    IOException locationException = null;
    try {
      loc = hConnection.locateRegion(this.tableName, row.getRow());
      if (loc == null) {
        locationException = new IOException("No location found, aborting submit for" +
            " tableName=" + tableName +
            " rowkey=" + Arrays.toString(row.getRow()));
      }
    } catch (IOException e) {
      locationException = e;
    }
    if (locationException != null) {
      // There are multiple retries in locateRegion already. No need to add new.
      // We can't continue with this row, hence it's the last retry.
      manageError(numAttempt, posInList, row, false, locationException, null);
      return null;
    }

    return loc;
  }


可以看出其实还是调用HConnectionImplementation.locateRegion,但是HConnectionImplementation.locateRegion方法本身的内容又不一样了
private HRegionLocation locateRegion(final TableName tableName,
      final byte [] row, boolean useCache, boolean retry)
    throws IOException {
      if (this.closed) throw new IOException(toString() + " closed");
      if (tableName== null || tableName.getName().length == 0) {
        throw new IllegalArgumentException(
            "table name cannot be null or zero length");
      }

      if (tableName.equals(TableName.META_TABLE_NAME)) {
        return this.registry.getMetaRegionLocation();
      } else {
        // Region not in the cache - have to go to the meta RS
        return locateRegionInMeta(TableName.META_TABLE_NAME, tableName, row,
          useCache, userRegionLock, retry);
      }
    }

嗯,ROOT表的特殊处理流程已经不见了,这里有两个值得注意的地方,ROOT表已经改名为hbase:namespace,META则是hbase:meta,META表的处理流程也发生了变化,唯一不变的是user表
先看看变化的部分:调用了Registry的一个方法,具体由ZooKeeperRegistry实现,我很高兴的告诉亲,both of 他们都是新增的,94版本中没有。

public HRegionLocation getMetaRegionLocation() throws IOException {
    ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();

    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
      }
      ServerName servername = MetaRegionTracker.blockUntilAvailable(zkw, hci.rpcTimeout);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Looked up meta region location, connection=" + this +
          "; serverName=" + ((servername == null) ? "null" : servername));
      }
      if (servername == null) return null;
      return new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, servername, 0);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    } finally {
      zkw.close();
    }
  }

  public static ServerName blockUntilAvailable(final ZooKeeperWatcher zkw,
      final long timeout)
  throws InterruptedException {
    byte [] data = ZKUtil.blockUntilAvailable(zkw, zkw.metaServerZNode, timeout);
    if (data == null) return null;
    try {
      return ServerName.parseFrom(data);
    } catch (DeserializationException e) {
      LOG.warn("Failed parse", e);
      return null;
    }
  }


我勒个去。。。这也太简单了。。。这是不需要ROOT表了么,直接从ZK取?说好的zk存root表的region信息,root表存meta表的region信息的,怎么能够这么改,节操呢。。。不过这样看的话,少了一层IO明显变得NB了

不过locateRegionInMeta明显也瘦身了。。。嗯。。先贴出来,明天接着看
private HRegionLocation locateRegionInMeta(final TableName parentTable,
      final TableName tableName, final byte [] row, boolean useCache,
      Object regionLockObject, boolean retry)
    throws IOException {
      HRegionLocation location;
      // If we are supposed to be using the cache, look in the cache to see if
      // we already have the region.
      // 上来还是首先从缓存里面读取,方法接口没有变。备注【1】
      if (useCache) {
        location = getCachedLocation(tableName, row);
        if (location != null) {
          return location;
        }
      }
      int localNumRetries = retry ? numTries : 1;
      // build the key of the meta region we should be looking for.
      // the extra 9's on the end are necessary to allow "exact" matches
      // without knowing the precise region names.
      // 还是定义region信息查询键
      byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
        HConstants.NINES, false);
      for (int tries = 0; true; tries++) {
        if (tries >= localNumRetries) {
          throw new NoServerForRegionException("Unable to find region for "
            + Bytes.toStringBinary(row) + " after " + numTries + " tries.");
        }

        HRegionLocation metaLocation = null;
        try {
          // locate the meta region
          // 还是先查询父表。不过如上所示,父表查询接口有了很大的变化,已经没有root表的查询了,直接从ZK获取meta表信息
          metaLocation = locateRegion(parentTable, metaKey, true, false);
          // If null still, go around again.
          if (metaLocation == null) continue;
          // RPC的调用方有了些变化,原来是使用HRgionInterface实例进行查询,实际上由委托代理,调用HBaseclient方法实现。这里先记下来备注【2】
          ClientService.BlockingInterface service = getClient(metaLocation.getServerName());

          Result regionInfoRow;
          // This block guards against two threads trying to load the meta
          // region at the same time. The first will load the meta region and
          // the second will use the value that the first one found.
          synchronized (regionLockObject) {
            // Check the cache again for a hit in case some other thread made the
            // same query while we were waiting on the lock.
            // 和之前一样,类似于读写双锁检查,进入写模块之后,需要再次确认缓存,避免重复请求
            if (useCache) {
              location = getCachedLocation(tableName, row);
              if (location != null) {
                return location;
              }
              // If the parent table is META, we may want to pre-fetch some
              // region info into the global region cache for this table.
              if (parentTable.equals(TableName.META_TABLE_NAME)
                  && (getRegionCachePrefetch(tableName))) {
                prefetchRegionCache(tableName, row);//和之前一样,父表为meta表则进行预抓取。这里新增了一个TableName的类型。直接使用该类型的比较方法,去掉了之前的直接byte比较。备注【3】
              }
              location = getCachedLocation(tableName, row);//还是一样,第三次确认
              if (location != null) {
                return location;
              }
            } else {
              // If we are not supposed to be using the cache, delete any existing cached location
              // so it won't interfere.
              forceDeleteCachedLocation(tableName, row);//这里只是方法名变了,不过内部实现,内部的内部实现跟之前完全是一模一样的。
            }
            // Query the meta region for the location of the meta region
            // 查询Region meta,有变化。
            regionInfoRow = ProtobufUtil.getRowOrBefore(service,
              metaLocation.getRegionInfo().getRegionName(), metaKey,
              HConstants.CATALOG_FAMILY);//备注【4】,这属于读操作,后续再看
          }
          if (regionInfoRow == null) {
            throw new TableNotFoundException(tableName);
          }

          // convert the row result into the HRegionLocation we need!
          // 将查询结果转化为一个HRegionInfo,这里的调用有些改变,之前是先获取value,然后委托Writables.getWritable(  
                    value, new HRegionInfo()),这里语义更明显。备注【5】,这里委托了protobuf,只能先放放了
          HRegionInfo regionInfo = MetaScanner.getHRegionInfo(regionInfoRow);
          // 跟之前的判断一样,检查regionInfo是否为空,是否属于当前表,是否正在split或者是否已经下线。多了一个down机检查。
          if (regionInfo == null) {
            throw new IOException("HRegionInfo was null or empty in " +
              parentTable + ", row=" + regionInfoRow);
          }

          // possible we got a region of a different table...
          if (!regionInfo.getTable().equals(tableName)) {
            throw new TableNotFoundException(
                  "Table '" + tableName + "' was not found, got: " +
                  regionInfo.getTable() + ".");
          }
          if (regionInfo.isSplit()) {
            throw new RegionOfflineException("the only available region for" +
              " the required row is a split parent," +
              " the daughters should be online soon: " +
              regionInfo.getRegionNameAsString());
          }
          if (regionInfo.isOffline()) {
            throw new RegionOfflineException("the region is offline, could" +
              " be caused by a disable table call: " +
              regionInfo.getRegionNameAsString());
          }
      
          ServerName serverName = HRegionInfo.getServerName(regionInfoRow);//这里将原有的byte[]信息转化为对象,代码可读性更好了,94中这里很多byte[]解析。
          if (serverName == null) {
            throw new NoServerForRegionException("No server address listed " +
              "in " + parentTable + " for region " +
              regionInfo.getRegionNameAsString() + " containing row " +
              Bytes.toStringBinary(row));
          }

          if (isDeadServer(serverName)){
            throw new RegionServerStoppedException("hbase:meta says the region "+
                regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
                ", but it is dead.");
          }

          // Instantiate the location
          // HRegionLocation的构造稍微变复杂了,其实也就是封装了下,唯一多出来的就是一个seqNum,不知道具体作用还  
          location = new HRegionLocation(regionInfo, serverName,
            HRegionInfo.getSeqNumDuringOpen(regionInfoRow));
          cacheLocation(tableName, null, location);//缓存,备注【6】
          return location;
        } catch (TableNotFoundException e) {
          // if we got this error, probably means the table just plain doesn't
          // exist. rethrow the error immediately. this should always be coming
          // from the HTable constructor.
          throw e;
        } catch (IOException e) {
          if (e instanceof RemoteException) {
            e = ((RemoteException)e).unwrapRemoteException();
          }
          if (tries < numTries - 1) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("locateRegionInMeta parentTable=" +
                parentTable + ", metaLocation=" +
                ((metaLocation == null)? "null": "{" + metaLocation + "}") +
                ", attempt=" + tries + " of " +
                this.numTries + " failed; retrying after sleep of " +
                ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
            }
          } else {
            throw e;
          }
          // Only relocate the parent region if necessary
          if(!(e instanceof RegionOfflineException ||
              e instanceof NoServerForRegionException)) {
            relocateRegion(parentTable, metaKey);
          }
        }
        try{
          Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Giving up trying to location region in " +
            "meta: thread is interrupted.");
        }
      }
    }

    

总的来看,流程和思路和94是完全一模一样的,只是具体的实现有了变化。

备注【1】【6】:HRegionLocation缓存和读取缓存,其中【1】读取缓存完全没有任何变化,但是【6】缓存HRegionlocation变化很大。94版的流程很简单,将region对应的服务器信息拿出来并缓存,然后将当前region加入到table的region缓存中。

96版的首先是获取table的所有region信息有了变化,以前是按照byte[]型tableName的hash值当键做的缓存索引,这里使用了TableName类型直接作为键,执行效率会有细微的降低,但是逻辑就更清楚了。(1)
然后serverName缓存也有了细微的变换,以前是缓存一个串,现在是缓存了一个ServerName类型。(2)

最后多了一个source参数,也是HRegionLocation类型,非空表示不是直接从meta表中查询到的。一般来说为空。(3)

private void cacheLocation(final TableName tableName, final HRegionLocation source,
        final HRegionLocation location) {
      boolean isFromMeta = (source == null);
      byte [] startKey = location.getRegionInfo().getStartKey();
      Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);//(1)
      boolean isNewCacheEntry = false;
      boolean isStaleUpdate = false;
      HRegionLocation oldLocation = null;
      synchronized (this.cachedRegionLocations) {
        cachedServers.add(location.getServerName());//(2)
        oldLocation = tableLocations.get(startKey);// 还是用startKey做索引
        isNewCacheEntry = (oldLocation == null);//如果能查询出来,则表示更新
        // If the server in cache sends us a redirect, assume it's always valid.
        if (!isNewCacheEntry && !oldLocation.equals(source)) {
        //如果是更新,而且确实需要更新,这个equals方法只是判断了ServerName对象是否相等。代码就不贴了
          long newLocationSeqNum = location.getSeqNum();

          // 这里解释了两个判断是否为陈旧信息的判断标准,有可能服务器自己关闭了老的region,当我们请求的时候,告诉我们一个新的region信息,类似于我们用一个url信息请求一个网页,但是收到一个302
          // 或者服务器关闭了一个region,但是用相同的序列号又打开了一个新的。之前HRegionlocation的构造函数多了一个seqNum在这里就用上了,看来96版的region信息管理跟之前的版本相比较有较大的变化 
          // Meta record is stale - some (probably the same) server has closed the region
          // with later seqNum and told us about the new location.
          boolean isStaleMetaRecord = isFromMeta && (oldLocation.getSeqNum() > newLocationSeqNum);
          // Same as above for redirect. However, in this case, if the number is equal to previous
          // record, the most common case is that first the region was closed with seqNum, and then
          // opened with the same seqNum; hence we will ignore the redirect.
          // There are so many corner cases with various combinations of opens and closes that
          // an additional counter on top of seqNum would be necessary to handle them all.
          boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
          isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
        }
        if (!isStaleUpdate) {
          tableLocations.put(startKey, location);
        }
      }
      if (isNewCacheEntry) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Cached location for " +
            location.getRegionInfo().getRegionNameAsString() +
            " is " + location.getHostnamePort());
        }
      } else if (isStaleUpdate && !location.equals(oldLocation)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Ignoring stale location update for "
            + location.getRegionInfo().getRegionNameAsString() + ": "
            + location.getHostnamePort() + " at " + location.getSeqNum() + "; local "
            + oldLocation.getHostnamePort() + " at " + oldLocation.getSeqNum());
        }
      }
    }


搞定,主要是对是否需要缓存当前拿到的region信息多了很多判断,看来服务端的代码也有很多不同的坑要填了。

然后看备注【2】的代码,获取RPC调用连接。之前是是一个HRegionInterface的实例,这里变成了ClientService.BlockingInterface实例。不过创建方式差不多的呢。

温故知新,先回顾下94版本的实现:
首先需要一个RPCEngine,默认为WritableRpcEngine,一个RPCInterface,默认是HRegionInterface。然后创建一个Invocation托管的实例,这个实例再委托HBaseclient通过传统的Socket完成RPC。其中RPCEngine完成对参数和方法的序列化--反序列化。

这里从表象上看,原有的方式被完全抛弃了,首先不再需要RPCEngine,而是使用了一个RpcClient,这是一个全新的类。RPC也不再直接使用传统的socket,而是使用了google的protobuf组件。表示不熟悉这个组件,暂时先把代码码出来吧。

public ClientService.BlockingInterface getClient(final ServerName sn)
    throws IOException {
      if (isDeadServer(sn)) {
        throw new RegionServerStoppedException(sn + " is dead.");
      }
      String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostAndPort());
      // 这个key简单,这俩玩意中间加个@连起来
      this.connectionLock.putIfAbsent(key, key);
      ClientService.BlockingInterface stub = null;
      synchronized (this.connectionLock.get(key)) {
        stub = (ClientService.BlockingInterface)this.stubs.get(key);
        if (stub == null) {
          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
            user, this.rpcTimeout);
          stub = ClientService.newBlockingStub(channel);
          // In old days, after getting stub/proxy, we'd make a call.  We are not doing that here.
          // Just fail on first actual call rather than in here on setup.
          this.stubs.put(key, stub);
        }
      }
      return stub;
    }


备注【2】只能这么多了,protobuf这个框架确实不熟悉。看看备注备注【3】吧,预抓取

private void prefetchRegionCache(final TableName tableName,
        final byte[] row) {
      // Implement a new visitor for MetaScanner, and use it to walk through
      // the hbase:meta
      // 还是用MetaScannerVisitorBase来遍历meta信息,跟之前一样,用MetaScanner来简化了之前基于Writable的HRegionInfo解析
      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
        public boolean processRow(Result result) throws IOException {
          try {
            HRegionInfo regionInfo = MetaScanner.getHRegionInfo(result);
            if (regionInfo == null) {
              return true;
            }

            // possible we got a region of a different table...
            if (!regionInfo.getTable().equals(tableName)) {
              return false; // stop scanning
            }
            if (regionInfo.isOffline()) {
              // don't cache offline regions
              return true;
            }

            ServerName serverName = HRegionInfo.getServerName(result);
            if (serverName == null) {
              return true; // don't cache it
            }
            // instantiate the location
            long seqNum = HRegionInfo.getSeqNumDuringOpen(result);//知道HRegionlocation的seqNum哪儿来的了。MS META表加料了哦。info:seqnumDuringOpen,没有则为-1
            HRegionLocation loc = new HRegionLocation(regionInfo, serverName, seqNum);
            // cache this meta entry
            cacheLocation(tableName, null, loc);
            return true;
          } catch (RuntimeException e) {
            throw new IOException(e);
          }
        }
      };
      try {
        // pre-fetch certain number of regions info at region cache.
        MetaScanner.metaScan(conf, this, visitor, tableName, row,
            this.prefetchRegionLimit, TableName.META_TABLE_NAME);
      } catch (IOException e) {
        LOG.warn("Encountered problems when prefetch hbase:meta table: ", e);
      }
    }

    public static ServerName getServerName(final Result r) {
    byte[] value = r.getValue(HConstants.CATALOG_FAMILY,
      HConstants.SERVER_QUALIFIER);
    if (value == null || value.length == 0) return null;
    String hostAndPort = Bytes.toString(value);
    value = r.getValue(HConstants.CATALOG_FAMILY,
      HConstants.STARTCODE_QUALIFIER);//还记得94版本中的那个坑么,最后取出来这个cell的值但是没有使用,没想到在这里用上了,这是下一盘很大的棋么
    if (value == null || value.length == 0) return null;
    return new ServerName(hostAndPort, Bytes.toLong(value));
  }

MetaScanner.metaScan跟之前的版本几乎完全一样,不贴了