hadoop架构之BlockPoolManager源码分析讲解
程序员文章站
2022-07-05 23:23:28
在HDFS Federation架构中,一个HDFS集群可以创建多个命名空间,每一个DataNode都可以存储多个BlockPool的数据块,所以在
DataNode中定义了一个Block...
在HDFS Federation架构中,一个HDFS集群可以创建多个命名空间,每一个DataNode都可以存储多个BlockPool的数据块,所以在
DataNode中定义了一个BlockPoolManager,用于管理DataNode上所有的块池。
DataNode的其他模块要对BlockPool进行操作,都必须通过BlockPoolManager来执行。每一个DataNode都有一个BlockPoolManager实例。
一、BPServiceActor分析
BPServiceActor负责与一个NameNode进行通信,每一个BPServiceActor都是一个独立的线程,主要功能:
>>与NameNode进行第一次握手,获取命名空间的信息 >>向NameNode注册当前DataNode >>定期向NameNode发送心跳,增量块汇报,全量块汇报,缓存块汇报等 >>执行NameNode传回的指令 static final Log LOG = DataNode.LOG; //NameNode 地址 finalInetSocketAddress nnAddr; //NameNode 状态 HAServiceStatestate; //所持有的BPOfferService对象 final BPOfferService bpos; //当前的工作线程 ThreadbpThread; //向Name Node发送RPC请求的代理 DatanodeProtocolClientSideTranslatorPBbpNamenode; //当前BPServiceActor的运行状态,初始状态时CONNECTING static enum RunningState { CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED; } private volatile RunningState runningState = RunningState.CONNECTING; //用于保存2次块汇报之间Data Node存储数据块的变化 private final Map<DatanodeStorage, PerStoragePendingIncrementalBR> pendingIncrementalBRperStorage = Maps.newHashMap(); //DataNode对象的引用 private final DataNode dn; //用于记录Data Node的注册信息 private DatanodeRegistration bpRegistration; //初始化 try { //与Name Node握手并进行Data Node注册 connectToNNAndHandshake(); break; } catch (IOException ioe) { //初始化握手出现失败,运行状态置为INIT_FAILED runningState = RunningState.INIT_FAILED; if (shouldRetryInit()) { // Retry until all namenode's of BPOSfailed initialization sleepAndLogInterrupts(5000, "initializing"); } else { runningState = RunningState.FAILED; return; } } } //初始化成功,状态置为RUNNING runningState = RunningState.RUNNING; //循环调用offerService方法向NameNode发送心跳,块汇报,增量汇报以及缓存快汇报等 while (shouldRun()) { try { offerService(); } catch (Exception ex) { //收到异常也不会处理直到BPServiceActor停止或者Data Node停止 sleepAndLogInterrupts(5000, "offeringservice"); } } //BPServiceActor停止以后,状态置为EXITED runningState = RunningState.EXITED; private void connectToNNAndHandshake() throws IOException { //获取Name Node 的PRC 代理 bpNamenode = dn.connectToNN(nnAddr); //第一次握手去获取namespace 信息 NamespaceInfo nsInfo = retrieveNamespaceInfo(); bpos.verifyAndSetNamespaceInfo(nsInfo); //第二次握手则是向Name Node注册这个Data Node register(nsInfo); } private void offerService() throwsException { long fullBlockReportLeaseId = 0; while (shouldRun()) { try { final long startTime = scheduler.monotonicNow(); //判断是否发送心跳信息 final boolean 
sendHeartbeat = scheduler.isHeartbeatDue(startTime); HeartbeatResponse resp = null; //如果要发送心跳 if (sendHeartbeat) { boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) && scheduler.isBlockReportDue(startTime); scheduler.scheduleNextHeartbeat(); if (!dn.areHeartbeatsDisabledForTests()) { //发送心跳信息 resp = sendHeartBeat(requestBlockReportLease); assert resp != null; if (resp.getFullBlockReportLeaseId() != 0) { fullBlockReportLeaseId = resp.getFullBlockReportLeaseId(); } dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime); //对心跳响应中携带的NameNode的HA状态进行处理 bpos.updateActorStatesFromHeartbeat( this, resp.getNameNodeHaState()); state = resp.getNameNodeHaState().getState(); if (state == HAServiceState.ACTIVE) { handleRollingUpgradeStatus(resp); } long startProcessCommands = monotonicNow(); //处理响应中带回的Name Node指令 if (!processCommand(resp.getCommands())) continue; long endProcessCommands = monotonicNow(); } } if (sendImmediateIBR || sendHeartbeat) { reportReceivedDeletedBlocks(); } List<DatanodeCommand> cmds = null; boolean forceFullBr = scheduler.forceFullBlockReport.getAndSet(false); if (forceFullBr) { LOG.info("Forcinga full block report to " + nnAddr); } if ((fullBlockReportLeaseId != 0) || forceFullBr) { cmds = blockReport(fullBlockReportLeaseId); fullBlockReportLeaseId = 0; } processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()])); if (!dn.areCacheReportsDisabledForTests()) { DatanodeCommand cmd = cacheReport(); processCommand(new DatanodeCommand[]{ cmd }); } // // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. 
// long waitTime = scheduler.getHeartbeatWaitTime(); synchronized(pendingIncrementalBRperStorage) { if (waitTime > 0 && !sendImmediateIBR) { try { pendingIncrementalBRperStorage.wait(waitTime); } catch (InterruptedException ie) { LOG.warn("BPOfferServicefor " + this + " interrupted"); } } } // synchronized } catch(RemoteException re) { String reClass = re.getClassName(); if (UnregisteredNodeException.class.getName().equals(reClass) || DisallowedDatanodeException.class.getName().equals(reClass) || IncorrectVersionException.class.getName().equals(reClass)) { LOG.warn(this + " is shutting down", re); shouldServiceRun = false; return; } LOG.warn("RemoteExceptionin offerService", re); try { long sleepTime = Math.min(1000, dnConf.heartBeatInterval); Thread.sleep(sleepTime); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } catch (IOException e) { LOG.warn("IOExceptionin offerService", e); } processQueueMessages(); } // while (shouldRun()) } // offerService void updateActorStatesFromHeartbeat(BPServiceActor actor, NNHAStatusHeartbeat nnHaState) { writeLock(); try { //取到Name Node的txid final long txid = nnHaState.getTxId(); //当前这个NameNode是否声明自己为Active NameNode final boolean nnClaimsActive = nnHaState.getState() == HAServiceState.ACTIVE; //BPOfferService是否认为当前Name Node为Active NameNode final boolean bposThinksActive = bpServiceToActive == actor; //当前Name Node携带的txid是否大于原Active NameNode 的txid final boolean isMoreRecentClaim = txid > lastActiveClaimTxId; //当前这个是Acitve,但是BPOfferService所记录的不是Active,说明Standby已经切换成Active if (nnClaimsActive && !bposThinksActive) { //如果有两个namenode声明为active,当前的请求过时 if (!isMoreRecentClaim) { //直接忽略 return; } else {//当前请求是最新的请求 if (bpServiceToActive == null) { //BPOfferService上还没有保存active name node } else { } //将bpServiceToActive指向当前的NameNode对应的 BPOfferService bpServiceToActive = actor; } } else if (!nnClaimsActive && bposThinksActive) { //原来Active Name Node现在声明为StandbyName Node bpServiceToActive = null; } 
//更新lastActiveClaimTxId if (bpServiceToActive == actor) { assert txid >= lastActiveClaimTxId; lastActiveClaimTxId = txid; } } finally { writeUnlock(); } } 二 BPOfferService BPOfferService就是对DataNode每一个BlockPool进行管理的类。 重要的字段: //握手之后获取到的Namespace信息 NamespaceInfobpNSInfo; //当前Block Pool在Name Node上的注册信息,这个信息是在Data Node注册阶段获取的 volatileDatanodeRegistration bpRegistration; //当前DataNode的引用 private final DataNode dn; //BPServiceActor的引用,这个代表着是Active的Name Node对应的对象 privateBPServiceActor bpServiceToActive = null; //当前命名空间中所有NameNode对应的BPServiceActor对象 private final List<BPServiceActor> bpServices = new CopyOnWriteArrayList<BPServiceActor>(); //每当收到一个NameNode的时候,就记录最近的txid private long lastActiveClaimTxId = -1; 主要的方法分类: 1)触发汇报 trySendErrorReport(),reportRemoteBadBlock,reportBadBlock()实现了向NameNode发送错误汇报,汇报远程坏块以及本地坏块的操作,会直接调用BPServiceActor对应操作 void trySendErrorReport(int errCode, String errMsg) { for (BPServiceActor actor : bpServices) { ErrorReportAction errorReportAction = new ErrorReportAction (errCode, errMsg); actor.bpThreadEnqueue(errorReportAction); } } 2)添加与删除数据块操作 当DataNode接收一个新的数据块时,比如客户端通过数据流管道写入一个数据块,或者通过DataTransferProtocal流式接口复制一个数据块时候,都会调用BPOfferService.notifyNameNodeReceiveBlock()。 当DataNdoe删除一个已有的数据块的时候,会调用BPOfferService .notifyNamenodeDeletedBlock()方法通知命名空间。 3)响应NameNode的指令 boolean processCommandFromActor(DatanodeCommand cmd,BPServiceActor actor) throws IOException { if (cmd == null) { return true; } //如果Name Node返回的指令要求Data Node重新注册的,则调用BPServiceActor.register方法 if (DatanodeProtocol.DNA_REGISTER == cmd.getAction()) { actor.reRegister(); return false; } writeLock(); try { //对于Active Name Node返回的指令,调用processCommandFromActive if (actor == bpServiceToActive) { return processCommandFromActive(cmd, actor); } else { //对于Standbu Name Node返回的指令,调用processCommandFromStandby return processCommandFromStandby(cmd, actor); } } finally { writeUnlock(); } } processCommandFromStandby处理来自StandbyName 
Node的指令,直接忽略即可。防止在HA部署下出现脑裂的情况,也就是ActiveNameNode和StandbyNameNode同时向DataNode下指令。所以BPOfferService对象并不执行Standby返回的字节指令 三BlockPoolManager BlockPoolManager类负责管理所有的BPOfferService实例,对外提供添加、删除、启动关闭BPOfferService类的接口。所有BPOfferService的操作,都必须通过BlockPoolManager类提供的方法来执行 DataNode启动的时候,会初始化BlockPoolManager对象,然后调用BlockPoolManager.refreshNamenodes()完成对BlockPoolManager的构造 //<namespaceId,BPOfferService>命名空间id与BPOfferService private final Map<String, BPOfferService> bpByNameserviceId = Maps.newHashMap(); //<blockId,BPOfferService>块池id与BPOfferService映射 private final Map<String, BPOfferService> bpByBlockPoolId = Maps.newHashMap(); //持有一个BPOfferService List private final List<BPOfferService> offerServices =Lists.newArrayList(); //持有一个Data Node的引用 private final DataNode dn;
推荐阅读
-
jQuery 2.0.3 源码分析之core(一)整体架构
-
jQuery源码之总体架构分析
-
hadoop2.6源码解读之DFSClient方法调用的RPC流程讲解
-
Hadoop源码学习笔记之NameNode启动流程分析一:源码环境搭建和项目模块及NameNode结构简单介绍
-
hadoop架构之BlockPoolManager源码分析讲解
-
Hadoop源码分析五hdfs架构原理剖析
-
hadoop2.6源码解读之DFSClient方法调用的RPC流程讲解
-
hadoop源码解析之hdfs内部结构分析
-
Hadoop源码学习笔记之NameNode启动流程分析一:源码环境搭建和项目模块及NameNode结构简单介绍
-
jQuery源码之总体架构分析