Hadoop Architecture: Source Code Analysis of BlockPoolManager

程序员文章站 2022-07-05 23:23:28
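In the HDFS Federation architecture, a single HDFS cluster can host multiple namespaces, and every DataNode can store blocks belonging to multiple block pools. For this reason the DataNode defines a BlockPoolManager that manages all block pools on that DataNode.

Other DataNode modules must go through the BlockPoolManager to operate on a block pool, and every DataNode holds one BlockPoolManager instance.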

1 BPServiceActor Analysis

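Each BPServiceActor communicates with exactly one NameNode and runs as an independent thread. Its main responsibilities are:

>> Perform the initial handshake with the NameNode to obtain the namespace information
>> Register the current DataNode with the NameNode
>> Periodically send heartbeats, incremental block reports, full block reports, and cache reports to the NameNode
>> Execute the commands returned by the NameNode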
 
static final Log LOG = DataNode.LOG;
// Address of the NameNode
final InetSocketAddress nnAddr;
// HA state of the NameNode
HAServiceState state;
// The BPOfferService that owns this actor
final BPOfferService bpos;
// The worker thread of this actor
Thread bpThread;
// Proxy used to send RPC requests to the NameNode
DatanodeProtocolClientSideTranslatorPB bpNamenode;
// Running state of this BPServiceActor; the initial state is CONNECTING
static enum RunningState {
    CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
}
private volatile RunningState runningState = RunningState.CONNECTING;
// Records the changes to the blocks stored on the DataNode between two block reports
private final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
      pendingIncrementalBRperStorage = Maps.newHashMap();
// Reference to the DataNode object
private final DataNode dn;
// Registration information of the DataNode
private DatanodeRegistration bpRegistration;
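
The role of the pendingIncrementalBRperStorage field can be illustrated with a minimal, self-contained sketch. The class `PendingIbrSketch`, its `Status` enum, and the methods `record` and `drain` are hypothetical names chosen for this illustration and are not Hadoop's API; storages are identified by plain strings, and the choice that a later status replaces an earlier one for the same block is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-storage pending incremental block reports.
// None of these names come from Hadoop.
class PendingIbrSketch {
    enum Status { RECEIVING, RECEIVED, DELETED }

    // storage ID -> (block ID -> latest status since the last report)
    private final Map<String, Map<Long, Status>> pending = new HashMap<>();

    // Records a change. In this sketch a later status for the same block
    // replaces the earlier one.
    synchronized void record(String storageId, long blockId, Status status) {
        pending.computeIfAbsent(storageId, k -> new HashMap<>()).put(blockId, status);
    }

    // Returns everything accumulated so far and clears the pending state,
    // as would happen once a report has been sent.
    synchronized Map<String, Map<Long, Status>> drain() {
        Map<String, Map<Long, Status>> snapshot = new HashMap<>(pending);
        pending.clear();
        return snapshot;
    }

    synchronized boolean isEmpty() {
        return pending.isEmpty();
    }

    public static void main(String[] args) {
        PendingIbrSketch ibr = new PendingIbrSketch();
        ibr.record("storage-1", 1001L, Status.RECEIVING);
        ibr.record("storage-1", 1001L, Status.RECEIVED);
        ibr.record("storage-2", 2002L, Status.DELETED);
        System.out.println(ibr.drain());
        System.out.println(ibr.isEmpty());
    }
}
```

Keying the inner map by block ID keeps only the most recent change per block, so the next report stays small even when a block changes state several times between reports.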
 
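// Initialization (excerpt from the actor's run loop): retry the handshake
// until it succeeds or retrying is no longer allowed
while (true) {
  try {
    // Handshake with the NameNode and register the DataNode
    connectToNNAndHandshake();
    break;
  } catch (IOException ioe) {
    // The initial handshake failed; set the running state to INIT_FAILED
    runningState = RunningState.INIT_FAILED;
    if (shouldRetryInit()) {
      // Retry until all namenode's of BPOS failed initialization
      sleepAndLogInterrupts(5000, "initializing");
    } else {
      runningState = RunningState.FAILED;
      return;
    }
  }
}
// Initialization succeeded; set the state to RUNNING
runningState = RunningState.RUNNING;
// Call offerService() in a loop to send heartbeats, full block reports,
// incremental block reports, and cache reports to the NameNode
while (shouldRun()) {
  try {
    offerService();
  } catch (Exception ex) {
    // An exception does not end the loop; it keeps running until the
    // BPServiceActor or the DataNode stops
    sleepAndLogInterrupts(5000, "offering service");
  }
}
// After the BPServiceActor has stopped, set the state to EXITED
runningState = RunningState.EXITED;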
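
The state transitions in the loop above can be condensed into a small, self-contained sketch. The names `ActorLifecycleSketch`, `Handshake`, and `maxAttempts` are hypothetical and not part of Hadoop. The sketch bounds the retries with a simple counter and omits the 5-second sleep, whereas the excerpt above decides through shouldRetryInit().

```java
import java.io.IOException;

// Hypothetical stand-in for the actor's state handling; not Hadoop code.
class ActorLifecycleSketch {
    enum RunningState { CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED }

    // Hypothetical callback standing in for connectToNNAndHandshake().
    interface Handshake {
        void connect() throws IOException;
    }

    private volatile RunningState runningState = RunningState.CONNECTING;

    RunningState getRunningState() {
        return runningState;
    }

    // Retries the handshake up to maxAttempts times (an assumption of this
    // sketch). Returns true once the handshake has succeeded.
    boolean initialize(Handshake handshake, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                handshake.connect();
                runningState = RunningState.RUNNING;
                return true;
            } catch (IOException ioe) {
                runningState = RunningState.INIT_FAILED;
                if (attempt >= maxAttempts) {
                    runningState = RunningState.FAILED;
                    return false;
                }
            }
        }
    }

    // Marks the actor as stopped after the service loop has ended.
    void markExited() {
        runningState = RunningState.EXITED;
    }

    public static void main(String[] args) {
        ActorLifecycleSketch actor = new ActorLifecycleSketch();
        int[] calls = {0};
        // Fail twice, then succeed on the third attempt.
        boolean ok = actor.initialize(() -> {
            if (++calls[0] < 3) {
                throw new IOException("NameNode not reachable yet");
            }
        }, 5);
        System.out.println(ok + " " + actor.getRunningState());
        actor.markExited();
        System.out.println(actor.getRunningState());
    }
}
```

A failed attempt passes through INIT_FAILED before the actor either retries or settles in FAILED, which matches the order of assignments in the excerpt.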
 
  private void connectToNNAndHandshake() throws IOException {
    // Obtain the RPC proxy for the NameNode
    bpNamenode = dn.connectToNN(nnAddr);
    // First handshake: fetch the namespace information
    NamespaceInfo nsInfo = retrieveNamespaceInfo();
    bpos.verifyAndSetNamespaceInfo(nsInfo);
    // Second handshake: register this DataNode with the NameNode
    register(nsInfo);
  }
 
private void offerService() throws Exception {
    long fullBlockReportLeaseId = 0;
    while (shouldRun()) {
      try {
        final long startTime = scheduler.monotonicNow();
        // Check whether a heartbeat is due
        final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
        HeartbeatResponse resp = null;
        // If a heartbeat is due
        if (sendHeartbeat) {
          boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
                  scheduler.isBlockReportDue(startTime);
          scheduler.scheduleNextHeartbeat();
          if (!dn.areHeartbeatsDisabledForTests()) {
            // Send the heartbeat
            resp = sendHeartBeat(requestBlockReportLease);
            assert resp != null;
            if (resp.getFullBlockReportLeaseId() != 0) {
              fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
            }
            dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
            // Handle the NameNode HA state carried in the heartbeat response
            bpos.updateActorStatesFromHeartbeat(
                this, resp.getNameNodeHaState());
            state = resp.getNameNodeHaState().getState();
 
            if (state == HAServiceState.ACTIVE) {
              handleRollingUpgradeStatus(resp);
            }
 
            long startProcessCommands = monotonicNow();
            // Process the NameNode commands returned in the response
            if (!processCommand(resp.getCommands()))
              continue;
            long endProcessCommands = monotonicNow();
          }
        }
        if (sendImmediateIBR || sendHeartbeat) {
          reportReceivedDeletedBlocks();
        }
 
        List<DatanodeCommand> cmds = null;
        boolean forceFullBr =
            scheduler.forceFullBlockReport.getAndSet(false);
        if (forceFullBr) {
          LOG.info("Forcing a full block report to " + nnAddr);
        }
        if ((fullBlockReportLeaseId != 0) || forceFullBr) {
          cmds = blockReport(fullBlockReportLeaseId);
          fullBlockReportLeaseId = 0;
        }
        processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
 
        if (!dn.areCacheReportsDisabledForTests()) {
          DatanodeCommand cmd = cacheReport();
          processCommand(new DatanodeCommand[]{ cmd });
        }
 
        //
        // There is no work to do;  sleep until heartbeat timer elapses,
        // or work arrives, and then iterate again.
        //
        long waitTime = scheduler.getHeartbeatWaitTime();
        synchronized(pendingIncrementalBRperStorage) {
          if (waitTime > 0 && !sendImmediateIBR) {
            try {
              pendingIncrementalBRperStorage.wait(waitTime);
            } catch (InterruptedException ie) {
              LOG.warn("BPOfferService for " + this + " interrupted");
            }
          }
        } // synchronized
      } catch(RemoteException re) {
        String reClass = re.getClassName();
        if (UnregisteredNodeException.class.getName().equals(reClass) ||
            DisallowedDatanodeException.class.getName().equals(reClass) ||
            IncorrectVersionException.class.getName().equals(reClass)) {
          LOG.warn(this + " is shutting down", re);
          shouldServiceRun = false;
          return;
        }
        LOG.warn("RemoteException in offerService", re);
        try {
          long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
          Thread.sleep(sleepTime);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      } catch (IOException e) {
        LOG.warn("IOException in offerService", e);
      }
      processQueueMessages();
    } // while (shouldRun())
  } // offerService
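
The heartbeat timing that drives offerService() can be sketched in isolation. The class `HeartbeatSchedulerSketch` below is a hypothetical simplification: it takes the current time as an explicit parameter so that it can be exercised without a real clock, and it covers only the heartbeat timer, not block report leases or cache reports.

```java
// Hypothetical, simplified heartbeat timer; the clock is passed in explicitly
// so the behavior can be checked deterministically. Not Hadoop code.
class HeartbeatSchedulerSketch {
    private final long heartbeatIntervalMs;
    private long nextHeartbeatTime;

    HeartbeatSchedulerSketch(long heartbeatIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.nextHeartbeatTime = nowMs; // the first heartbeat is due immediately
    }

    // True when the scheduled heartbeat time has been reached.
    boolean isHeartbeatDue(long nowMs) {
        return nextHeartbeatTime - nowMs <= 0;
    }

    // Schedules the next heartbeat one interval after nowMs.
    long scheduleNextHeartbeat(long nowMs) {
        nextHeartbeatTime = nowMs + heartbeatIntervalMs;
        return nextHeartbeatTime;
    }

    // How long the service loop may wait before the next heartbeat is due.
    long getHeartbeatWaitTime(long nowMs) {
        return nextHeartbeatTime - nowMs;
    }

    // Simulates a service loop that wakes up every stepMs milliseconds and
    // counts how many heartbeats it would send up to and including endMs.
    static int countHeartbeats(long intervalMs, long stepMs, long endMs) {
        HeartbeatSchedulerSketch scheduler = new HeartbeatSchedulerSketch(intervalMs, 0);
        int sent = 0;
        for (long now = 0; now <= endMs; now += stepMs) {
            if (scheduler.isHeartbeatDue(now)) {
                scheduler.scheduleNextHeartbeat(now);
                sent++;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(countHeartbeats(3000, 1000, 9000));
    }
}
```

In the method above, the wait on pendingIncrementalBRperStorage plays the part of the simulated step: the loop sleeps for at most the remaining wait time and is skipped when an immediate incremental block report is requested.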
 
void updateActorStatesFromHeartbeat(BPServiceActor actor,
      NNHAStatusHeartbeat nnHaState) {
    writeLock();
    try {
      // Get the txid reported by the NameNode
      final long txid = nnHaState.getTxId();
      // Whether this NameNode claims to be the Active NameNode
      final boolean nnClaimsActive =
          nnHaState.getState() == HAServiceState.ACTIVE;
      // Whether the BPOfferService currently regards this NameNode as Active
      final boolean bposThinksActive = bpServiceToActive == actor;
      // Whether the txid carried by this NameNode is greater than the txid of the last Active claim
      final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
      // This NameNode claims Active but the BPOfferService does not record it as Active,
      // which means a Standby has switched to Active
      if (nnClaimsActive && !bposThinksActive) {
        // Two NameNodes claim to be Active and this claim is the stale one
        if (!isMoreRecentClaim) {
           // Simply ignore it
          return;
        } else { // This claim is the most recent one
          if (bpServiceToActive == null) {
            // The BPOfferService has not recorded an Active NameNode yet
          } else {
            // An Active NameNode was recorded before and is now being replaced
          }
          // Point bpServiceToActive at the BPServiceActor of this NameNode
          bpServiceToActive = actor;
        }
      } else if (!nnClaimsActive && bposThinksActive) {
        // The former Active NameNode now claims to be Standby
        bpServiceToActive = null;
      }
      // Update lastActiveClaimTxId
      if (bpServiceToActive == actor) {
        assert txid >= lastActiveClaimTxId;
        lastActiveClaimTxId = txid;
      }
    } finally {
      writeUnlock();
    }
  }
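
The arbitration logic in updateActorStatesFromHeartbeat can be reproduced in a self-contained sketch. `ActiveTrackerSketch` is a hypothetical class that identifies actors by plain strings and uses `synchronized` in place of the write lock; the branch structure follows the method above.

```java
// Hypothetical sketch of how the Active NameNode is tracked by txid.
// Actors are plain strings here; this is not Hadoop code.
class ActiveTrackerSketch {
    private String activeActor;             // stands in for bpServiceToActive
    private long lastActiveClaimTxId = -1;

    synchronized void update(String actor, boolean claimsActive, long txid) {
        boolean thinksActive = actor.equals(activeActor);
        boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
        if (claimsActive && !thinksActive) {
            if (!isMoreRecentClaim) {
                return; // stale claim from a former Active, ignore it
            }
            activeActor = actor;
        } else if (!claimsActive && thinksActive) {
            activeActor = null; // the recorded Active has stepped down
        }
        if (actor.equals(activeActor)) {
            lastActiveClaimTxId = txid;
        }
    }

    synchronized String getActive() {
        return activeActor;
    }

    synchronized long getLastActiveClaimTxId() {
        return lastActiveClaimTxId;
    }

    public static void main(String[] args) {
        ActiveTrackerSketch tracker = new ActiveTrackerSketch();
        tracker.update("nn1", true, 100);  // nn1 becomes Active
        tracker.update("nn2", true, 150);  // failover: nn2 takes over
        tracker.update("nn1", true, 120);  // stale claim from nn1 is ignored
        System.out.println(tracker.getActive() + " " + tracker.getLastActiveClaimTxId());
    }
}
```

Because lastActiveClaimTxId survives a step-down, a former Active that still believes it is Active cannot reclaim the role with an older txid.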
 
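2 BPOfferService
BPOfferService is the class that manages a single block pool on the DataNode; there is one instance per block pool.
Important fields:
// Namespace information obtained after the handshake
NamespaceInfo bpNSInfo;
// Registration information of this block pool on the NameNode, obtained during DataNode registration
volatile DatanodeRegistration bpRegistration;
// Reference to the current DataNode
private final DataNode dn;
// The BPServiceActor that corresponds to the Active NameNode
private BPServiceActor bpServiceToActive = null;
// The BPServiceActor objects for all NameNodes of this namespace
private final List<BPServiceActor> bpServices = new CopyOnWriteArrayList<BPServiceActor>();
// Records the most recent txid each time a NameNode claiming to be Active sends a heartbeat
private long lastActiveClaimTxId = -1;
Main groups of methods: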
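1) Triggering reports
trySendErrorReport(), reportRemoteBadBlock(), and reportBadBlock() send an error report, report a remote bad block, and report a local bad block to the NameNode respectively. Each one delegates to the corresponding operation on BPServiceActor.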
void trySendErrorReport(int errCode, String errMsg) {
    for (BPServiceActor actor : bpServices) {
      ErrorReportAction errorReportAction = new ErrorReportAction
          (errCode, errMsg);
      actor.bpThreadEnqueue(errorReportAction);
    }
}
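
The fan-out in trySendErrorReport can be sketched without any Hadoop classes. In the sketch below, `ErrorReportFanOutSketch`, `Actor`, `ErrorReport`, `enqueue`, and `drain` are hypothetical names; the assumption is that each actor owns a queue that its own thread drains later, so the caller never blocks on a NameNode RPC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: one error report is queued once per actor.
// Not Hadoop code.
class ErrorReportFanOutSketch {
    static final class ErrorReport {
        final int errCode;
        final String errMsg;

        ErrorReport(int errCode, String errMsg) {
            this.errCode = errCode;
            this.errMsg = errMsg;
        }
    }

    static final class Actor {
        final String name;
        private final BlockingQueue<ErrorReport> queue = new LinkedBlockingQueue<>();

        Actor(String name) {
            this.name = name;
        }

        // Called by the owner; only queues the work.
        void enqueue(ErrorReport report) {
            queue.add(report);
        }

        // Called from the actor's own thread to pick up queued work.
        List<ErrorReport> drain() {
            List<ErrorReport> out = new ArrayList<>();
            queue.drainTo(out);
            return out;
        }
    }

    private final List<Actor> actors = new CopyOnWriteArrayList<>();

    void addActor(Actor actor) {
        actors.add(actor);
    }

    // Queues a separate report object for every actor.
    void trySendErrorReport(int errCode, String errMsg) {
        for (Actor actor : actors) {
            actor.enqueue(new ErrorReport(errCode, errMsg));
        }
    }

    public static void main(String[] args) {
        ErrorReportFanOutSketch owner = new ErrorReportFanOutSketch();
        Actor nn1 = new Actor("nn1");
        Actor nn2 = new Actor("nn2");
        owner.addActor(nn1);
        owner.addActor(nn2);
        owner.trySendErrorReport(1, "disk error");
        System.out.println(nn1.drain().size() + " " + nn2.drain().size());
    }
}
```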
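2) Adding and deleting blocks
When the DataNode receives a new block, for example when a client writes a block through the data pipeline or a block is replicated through the DataTransferProtocol streaming interface, BPOfferService.notifyNamenodeReceivedBlock() is called.
When the DataNode deletes an existing block, BPOfferService.notifyNamenodeDeletedBlock() is called to notify the namespace.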
 
3) Responding to NameNode commands
boolean processCommandFromActor(DatanodeCommand cmd, BPServiceActor actor) throws IOException {
    if (cmd == null) {
      return true;
    }
    // If the NameNode asks the DataNode to register again, call BPServiceActor.reRegister()
    if (DatanodeProtocol.DNA_REGISTER == cmd.getAction()) {
      actor.reRegister();
      return false;
    }
    writeLock();
    try {
      // Commands from the Active NameNode go to processCommandFromActive
      if (actor == bpServiceToActive) {
        return processCommandFromActive(cmd, actor);
      } else {
      // Commands from the Standby NameNode go to processCommandFromStandby
        return processCommandFromStandby(cmd, actor);
      }
    } finally {
      writeUnlock();
    }
}
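processCommandFromStandby handles commands from the Standby NameNode by simply ignoring them. This prevents split-brain in an HA deployment, where the Active NameNode and the Standby NameNode would both issue commands to the DataNode at the same time. The BPOfferService object therefore does not execute the commands returned by the Standby.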
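
The routing rule described above can be sketched as a small dispatcher. Everything in `CommandDispatchSketch` is hypothetical: the `Action` and `Outcome` enums are invented for the illustration, actors are plain strings, and the sketch only reports which branch a command would take instead of executing anything.

```java
// Hypothetical sketch of routing NameNode commands by sender role.
// Not Hadoop code; it only reports which branch a command takes.
class CommandDispatchSketch {
    enum Action { REGISTER, TRANSFER, INVALIDATE }

    enum Outcome { NOTHING_TO_DO, RE_REGISTERED, EXECUTED, IGNORED }

    private String activeActor; // the actor currently regarded as Active

    synchronized void setActive(String actor) {
        this.activeActor = actor;
    }

    Outcome process(Action cmd, String fromActor) {
        if (cmd == null) {
            return Outcome.NOTHING_TO_DO;
        }
        // A re-registration request is honored no matter who sent it.
        if (cmd == Action.REGISTER) {
            return Outcome.RE_REGISTERED;
        }
        synchronized (this) {
            // Only the Active NameNode's commands are executed.
            return fromActor.equals(activeActor) ? Outcome.EXECUTED : Outcome.IGNORED;
        }
    }

    public static void main(String[] args) {
        CommandDispatchSketch dispatcher = new CommandDispatchSketch();
        dispatcher.setActive("nn1");
        System.out.println(dispatcher.process(Action.INVALIDATE, "nn1"));
        System.out.println(dispatcher.process(Action.INVALIDATE, "nn2"));
        System.out.println(dispatcher.process(Action.REGISTER, "nn2"));
    }
}
```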
 
3 BlockPoolManager
The BlockPoolManager class manages all BPOfferService instances and exposes the interface for adding, removing, starting, and shutting down BPOfferService instances. Every operation on a BPOfferService must go through the methods provided by BlockPoolManager.
 
When the DataNode starts, it initializes the BlockPoolManager object and then calls BlockPoolManager.refreshNamenodes() to finish constructing the BlockPoolManager.
// <nameserviceId, BPOfferService>: maps a nameservice ID to its BPOfferService
private final Map<String, BPOfferService> bpByNameserviceId = Maps.newHashMap();
// <blockPoolId, BPOfferService>: maps a block pool ID to its BPOfferService
private final Map<String, BPOfferService> bpByBlockPoolId = Maps.newHashMap();
// List of all BPOfferService instances
private final List<BPOfferService> offerServices = Lists.newArrayList();
// Reference to the DataNode
private final DataNode dn;
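
How the two maps and the list cooperate can be shown with a minimal registry sketch. `BlockPoolRegistrySketch`, `OfferService`, and the method names are hypothetical and much simpler than the real class. The sketch assumes that a service is registered under its nameservice ID first and only becomes reachable by block pool ID once the handshake has revealed that ID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified registry with two indexes over the same services.
// Not Hadoop code.
class BlockPoolRegistrySketch {
    static final class OfferService {
        final String nameserviceId;
        String blockPoolId; // unknown until the handshake has completed

        OfferService(String nameserviceId) {
            this.nameserviceId = nameserviceId;
        }
    }

    private final Map<String, OfferService> bpByNameserviceId = new HashMap<>();
    private final Map<String, OfferService> bpByBlockPoolId = new HashMap<>();
    private final List<OfferService> offerServices = new ArrayList<>();

    // Registers a new service for a nameservice.
    synchronized OfferService addNameservice(String nameserviceId) {
        OfferService bpos = new OfferService(nameserviceId);
        bpByNameserviceId.put(nameserviceId, bpos);
        offerServices.add(bpos);
        return bpos;
    }

    // Indexes the service by block pool ID once that ID is known.
    synchronized void addBlockPool(OfferService bpos, String blockPoolId) {
        bpos.blockPoolId = blockPoolId;
        bpByBlockPoolId.put(blockPoolId, bpos);
    }

    synchronized OfferService get(String blockPoolId) {
        return bpByBlockPoolId.get(blockPoolId);
    }

    // Removes the service from the list and from both indexes.
    synchronized void remove(OfferService bpos) {
        offerServices.remove(bpos);
        bpByNameserviceId.remove(bpos.nameserviceId);
        if (bpos.blockPoolId != null) {
            bpByBlockPoolId.remove(bpos.blockPoolId);
        }
    }

    synchronized int size() {
        return offerServices.size();
    }

    public static void main(String[] args) {
        BlockPoolRegistrySketch registry = new BlockPoolRegistrySketch();
        OfferService ns1 = registry.addNameservice("ns1");
        registry.addBlockPool(ns1, "BP-example-1"); // made-up block pool ID
        System.out.println(registry.get("BP-example-1").nameserviceId);
        registry.remove(ns1);
        System.out.println(registry.size());
    }
}
```

Keeping both indexes in one class means that other modules can look up a block pool by either key without knowing how the services are created or torn down.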