HBase Code Reading Notes 1 - PUT-2 - Locating the RS and Region, Part 1 (0.96-HADOOP2)
2022-05-31 10:27:43
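Following the order of my 0.94 reading notes, the next thing to look at is how the client locates the RegionServer (RS) and the Region.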
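First, a recap of 0.94: the client iterated over the operations, located the region for each one, and then appended the operation to that region's task queue, creating the queue if it did not exist. Locating the region was done by HConnectionManager.HConnectionImplementation.locateRegion; in 0.96 it is done by AsyncProcessor.findDestLocation.
It is still a loop, but the loop body has changed somewhat.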
private HRegionLocation findDestLocation(Row row, int numAttempt, int posInList) {
  if (row == null) throw new IllegalArgumentException("row cannot be null");
  HRegionLocation loc = null;
  IOException locationException = null;
  try {
    loc = hConnection.locateRegion(this.tableName, row.getRow());
    if (loc == null) {
      locationException = new IOException("No location found, aborting submit for" +
          " tableName=" + tableName + " rowkey=" + Arrays.toString(row.getRow()));
    }
  } catch (IOException e) {
    locationException = e;
  }
  if (locationException != null) {
    // There are multiple retries in locateRegion already. No need to add new.
    // We can't continue with this row, hence it's the last retry.
    manageError(numAttempt, posInList, row, false, locationException, null);
    return null;
  }
  return loc;
}
As you can see, it still ends up calling HConnectionImplementation.locateRegion, but the body of that method is different again.
private HRegionLocation locateRegion(final TableName tableName,
    final byte [] row, boolean useCache, boolean retry)
throws IOException {
  if (this.closed) throw new IOException(toString() + " closed");
  if (tableName == null || tableName.getName().length == 0) {
    throw new IllegalArgumentException(
        "table name cannot be null or zero length");
  }
  if (tableName.equals(TableName.META_TABLE_NAME)) {
    return this.registry.getMetaRegionLocation();
  } else {
    // Region not in the cache - have to go to the meta RS
    return locateRegionInMeta(TableName.META_TABLE_NAME, tableName, row,
        useCache, userRegionLock, retry);
  }
}
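Well, the special handling of the ROOT table is gone. Two things are worth noting here. First, -ROOT- no longer exists at all (hbase:namespace is a new system table that stores namespace information, not a renamed ROOT), and META is now called hbase:meta. Second, the handling of the META table has changed too. The only path that has not changed is the one for user tables.
Start with the part that changed: it calls a method on Registry, implemented by ZooKeeperRegistry. I am happy to report that both of them are new and do not exist in 0.94.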
// ZooKeeperRegistry.getMetaRegionLocation
public HRegionLocation getMetaRegionLocation() throws IOException {
  ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();
  try {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
    }
    ServerName servername = MetaRegionTracker.blockUntilAvailable(zkw, hci.rpcTimeout);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Looked up meta region location, connection=" + this +
          "; serverName=" + ((servername == null) ? "null" : servername));
    }
    if (servername == null) return null;
    return new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, servername, 0);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return null;
  } finally {
    zkw.close();
  }
}
// MetaRegionTracker.blockUntilAvailable, called above
public static ServerName blockUntilAvailable(final ZooKeeperWatcher zkw,
    final long timeout)
throws InterruptedException {
  byte [] data = ZKUtil.blockUntilAvailable(zkw, zkw.metaServerZNode, timeout);
  if (data == null) return null;
  try {
    return ServerName.parseFrom(data);
  } catch (DeserializationException e) {
    LOG.warn("Failed parse", e);
    return null;
  }
}
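Whoa... that is way too simple. So the ROOT table is no longer needed and the location comes straight from ZK? The deal used to be that ZK stores the region information of the ROOT table and the ROOT table stores the region information of the META table. How could they just change that, where are their principles... Still, looked at this way, one layer of IO is gone, which is clearly a big improvement.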
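To make the saved hop concrete, here is a minimal sketch that lists the lookups a cold client (empty cache) performs under each scheme as described above. The class and method names are made up for illustration, the tables are plain strings rather than HBase types, and `.META.` is used as the 0.94 name of the META table.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy comparison of the two lookup chains; not HBase code. */
class LookupHopsSketch {

    /** 0.94 as described above: ZK locates ROOT, ROOT locates META, META locates user regions. */
    static List<String> hops094(String table) {
        List<String> hops = new ArrayList<>();
        hops.add("zookeeper");                      // where is -ROOT-?
        if (table.equals("-ROOT-")) return hops;
        hops.add("-ROOT-");                         // where is .META.?
        if (table.equals(".META.")) return hops;
        hops.add(".META.");                         // where is the user region?
        return hops;
    }

    /** 0.96: ZK locates hbase:meta directly, hbase:meta locates user regions. */
    static List<String> hops096(String table) {
        List<String> hops = new ArrayList<>();
        hops.add("zookeeper");                      // where is hbase:meta?
        if (table.equals("hbase:meta")) return hops;
        hops.add("hbase:meta");                     // where is the user region?
        return hops;
    }

    public static void main(String[] args) {
        System.out.println("0.94: " + hops094("usertable"));
        System.out.println("0.96: " + hops096("usertable"));
    }
}
```

For a user table the 0.94 chain has three entries and the 0.96 chain has two, which is the "one layer of IO" mentioned above. Once locations are cached, neither version repeats these lookups on every request.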
locateRegionInMeta has clearly slimmed down as well... Hmm, I will paste it here first and continue reading tomorrow.
private HRegionLocation locateRegionInMeta(final TableName parentTable,
    final TableName tableName, final byte [] row, boolean useCache,
    Object regionLockObject, boolean retry)
throws IOException {
  HRegionLocation location;
  // If we are supposed to be using the cache, look in the cache to see if
  // we already have the region.
  // Still reads the cache first; the method signature is unchanged. Note [1]
  if (useCache) {
    location = getCachedLocation(tableName, row);
    if (location != null) {
      return location;
    }
  }
  int localNumRetries = retry ? numTries : 1;
  // build the key of the meta region we should be looking for.
  // the extra 9's on the end are necessary to allow "exact" matches
  // without knowing the precise region names.
  // Still builds the lookup key for the region info.
  byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
      HConstants.NINES, false);
  for (int tries = 0; true; tries++) {
    if (tries >= localNumRetries) {
      throw new NoServerForRegionException("Unable to find region for "
          + Bytes.toStringBinary(row) + " after " + numTries + " tries.");
    }
    HRegionLocation metaLocation = null;
    try {
      // locate the meta region
      // Still looks up the parent table first. As shown above, though, the parent
      // lookup has changed a lot: there is no ROOT lookup any more and the meta
      // location comes straight from ZK.
      metaLocation = locateRegion(parentTable, metaKey, true, false);
      // If null still, go around again.
      if (metaLocation == null) continue;
      // The RPC caller has changed. It used to be an HRegionInterface instance,
      // in practice a delegating proxy that called into HBaseClient. Note [2]
      ClientService.BlockingInterface service = getClient(metaLocation.getServerName());
      Result regionInfoRow;
      // This block guards against two threads trying to load the meta
      // region at the same time. The first will load the meta region and
      // the second will use the value that the first one found.
      synchronized (regionLockObject) {
        // Check the cache again for a hit in case some other thread made the
        // same query while we were waiting on the lock.
        // Same as before, much like double-checked locking: once inside the
        // synchronized block, check the cache again to avoid a duplicate request.
        if (useCache) {
          location = getCachedLocation(tableName, row);
          if (location != null) {
            return location;
          }
          // If the parent table is META, we may want to pre-fetch some
          // region info into the global region cache for this table.
          if (parentTable.equals(TableName.META_TABLE_NAME)
              && (getRegionCachePrefetch(tableName))) {
            // Same as before: prefetch when the parent table is meta. The new
            // TableName type is compared with its own equals instead of the
            // old raw byte comparison. Note [3]
            prefetchRegionCache(tableName, row);
          }
          location = getCachedLocation(tableName, row); // same as before: third cache check
          if (location != null) {
            return location;
          }
        } else {
          // If we are not supposed to be using the cache, delete any existing cached location
          // so it won't interfere.
          // Only the method name changed; the implementation, all the way down,
          // is identical to before.
          forceDeleteCachedLocation(tableName, row);
        }
        // Query the meta region for the location of the meta region
        // Query the region's meta row; this has changed.
        // Note [4]: this is a read operation, to be covered later.
        regionInfoRow = ProtobufUtil.getRowOrBefore(service,
            metaLocation.getRegionInfo().getRegionName(), metaKey,
            HConstants.CATALOG_FAMILY);
      }
      if (regionInfoRow == null) {
        throw new TableNotFoundException(tableName);
      }
      // convert the row result into the HRegionLocation we need!
      // Convert the result into an HRegionInfo. The call has changed: it used to
      // fetch the value and hand it to Writables.getWritable(value, new HRegionInfo());
      // the intent is clearer now. Note [5]: this delegates to protobuf, so it has to wait.
      HRegionInfo regionInfo = MetaScanner.getHRegionInfo(regionInfoRow);
      // Same checks as before: regionInfo is non-null, belongs to this table,
      // is not a split parent, and is not offline. A dead-server check has been added.
      if (regionInfo == null) {
        throw new IOException("HRegionInfo was null or empty in " +
            parentTable + ", row=" + regionInfoRow);
      }
      // possible we got a region of a different table...
      if (!regionInfo.getTable().equals(tableName)) {
        throw new TableNotFoundException(
            "Table '" + tableName + "' was not found, got: " +
            regionInfo.getTable() + ".");
      }
      if (regionInfo.isSplit()) {
        throw new RegionOfflineException("the only available region for" +
            " the required row is a split parent," +
            " the daughters should be online soon: " +
            regionInfo.getRegionNameAsString());
      }
      if (regionInfo.isOffline()) {
        throw new RegionOfflineException("the region is offline, could" +
            " be caused by a disable table call: " +
            regionInfo.getRegionNameAsString());
      }
      // The raw byte[] data is now turned into an object, which reads better;
      // 0.94 had a lot of byte[] parsing here.
      ServerName serverName = HRegionInfo.getServerName(regionInfoRow);
      if (serverName == null) {
        throw new NoServerForRegionException("No server address listed " +
            "in " + parentTable + " for region " +
            regionInfo.getRegionNameAsString() + " containing row " +
            Bytes.toStringBinary(row));
      }
      if (isDeadServer(serverName)) {
        throw new RegionServerStoppedException("hbase:meta says the region " +
            regionInfo.getRegionNameAsString() + " is managed by the server " +
            serverName + ", but it is dead.");
      }
      // Instantiate the location
      // Constructing HRegionLocation is slightly more involved but is really just
      // wrapping; the only addition is seqNum, whose purpose I do not know yet.
      location = new HRegionLocation(regionInfo, serverName,
          HRegionInfo.getSeqNumDuringOpen(regionInfoRow));
      cacheLocation(tableName, null, location); // cache it, note [6]
      return location;
    } catch (TableNotFoundException e) {
      // if we got this error, probably means the table just plain doesn't
      // exist. rethrow the error immediately. this should always be coming
      // from the HTable constructor.
      throw e;
    } catch (IOException e) {
      if (e instanceof RemoteException) {
        e = ((RemoteException)e).unwrapRemoteException();
      }
      if (tries < numTries - 1) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("locateRegionInMeta parentTable=" +
              parentTable + ", metaLocation=" +
              ((metaLocation == null)? "null": "{" + metaLocation + "}") +
              ", attempt=" + tries + " of " +
              this.numTries + " failed; retrying after sleep of " +
              ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
        }
      } else {
        throw e;
      }
      // Only relocate the parent region if necessary
      if (!(e instanceof RegionOfflineException ||
          e instanceof NoServerForRegionException)) {
        relocateRegion(parentTable, metaKey);
      }
    }
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Giving up trying to location region in " +
          "meta: thread is interrupted.");
    }
  }
}
Overall, the flow and the idea are essentially the same as in 0.94; only the concrete implementation has changed.
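The "extra 9's" comment in the code is easier to follow with a toy model. The sketch below is not HBase code: it assumes region names of the form `table,startKey,regionId`, uses a `TreeMap` with plain string ordering as a stand-in for hbase:meta, and uses `floorEntry` as a stand-in for the "row or before" read. The `NINES` constant is assumed to play the role of `HConstants.NINES`; the server names are invented.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of the "table,row,99999999999999" lookup key; not HBase code. */
class MetaKeySketch {
    // Stand-in for HConstants.NINES (assumed here to be 14 nines).
    static final String NINES = "99999999999999";

    // Toy hbase:meta: region name ("table,startKey,regionId") -> hosting server.
    static final TreeMap<String, String> META = new TreeMap<>();
    static {
        META.put("t1,,1380000000001", "rs1");   // first region of t1, empty start key
        META.put("t1,k,1380000000002", "rs2");  // second region of t1, starts at "k"
        META.put("t2,,1380000000003", "rs3");   // only region of t2
    }

    /** Returns the server hosting the region that contains row, or null if none. */
    static String locate(String table, String row) {
        // The trailing nines sort after any real regionId, so a row equal to a
        // region's start key still lands on that region and not the one before.
        String metaKey = table + "," + row + "," + NINES;
        Map.Entry<String, String> e = META.floorEntry(metaKey);
        // The closest row at or before the key may belong to another table.
        if (e == null || !e.getKey().startsWith(table + ",")) return null;
        return e.getValue();
    }

    public static void main(String[] args) {
        System.out.println(locate("t1", "a"));  // region starting at ""
        System.out.println(locate("t1", "k"));  // region starting exactly at "k"
        System.out.println(locate("t3", "a"));  // no such table
    }
}
```

The `startsWith` check mirrors the "possible we got a region of a different table" test in locateRegionInMeta: a floor lookup always returns something if any smaller key exists, so the caller has to verify the table.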
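Notes [1] and [6]: reading and writing the HRegionLocation cache. [1], reading the cache, has not changed at all, but [6], caching the HRegionLocation, has changed a lot. The 0.94 flow was simple: take the server information for the region and cache it, then add the current region to the table's region cache.
In 0.96, the first change is how the table's region map is obtained. It used to be indexed by the hash of the byte[] tableName; now the TableName object itself is the key. This may be marginally slower, but the logic is clearer. (1)
Next, the serverName cache has changed slightly: it used to cache a string, and now it caches a ServerName object. (2)
Finally, there is a new source parameter, also of type HRegionLocation. A non-null value means the location was not read directly from the meta table. It is normally null. (3)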
private void cacheLocation(final TableName tableName, final HRegionLocation source,
    final HRegionLocation location) {
  boolean isFromMeta = (source == null);
  byte [] startKey = location.getRegionInfo().getStartKey();
  Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName); // (1)
  boolean isNewCacheEntry = false;
  boolean isStaleUpdate = false;
  HRegionLocation oldLocation = null;
  synchronized (this.cachedRegionLocations) {
    cachedServers.add(location.getServerName()); // (2)
    oldLocation = tableLocations.get(startKey); // still indexed by startKey
    isNewCacheEntry = (oldLocation == null); // if an entry is found, this is an update
    // If the server in cache sends us a redirect, assume it's always valid.
    if (!isNewCacheEntry && !oldLocation.equals(source)) {
      // An update that is actually needed. This equals only compares the
      // ServerName objects; code not pasted.
      long newLocationSeqNum = location.getSeqNum();
      // The comments below explain the two staleness criteria. The server may have
      // closed the old region itself and, when we ask, tell us the new location,
      // much like requesting a URL and getting a 302 back.
      // Or the server closed a region and reopened it with the same sequence number.
      // This is where the extra seqNum in the HRegionLocation constructor gets used;
      // region bookkeeping in 0.96 seems to have changed quite a bit.
      // Meta record is stale - some (probably the same) server has closed the region
      // with later seqNum and told us about the new location.
      boolean isStaleMetaRecord = isFromMeta && (oldLocation.getSeqNum() > newLocationSeqNum);
      // Same as above for redirect. However, in this case, if the number is equal to previous
      // record, the most common case is that first the region was closed with seqNum, and then
      // opened with the same seqNum; hence we will ignore the redirect.
      // There are so many corner cases with various combinations of opens and closes that
      // an additional counter on top of seqNum would be necessary to handle them all.
      boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
      isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
    }
    if (!isStaleUpdate) {
      tableLocations.put(startKey, location);
    }
  }
  if (isNewCacheEntry) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Cached location for " +
          location.getRegionInfo().getRegionNameAsString() +
          " is " + location.getHostnamePort());
    }
  } else if (isStaleUpdate && !location.equals(oldLocation)) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Ignoring stale location update for "
          + location.getRegionInfo().getRegionNameAsString() + ": "
          + location.getHostnamePort() + " at " + location.getSeqNum() + "; local "
          + oldLocation.getHostnamePort() + " at " + oldLocation.getSeqNum());
    }
  }
}
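Done. The main difference is a lot of extra checks on whether the region information just obtained should be cached at all. It looks like the server-side code has plenty of different pitfalls to work through as well.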
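The two staleness rules in cacheLocation boil down to a small truth table. The sketch below isolates just that decision, assuming an old cache entry already exists and differs from the redirect source; the class and method names are made up for illustration.

```java
/** Toy version of the seqNum staleness check in cacheLocation; not HBase code. */
class StaleUpdateSketch {

    /**
     * @param oldSeqNum seqNum of the location already in the cache
     * @param newSeqNum seqNum of the location we were just given
     * @param fromMeta  true if the new location was read from the meta table,
     *                  false if it came from a server redirect
     * @return true if the cache entry should be overwritten
     */
    static boolean shouldReplace(long oldSeqNum, long newSeqNum, boolean fromMeta) {
        // A meta record loses only to a strictly newer cached entry.
        boolean staleMetaRecord = fromMeta && oldSeqNum > newSeqNum;
        // A redirect also loses on a tie (closed and reopened with the same seqNum).
        boolean staleRedirect = !fromMeta && oldSeqNum >= newSeqNum;
        return !(staleMetaRecord || staleRedirect);
    }

    public static void main(String[] args) {
        System.out.println(shouldReplace(5, 5, true));   // tie, from meta
        System.out.println(shouldReplace(5, 5, false));  // tie, from redirect
        System.out.println(shouldReplace(5, 6, false));  // newer redirect
    }
}
```

The only asymmetry is the tie: with equal sequence numbers a meta record overwrites the cache, while a redirect is ignored.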
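Now for the code behind note [2]: obtaining the RPC connection. It used to be an HRegionInterface instance; here it is a ClientService.BlockingInterface instance. The way it is created is much the same, though.
To review the old before learning the new, first a recap of the 0.94 implementation:
It needs an RpcEngine, WritableRpcEngine by default, and an RPC interface, HRegionInterface by default. It then creates a proxy instance managed through Invocation, and that instance delegates to HBaseClient to perform the RPC over a plain socket. The RpcEngine takes care of serializing and deserializing the method and its parameters.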
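The proxy-and-delegate arrangement described for 0.94 follows the general shape of a JDK dynamic proxy. The sketch below shows only that general shape under stated assumptions: `RegionInterface`, the `wire` list, and the returned values are invented for illustration, and a list append stands in for serializing the call and sending it over a socket. It is not the WritableRpcEngine implementation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Toy proxy-based RPC client; not HBase code. */
class ProxyRpcSketch {

    /** Hypothetical RPC interface standing in for HRegionInterface. */
    interface RegionInterface {
        String get(String row);
    }

    /** Stand-in for the socket: records every "serialized" call. */
    static final List<String> wire = new ArrayList<>();

    /** Builds a proxy whose every method call is turned into a message for one server. */
    static RegionInterface newProxy(String server) {
        InvocationHandler handler = (proxy, method, args) -> {
            // A real engine would serialize method name and arguments here,
            // send them, and deserialize the response.
            wire.add(server + ":" + method.getName() + "(" + args[0] + ")");
            return "value-of-" + args[0];
        };
        return (RegionInterface) Proxy.newProxyInstance(
            RegionInterface.class.getClassLoader(),
            new Class<?>[] { RegionInterface.class },
            handler);
    }

    public static void main(String[] args) {
        RegionInterface region = newProxy("rs1");
        System.out.println(region.get("row1"));
        System.out.println(wire);
    }
}
```

The caller only ever sees the interface; which server is contacted and how the call is encoded are hidden behind the handler.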
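On the surface, the old approach has been dropped entirely. RpcEngine is no longer needed; instead there is an RpcClient, a brand-new class. RPC also no longer works directly on the socket in the old way but goes through Google's protobuf. I am not familiar with that component, so for now I will just paste the code.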
public ClientService.BlockingInterface getClient(final ServerName sn)
throws IOException {
  if (isDeadServer(sn)) {
    throw new RegionServerStoppedException(sn + " is dead.");
  }
  // The key is simple: the two parts joined with an @.
  String key = getStubKey(ClientService.BlockingInterface.class.getName(),
      sn.getHostAndPort());
  this.connectionLock.putIfAbsent(key, key);
  ClientService.BlockingInterface stub = null;
  synchronized (this.connectionLock.get(key)) {
    stub = (ClientService.BlockingInterface)this.stubs.get(key);
    if (stub == null) {
      BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
          user, this.rpcTimeout);
      stub = ClientService.newBlockingStub(channel);
      // In old days, after getting stub/proxy, we'd make a call. We are not doing that here.
      // Just fail on first actual call rather than in here on setup.
      this.stubs.put(key, stub);
    }
  }
  return stub;
}
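The locking in getClient is worth a closer look: the key string itself is stored in a map and used as the monitor, so callers asking for different servers do not block each other while a stub is being created. Here is a minimal sketch of that pattern, with a plain `Object` standing in for the stub and invented class and field names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy per-key-lock stub cache modelled on getClient; not HBase code. */
class StubCacheSketch {
    static final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();
    static final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>();
    static final AtomicInteger created = new AtomicInteger();

    /** Service name and host:port joined with '@', as noted in the code above. */
    static String stubKey(String serviceName, String hostAndPort) {
        return serviceName + "@" + hostAndPort;
    }

    static Object getStub(String serviceName, String hostAndPort) {
        String key = stubKey(serviceName, hostAndPort);
        // The first caller registers its key instance; everyone then locks on
        // whichever instance is stored in the map, not on their own copy.
        locks.putIfAbsent(key, key);
        Object stub;
        synchronized (locks.get(key)) {
            stub = stubs.get(key);
            if (stub == null) {
                created.incrementAndGet();
                stub = new Object();    // stand-in for building a channel and stub
                stubs.put(key, stub);
            }
        }
        return stub;
    }

    public static void main(String[] args) {
        Object a = getStub("ClientService", "rs1:60020");
        Object b = getStub("ClientService", "rs1:60020");
        Object c = getStub("ClientService", "rs2:60020");
        System.out.println((a == b) + " " + (a == c) + " created=" + created.get());
    }
}
```

Locking on `locks.get(key)` rather than on the local `key` matters because two threads can build equal but distinct strings; only the instance stored in the map is shared.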
That is all I can say about note [2]; I really do not know the protobuf framework well. On to note [3], prefetching.
private void prefetchRegionCache(final TableName tableName,
    final byte[] row) {
  // Implement a new visitor for MetaScanner, and use it to walk through
  // the hbase:meta
  // Still walks the meta rows with a MetaScannerVisitorBase, as before;
  // MetaScanner now replaces the old Writable-based HRegionInfo parsing.
  MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
    public boolean processRow(Result result) throws IOException {
      try {
        HRegionInfo regionInfo = MetaScanner.getHRegionInfo(result);
        if (regionInfo == null) {
          return true;
        }
        // possible we got a region of a different table...
        if (!regionInfo.getTable().equals(tableName)) {
          return false; // stop scanning
        }
        if (regionInfo.isOffline()) {
          // don't cache offline regions
          return true;
        }
        ServerName serverName = HRegionInfo.getServerName(result);
        if (serverName == null) {
          return true; // don't cache it
        }
        // instantiate the location
        // So this is where the seqNum of HRegionLocation comes from. It looks like
        // the META table gained a column: info:seqnumDuringOpen, -1 if absent.
        long seqNum = HRegionInfo.getSeqNumDuringOpen(result);
        HRegionLocation loc = new HRegionLocation(regionInfo, serverName, seqNum);
        // cache this meta entry
        cacheLocation(tableName, null, loc);
        return true;
      } catch (RuntimeException e) {
        throw new IOException(e);
      }
    }
  };
  try {
    // pre-fetch certain number of regions info at region cache.
    MetaScanner.metaScan(conf, this, visitor, tableName, row,
        this.prefetchRegionLimit, TableName.META_TABLE_NAME);
  } catch (IOException e) {
    LOG.warn("Encountered problems when prefetch hbase:meta table: ", e);
  }
}
// HRegionInfo.getServerName, called above
public static ServerName getServerName(final Result r) {
  byte[] value = r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
  if (value == null || value.length == 0) return null;
  String hostAndPort = Bytes.toString(value);
  // Remember that oddity in 0.94, where this cell's value was read but never used?
  // It turns out to be used here. Was that the long game all along?
  value = r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
  if (value == null || value.length == 0) return null;
  return new ServerName(hostAndPort, Bytes.toLong(value));
}
MetaScanner.metaScan is almost exactly the same as in the previous version, so I will not paste it.
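Since metaScan itself is not shown, here is a rough sketch of the visitor contract that prefetchRegionCache relies on: start at the region containing the row, hand each meta row to a visitor, and stop when the visitor returns false or the row limit is reached. Everything in it is an assumption made for illustration: the `TreeMap` stand-in for hbase:meta, the simplified `table,startKey` keys, an empty string meaning "no server assigned", and the limit counting visited rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Toy visitor-driven prefetch over a sorted meta table; not HBase code. */
class PrefetchSketch {
    // Toy hbase:meta: "table,startKey" -> server ("" means no server assigned).
    static final TreeMap<String, String> META = new TreeMap<>();
    static {
        META.put("t1,", "rs1");
        META.put("t1,g", "");
        META.put("t1,n", "rs2");
        META.put("t1,t", "rs3");
        META.put("t2,", "rs4");
    }

    /** Visits rows from startKey on; stops when the visitor returns false or limit rows were visited. */
    static int scan(String startKey, int limit, Predicate<Map.Entry<String, String>> visitor) {
        int visited = 0;
        for (Map.Entry<String, String> e : META.tailMap(startKey, true).entrySet()) {
            if (visited >= limit) break;
            visited++;
            if (!visitor.test(e)) break;
        }
        return visited;
    }

    /** Returns the region keys that would be cached for this table, starting at row's region. */
    static List<String> prefetch(String table, String row, int limit) {
        List<String> cached = new ArrayList<>();
        String prefix = table + ",";
        String start = META.floorKey(prefix + row);   // region containing row
        if (start == null || !start.startsWith(prefix)) start = prefix;
        scan(start, limit, e -> {
            if (!e.getKey().startsWith(prefix)) return false; // another table: stop scanning
            if (e.getValue().isEmpty()) return true;          // no server: skip, keep going
            cached.add(e.getKey());
            return true;
        });
        return cached;
    }

    public static void main(String[] args) {
        System.out.println(prefetch("t1", "h", 10));
        System.out.println(prefetch("t1", "", 2));
    }
}
```

As in processRow, returning false ends the scan at the table boundary, while returning true without caching skips a row that cannot be used.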