
An Explanation of the DataNode Streaming Interface


The most important job of a DataNode is to manage the blocks on its physical storage and to communicate with the NameNode and with clients to carry out block reads and writes. These reads and writes involve large amounts of data transfer: a DFSClient writing a block to a DataNode, a DFSClient reading data from a DataNode, or a DataNode replicating a block to another DataNode. All of these operations are I/O intensive.

For these read and write operations, the DataNode implementation provides a data access interface based on TCP streams: DataTransferProtocol.

1 DataTransferProtocol

DataTransferProtocol is the interface that describes reading data from, and writing data to, a DataNode. Its methods include:

readBlock: read a block from the current DataNode

writeBlock: write a block to the current DataNode

transferBlock: transfer a block to another DataNode

releaseShortCircuitFds: release the file descriptors held for a short-circuit read

requestShortCircuitShm: request a shared-memory segment used for short-circuit reads

copyBlock: copy a block

blockChecksum: get the checksum of a specified block

(The interface also declares replaceBlock, used by the balancer; it shows up in the Receiver dispatch code below.)
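As a concrete example, the readBlock() declaration looks roughly like the sketch below (its parameters reappear verbatim in the Sender code of section 2); the other methods are declared in the same style:

public interface DataTransferProtocol {

  void readBlock(final ExtendedBlock blk,
      final Token<BlockTokenIdentifier> blockToken,
      final String clientName,
      final long blockOffset,
      final long length,
      final boolean sendChecksum,
      final CachingStrategy cachingStrategy) throws IOException;

  // writeBlock(), transferBlock(), copyBlock(), blockChecksum(),
  // requestShortCircuitFds(), releaseShortCircuitFds(),
  // requestShortCircuitShm() and replaceBlock() are declared the same way.
}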

DataTransferProtocol has two implementing classes, Sender and Receiver.

Sender wraps the calling side of DataTransferProtocol and is used to issue requests on the streaming interface.

Receiver wraps the executing side of DataTransferProtocol and is used to respond to requests on the streaming interface.

Suppose a DFSClient issues a DataTransferProtocol.readBlock call. The DFSClient uses the Sender class to serialize the request and transmit it to the remote Receiver. The remote Receiver deserializes the request and then invokes the code that actually performs the read.
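A minimal sketch of that round trip from the client side, assuming a plain TCP connection to the DataNode's streaming port (stream setup is simplified, imports are omitted as elsewhere in this article, and datanodeAddr/socketTimeout are placeholder variables; the real DFSClient also performs SASL negotiation):

Socket s = new Socket();
s.connect(datanodeAddr, socketTimeout);  // placeholder address and timeout
DataOutputStream out = new DataOutputStream(
    new BufferedOutputStream(s.getOutputStream()));
DataInputStream in = new DataInputStream(s.getInputStream());

// Sender serializes the request and writes it onto the wire
new Sender(out).readBlock(block, blockToken, clientName,
    0, block.getNumBytes(), true, CachingStrategy.newDefaultStrategy());

// On the DataNode, the Receiver subclass (DataXceiver) reads the opcode,
// deserializes the parameters, and streams the block data back on `in`.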

2 Sender

Sender first serializes the parameters with ProtoBuf, then uses the enum class Op to describe which method is being called, and finally sends the Op together with the serialized parameters to the receiving side.
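For reference, a condensed sketch of the Op enum (byte values as they appear in the 2.x source, to the best of my reading; a few ops are omitted, so treat the list as illustrative):

public enum Op {
  WRITE_BLOCK((byte)80),
  READ_BLOCK((byte)81),
  REPLACE_BLOCK((byte)83),
  COPY_BLOCK((byte)84),
  BLOCK_CHECKSUM((byte)85),
  TRANSFER_BLOCK((byte)86),
  REQUEST_SHORT_CIRCUIT_FDS((byte)87),
  RELEASE_SHORT_CIRCUIT_FDS((byte)88),
  REQUEST_SHORT_CIRCUIT_SHM((byte)89);

  // The single byte that identifies this operation on the wire
  public final byte code;

  Op(byte code) { this.code = code; }

  // op(out, opcode) on the Sender side writes this byte after the version;
  // Op.read(in) on the Receiver side maps the byte back to an enum value.
}

With that in mind, Sender.readBlock() looks like this: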

@Override
public void readBlock(final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final long blockOffset,
    final long length,
    final boolean sendChecksum,
    final CachingStrategy cachingStrategy) throws IOException {
  // Serialize all parameters with ProtoBuf
  OpReadBlockProto proto = OpReadBlockProto.newBuilder()
      .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
      .setOffset(blockOffset)
      .setLen(length)
      .setSendChecksums(sendChecksum)
      .setCachingStrategy(getCachingStrategy(cachingStrategy))
      .build();
  // Call send() with Op.READ_BLOCK, which marks the request as a readBlock
  // call, together with the serialized parameters in proto
  send(out, Op.READ_BLOCK, proto);
}

private static void send(final DataOutputStream out, final Op opcode,
    final Message proto) throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
        + ": " + proto);
  }
  // op() writes the protocol version number and the opcode
  op(out, opcode);
  // Write the serialized parameters
  proto.writeDelimitedTo(out);
  out.flush();
}

When we use Sender to issue a readBlock() request, Sender serializes the request with ProtoBuf and sends it through the I/O stream to the remote DataNode.

When the DataNode reads the request, it calls the matching Receiver method, opReadBlock(), to deserialize it and then performs the readBlock operation.

3 Receiver

Receiver is an abstract class that executes streaming-interface requests issued by remote nodes. It provides readOp(), which parses the opcode of a Sender request, and processOp(), which takes the Op parsed by readOp() as a parameter and dispatches to a different handler for each opcode. For readBlock, for example, processOp() calls opReadBlock(), which reads the serialized request parameters from the I/O stream, deserializes them, and then invokes the corresponding method of the subclass DataXceiver to do the actual work.

/** Read the version number and the Op opcode from the input stream. */
protected final Op readOp() throws IOException {
  // Read the protocol version from the stream
  final short version = in.readShort();
  if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
    throw new IOException("Version Mismatch (Expected: " +
        DataTransferProtocol.DATA_TRANSFER_VERSION +
        ", Received: " + version + " )");
  }
  // Then read the Op opcode from the stream
  return Op.read(in);
}

/** Take the Op parsed by readOp() and dispatch to the matching handler. */
protected final void processOp(Op op) throws IOException {
  switch(op) {
  case READ_BLOCK:
    opReadBlock();
    break;
  case WRITE_BLOCK:
    opWriteBlock(in);
    break;
  case REPLACE_BLOCK:
    opReplaceBlock(in);
    break;
  case COPY_BLOCK:
    opCopyBlock(in);
    break;
  case BLOCK_CHECKSUM:
    opBlockChecksum(in);
    break;
  case TRANSFER_BLOCK:
    opTransferBlock(in);
    break;
  case REQUEST_SHORT_CIRCUIT_FDS:
    opRequestShortCircuitFds(in);
    break;
  case RELEASE_SHORT_CIRCUIT_FDS:
    opReleaseShortCircuitFds(in);
    break;
  case REQUEST_SHORT_CIRCUIT_SHM:
    opRequestShortCircuitShm(in);
    break;
  default:
    throw new IOException("Unknown op " + op + " in data stream");
  }
}

/** Receive OP_READ_BLOCK */
private void opReadBlock() throws IOException {
  // Read the serialized readBlock parameters from the stream
  OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
  TraceScope traceScope = continueTraceSpan(proto.getHeader(),
      proto.getClass().getSimpleName());
  try {
    // Deserialize the parameters, then call readBlock(), which the
    // subclass DataXceiver implements to perform the actual read
    readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        proto.getOffset(),
        proto.getLen(),
        proto.getSendChecksums(),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
            CachingStrategy.newDefaultStrategy()));
  } finally {
    if (traceScope != null) traceScope.close();
  }
}

4 DataXceiverServer

Recall how a socket server is written in plain Java: first create a ServerSocket object bound to a given port, then call ServerSocket.accept() to listen for connection requests on that port. When a connection request arrives, accept() returns a Socket object, and the server can then talk to the client through that socket. A minimal example of the pattern is shown below.
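A sketch of that classic pattern (the port number and the running flag are illustrative):

ServerSocket server = new ServerSocket(50010); // 50010: the 2.x default streaming port
boolean running = true;                        // stand-in for a real shutdown flag
while (running) {
  final Socket socket = server.accept();       // blocks until a client connects
  // Hand the connection to a worker thread so the listener can return to
  // accept() right away, the same hand-off DataXceiverServer performs by
  // starting one DataXceiver thread per connection.
  new Thread(() -> {
    try (Socket s = socket) {
      // converse with the client via s.getInputStream()/s.getOutputStream()
    } catch (IOException e) {
      // log and drop only this connection
    }
  }).start();
}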

The DataNode's streaming interface follows this socket pattern with two classes, DataXceiver and DataXceiverServer. DataXceiverServer listens for streaming-interface requests on the DataNode; every time a client issues a streaming request through Sender, DataXceiverServer accepts it and creates a DataXceiver object to respond to the request and carry out the corresponding operation.

4.1 DataXceiverServer initialization

During DataNode initialization, a DataXceiverServer object is created to listen for all streaming requests; the DataNode calls DataNode.initDataXceiver() to construct it.

>> Create a TcpPeerServer object, which wraps the ServerSocket. TcpPeerServer takes its listen address from the configuration key dfs.datanode.address.

private void initDataXceiver(Configuration conf) throws IOException {
  // TcpPeerServer wraps a ServerSocket and listens on dfs.datanode.address
  TcpPeerServer tcpPeerServer;
  if (secureResources != null) {
    tcpPeerServer = new TcpPeerServer(secureResources);
  } else {
    tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
        DataNode.getStreamingAddr(conf));
  }
  // Set the TCP receive buffer size
  tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
  streamingAddr = tcpPeerServer.getStreamingAddr();
  LOG.info("Opened streaming server at " + streamingAddr);
  this.threadGroup = new ThreadGroup("dataXceiverServer");
  // Construct the DataXceiverServer instance
  xserver = new DataXceiverServer(tcpPeerServer, conf, this);
  // Run the dataXceiverServer thread group as daemon threads
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty

  // Short-circuit read case
  if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
      conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
        DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
    // Construct a DomainPeerServer, based on DomainSocket, for local
    // inter-process communication
    DomainPeerServer domainPeerServer =
        getDomainPeerServer(conf, streamingAddr.getPort());
    if (domainPeerServer != null) {
      // Construct localDataXceiverServer
      this.localDataXceiverServer = new Daemon(threadGroup,
          new DataXceiverServer(domainPeerServer, conf, this));
      LOG.info("Listening on UNIX domain socket: " +
          domainPeerServer.getBindPath());
    }
  }
  // Construct the ShortCircuitRegistry object
  this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
}

4.2 The run() method

All of DataXceiverServer's work happens in its run() method. It repeatedly calls peerServer.accept() to listen for connections; when a new connection request arrives, it creates a Peer object and constructs a DataXceiver thread to serve the streaming request. The streaming request itself is answered by DataXceiver, which is where the real work is done.

public void run() {
  Peer peer = null;
  // As long as the DataNode is running, keep looping and listening for
  // new requests
  while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
    try {
      // Block in peerServer.accept() until a new request arrives, then
      // create a Peer object for it
      peer = peerServer.accept();

      // Make sure the xceiver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > maxXceiverCount) {
        throw new IOException("Xceiver count " + curXceiverCount
            + " exceeds the limit of concurrent xcievers: "
            + maxXceiverCount);
      }

      // Construct a DataXceiver thread to serve this streaming request.
      // DataXceiverServer only establishes the connection and constructs
      // and starts the DataXceiver; the concrete work is left to the
      // DataXceiver.
      new Daemon(datanode.threadGroup,
          DataXceiver.create(peer, datanode, this))
          .start();
    } catch (SocketTimeoutException ignored) {
      // wake up to see if should continue to run
    } catch (AsynchronousCloseException ace) {
      // another thread closed our listener socket - that's expected during
      // shutdown, but not in other circumstances
      if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
        LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
      }
    } catch (IOException ie) {
      IOUtils.cleanup(null, peer);
      LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
    } catch (OutOfMemoryError ie) {
      IOUtils.cleanup(null, peer);
      // DataNode can run out of memory if there is too many transfers.
      // Log the event, Sleep for 30 seconds, other transfers may complete
      // by then.
      LOG.error("DataNode is out of memory. Will retry in 30 seconds.", ie);
      try {
        Thread.sleep(30 * 1000);
      } catch (InterruptedException e) {
        // ignore
      }
    } catch (Throwable te) {
      LOG.error(datanode.getDisplayName()
          + ":DataXceiverServer: Exiting due to:", te);
      datanode.shouldRun = false; // any other error shuts down the DataNode
    }
  }

  // We have left the main loop: perform cleanup and close the peerServer
  try {
    peerServer.close();
    closed = true;
  } catch (IOException ie) {
    LOG.warn(datanode.getDisplayName()
        + " :DataXceiverServer: close exception", ie);
  }

  // if in restart prep stage, notify peers before closing them.
  if (datanode.shutdownForUpgrade) {
    restartNotifyPeers();
    // Each thread needs some time to process it. If a thread needs
    // to send an OOB message to the client, but blocked on network for
    // long time, we need to force its termination.
    LOG.info("Shutting down DataXceiverServer before restart");
    // Allow roughly up to 2 seconds.
    for (int i = 0; getNumPeers() > 0 && i < 10; i++) {
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        // ignore
      }
    }
  }
  // Close all remaining connections held by this server
  closeAllPeers();
}

5 DataXceiver

DataXceiver is a subclass of Receiver; the actual responses to DataTransferProtocol operations are carried out here.

5.1 The run() method

public void run() {
  int opsProcessed = 0;
  Op op = null;

  try {
    dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
    peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
    // Get the underlying input stream
    InputStream input = socketIn;
    try {
      IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
          socketIn, datanode.getXferAddress().getPort(),
          datanode.getDatanodeId());
      // Decorate the input stream with buffering
      input = new BufferedInputStream(saslStreams.in,
          HdfsConstants.SMALL_BUFFER_SIZE);
      // Get the underlying output stream
      socketOut = saslStreams.out;
    } catch (InvalidMagicNumberException imne) {
      LOG.info("Failed to read expected encryption handshake from client " +
          "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
          "is running an older version of Hadoop which does not support " +
          "encryption");
      return;
    }

    // Call the parent class's initialize() method to finish initialization
    super.initialize(new DataInputStream(input));

    do {
      updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));

      try {
        if (opsProcessed != 0) {
          assert dnConf.socketKeepaliveTimeout > 0;
          peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
        } else {
          peer.setReadTimeout(dnConf.socketTimeout);
        }
        op = readOp(); // parse the Op opcode from the input stream
      } catch (InterruptedIOException ignored) {
        break;
      } catch (IOException err) {
        // Since we optimistically expect the next op, it's quite normal to
        // get EOF here.
        if (opsProcessed > 0 &&
            (err instanceof EOFException || err instanceof ClosedChannelException)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
          }
        } else {
          incrDatanodeNetworkErrors();
          throw err;
        }
        break;
      }

      // restore normal timeout
      if (opsProcessed != 0) {
        peer.setReadTimeout(dnConf.socketTimeout);
      }

      opStartTime = now();
      // Call the parent class's processOp() to handle the streaming request
      processOp(op);
      ++opsProcessed;
    } while ((peer != null) &&
        (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
  } catch (Throwable t) {
    String s = datanode.getDisplayName() + ":DataXceiver error processing "
        + ((op == null) ? "unknown" : op.name()) + " operation "
        + " src: " + remoteAddress + " dst: " + localAddress;
    if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {
      // For WRITE_BLOCK, it is okay if the replica already exists since
      // client and replication may write the same block to the same datanode
      // at the same time.
      if (LOG.isTraceEnabled()) {
        LOG.trace(s, t);
      } else {
        LOG.info(s + "; " + t);
      }
    } else if (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) {
      String s1 =
          "Likely the client has stopped reading, disconnecting it";
      s1 += " (" + s + ")";
      if (LOG.isTraceEnabled()) {
        LOG.trace(s1, t);
      } else {
        LOG.info(s1 + "; " + t);
      }
    } else {
      LOG.error(s, t);
    }
  } finally {
    if (LOG.isDebugEnabled()) {
      LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
          + datanode.getXceiverCount());
    }
    updateCurrentThreadName("Cleaning up");
    if (peer != null) {
      dataXceiverServer.closePeer(peer);
      IOUtils.closeStream(in);
    }
  }
}

6 Reading data

A client calls Sender.readBlock() to read a block from a specified DataNode. Once the request reaches the DataNode over the I/O stream, the DataNode's DataXceiverServer creates a DataXceiver object to respond to it.

readBlock() receives several parameters:

ExtendedBlock block: which block to read

Token<BlockTokenIdentifier> blockToken: the access token for the data being read

String clientName: which client is reading

long blockOffset: the position within the block to start reading from

long length: how many bytes to read

boolean sendChecksum: whether to send checksum data along with the block

CachingStrategy cachingStrategy: the caching strategy

The overall flow:

>> Create the output stream

>> Create a BlockSender object

>> Send a BlockOpResponseProto response to the client, confirming that the request was received successfully and reporting the DataNode's checksum information

>> Send the block to the client, producing a status code

>> Close the streams

Before the server-side code, here is a rough sketch of the same exchange as seen from the client.
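The sketch assumes in/out are the client's socket streams; checkSuccess() is a hypothetical helper, not HDFS code:

// 1. Issue the request
new Sender(out).readBlock(block, blockToken, clientName,
    blockOffset, length, true, cachingStrategy);

// 2. The DataNode answers with a BlockOpResponseProto carrying the status
//    and the server's checksum information
BlockOpResponseProto resp =
    BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
checkSuccess(resp); // hypothetical helper: fail if resp.getStatus() != SUCCESS

// 3. The block data then arrives as a stream of packets, which the client
//    verifies against the checksums.

// 4. After the last packet, the client reports the overall result back;
//    this is the ClientReadStatusProto that the server code below parses.
ClientReadStatusProto.newBuilder()
    .setStatus(Status.CHECKSUM_OK)
    .build()
    .writeDelimitedTo(out);
out.flush();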

The server-side implementation, DataXceiver.readBlock():

public void readBlock(final ExtendedBlock block,  // the block to read
    final Token<BlockTokenIdentifier> blockToken, // access token for the block
    final String clientName,                      // client name
    final long blockOffset,                       // position of the data within the block
    final long length,                            // number of bytes to read
    final boolean sendChecksum,                   // whether to send checksum data; the read
                                                  // verification is done on the client side,
                                                  // which reports the result to the DataNode
    final CachingStrategy cachingStrategy         // caching strategy
    ) throws IOException {
  previousOpClientName = clientName;
  long read = 0;
  OutputStream baseStream = getOutputStream();
  DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
      baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
  checkAccess(out, true, block, blockToken,
      Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);

  // send the block
  BlockSender blockSender = null;
  DatanodeRegistration dnR =
      datanode.getDNRegistrationForBP(block.getBlockPoolId());
  final String clientTraceFmt =
      clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
        ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
            "%d", "HDFS_READ", clientName, "%d",
            dnR.getDatanodeUuid(), block, "%d")
        : dnR + " Served block " + block + " to " +
            remoteAddress;

  updateCurrentThreadName("Sending block " + block);
  // Create the BlockSender object
  try {
    try {
      blockSender = new BlockSender(block, blockOffset, length,
          true, false, sendChecksum, datanode, clientTraceFmt,
          cachingStrategy);
    } catch(IOException e) {
      String msg = "opReadBlock " + block + " received exception " + e;
      LOG.info(msg);
      sendResponse(ERROR, msg);
      throw e;
    }

    // Send a BlockOpResponseProto back to the client, confirming that the
    // request was received successfully and reporting this DataNode's
    // checksum information
    writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));

    long beginRead = Time.monotonicNow();
    // Send the block to the client; if the entire range was sent, the
    // client replies with a status code that is parsed below
    read = blockSender.sendBlock(out, baseStream, null); // send data
    long duration = Time.monotonicNow() - beginRead;
    if (blockSender.didSendEntireByteRange()) {
      // If we sent the entire range, then we should expect the client
      // to respond with a Status enum.
      try {
        ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
            PBHelper.vintPrefixed(in));
        if (!stat.hasStatus()) {
          LOG.warn("Client " + peer.getRemoteAddressString() +
              " did not send a valid status code after reading. " +
              "Will close connection.");
          IOUtils.closeStream(out);
        }
      } catch (IOException ioe) {
        LOG.debug("Error reading client status response. Will close connection.", ioe);
        IOUtils.closeStream(out);
        incrDatanodeNetworkErrors();
      }
    } else {
      IOUtils.closeStream(out);
    }
    datanode.metrics.incrBytesRead((int) read);
    datanode.metrics.incrBlocksRead();
    datanode.metrics.incrTotalReadTime(duration);
  } catch ( SocketException ignored ) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(dnR + ":Ignoring exception while serving " + block + " to " +
          remoteAddress, ignored);
    }
    // Its ok for remote side to close the connection anytime.
    datanode.metrics.incrBlocksRead();
    IOUtils.closeStream(out);
  } catch ( IOException ioe ) {
    /* What exactly should we do here?
     * Earlier version shutdown() datanode if there is disk error.
     */
    if (!(ioe instanceof SocketTimeoutException)) {
      LOG.warn(dnR + ":Got exception while serving " + block + " to "
          + remoteAddress, ioe);
      datanode.metrics.incrDatanodeNetworkErrors();
    }
    throw ioe;
  } finally {
    IOUtils.closeStream(blockSender);
  }

  //update metrics
  datanode.metrics.addReadBlockOp(elapsed());
  datanode.metrics.incrReadsFromClient(peer.isLocal(), read);
}

7 Writing data

HDFS writes data through a pipeline of DataNodes. The DFSClient triggers a block write by calling Sender.writeBlock(); the request travels to every DataNode in the pipeline, and the last DataNode replies with an acknowledgment that is passed back through the pipeline to the DFSClient. Once the DFSClient receives it, it splits the data to be written into packets and sends them into the pipeline one after another.

Each packet is first sent to the first DataNode. After the first DataNode has received the packet, it writes it to disk and then forwards it to the second DataNode, and so on. When a packet reaches the last DataNode, the data is checksum-verified; if verification succeeds, that DataNode sends a packet acknowledgment, which travels back through the pipeline to the DFSClient. When all data of a block has been sent and acknowledged, the DFSClient sends an empty packet to mark the end of the block, and the block write is complete. The sketch below illustrates the shape of this loop.
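A hedged illustration of the client side, in Java form but as pure pseudocode rather than HDFS source; every helper and type named here (Packet, Ack, splitIntoPackets, sendEmptyLastPacket, readAck, recoverPipeline) is hypothetical:

Queue<Packet> ackQueue = new LinkedList<>();

// Send phase: packets flow client -> DN1 -> DN2 -> DN3
for (Packet p : splitIntoPackets(blockData)) { // splitIntoPackets: hypothetical
  p.writeTo(pipelineOut);                      // to the first DataNode
  ackQueue.add(p);                             // remember until acknowledged
}
sendEmptyLastPacket(pipelineOut);              // hypothetical: marks block end

// Ack phase: acks flow DN3 -> DN2 -> DN1 -> client
while (!ackQueue.isEmpty()) {
  Ack ack = readAck(pipelineIn);               // hypothetical: ack from DN1
  if (ack.isSuccess()) {
    ackQueue.remove();                         // packet is on all replicas
  } else {
    recoverPipeline();                         // hypothetical error handling
  }
}

In the real DFSClient the send and ack phases run concurrently in two threads (DataStreamer and ResponseProcessor); the sequential loop above only shows the data and ack paths.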

The DataXceiver.writeBlock() method:

Three variables control the processing flow:

// Is this write initiated by a DataNode? (empty client name)
final boolean isDatanode = clientname.length() == 0;
// Is this write initiated by a client?
final boolean isClient = !isDatanode;
// Is this write a block transfer (replication) operation?
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
    || stage == BlockConstructionStage.TRANSFER_FINALIZED;

……

// Output stream to the next DataNode in the pipeline
DataOutputStream mirrorOut = null;
// Input stream from the next DataNode
DataInputStream mirrorIn = null;
// Socket to the next DataNode
Socket mirrorSock = null;
// Name of the next DataNode
String mirrorNode = null;
// First DataNode in the pipeline that failed
String firstBadLink = "";

……

If the write was initiated by a DataNode:

/*
 * Open a BlockReceiver to receive the block from the upstream DataNode.
 * BlockReceiver receives the block from the upstream node in the pipeline,
 * stores it on the current node, and forwards it to the downstream node;
 * it also receives acks from the downstream node and passes them back to
 * the upstream node.
 */
blockReceiver = new BlockReceiver(block, storageType, in,
    peer.getRemoteAddressString(),
    peer.getLocalAddressString(),
    stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
    clientname, srcDataNode, datanode, requestedChecksum,
    cachingStrategy, allowLazyPersist, pinning);

storageUuid = blockReceiver.getStorageUuid();

// Connect to the downstream node
if (targets.length > 0) {
  ……
  // Establish a socket connection to the downstream node
  mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
  mirrorSock = datanode.newSocket();
  try {
    ……
    // Set up the output and input streams to the downstream node
    mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
        HdfsConstants.SMALL_BUFFER_SIZE));
    mirrorIn = new DataInputStream(unbufMirrorIn);

    // Send the block-write request to the downstream node
    if (targetPinnings != null && targetPinnings.length > 0) {
      new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
          blockToken, clientname, targets, targetStorageTypes, srcDataNode,
          stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
          latestGenerationStamp, requestedChecksum, cachingStrategy,
          false, targetPinnings[0], targetPinnings);
    } else {
      new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
          blockToken, clientname, targets, targetStorageTypes, srcDataNode,
          stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
          latestGenerationStamp, requestedChecksum, cachingStrategy,
          false, false, targetPinnings);
    }

    mirrorOut.flush();
    DataNodeFaultInjector.get().writeBlockAfterFlush();

    // Receive the connect ack from the downstream node and record its status
    if (isClient) {
      BlockOpResponseProto connectAck =
          BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
      mirrorInStatus = connectAck.getStatus();
      firstBadLink = connectAck.getFirstBadLink();
      if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
        LOG.info("Datanode " + targets.length +
            " got response for connect ack " +
            " from downstream datanode with firstbadlink as " +
            firstBadLink);
      }
    }
  } catch (IOException e) {
    if (isClient) {
      BlockOpResponseProto.newBuilder()
          .setStatus(ERROR)
          // NB: Unconditionally using the xfer addr w/o hostname
          .setFirstBadLink(targets[0].getXferAddr())
          .build()
          .writeDelimitedTo(replyOut);
      replyOut.flush();
    }
    IOUtils.closeStream(mirrorOut);
    mirrorOut = null;
    IOUtils.closeStream(mirrorIn);
    mirrorIn = null;
    IOUtils.closeSocket(mirrorSock);
    mirrorSock = null;
    if (isClient) {
      LOG.error(datanode + ":Exception transfering block " +
          block + " to mirror " + mirrorNode + ": " + e);
      throw e;
    } else {
      LOG.info(datanode + ":Exception transfering " +
          block + " to mirror " + mirrorNode +
          "- continuing without the mirror", e);
      incrDatanodeNetworkErrors();
    }
  }
}

/*
 * Once the input/output streams to the downstream node are established,
 * writeBlock() calls blockReceiver.receiveBlock() to receive the block
 * from the upstream node and send it on to the downstream node. The
 * blockReceiver object also receives per-packet acks from the downstream
 * node and forwards them to the upstream node.
 */
if (blockReceiver != null) {
  String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
  // Receive data from the upstream node and send it to the downstream node
  blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
      mirrorAddr, null, targets, false);

  // For a transfer (replication) operation there is no downstream node to
  // forward the block to and no downstream ack to wait for, so once the
  // block has been received successfully, the current node replies with
  // the ack directly
  if (isTransfer) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("TRANSFER: send close-ack");
    }
    writeResponse(SUCCESS, null, replyOut);
  }
}

/*
 * After receiveBlock() has run successfully, update the generation stamp,
 * replica file length, and related information of the newly written
 * replica on this DataNode.
 * For a pipeline close-recovery or a block transfer, call closeBlock() to
 * report to the NameNode that the DataNode has received a new block.
 */
if (isClient &&
    stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
  block.setGenerationStamp(latestGenerationStamp);
  block.setNumBytes(minBytesRcvd);
}

if (isDatanode ||
    stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
  datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid);
  LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
      + localAddress + " of size " + block.getNumBytes());
}

if (isClient) {
  size = block.getNumBytes();
}

The receiveBlock() operation:

// Start the PacketResponder thread, which receives and forwards acks
if (isClient && !isTransfer) {
  responder = new Daemon(datanode.threadGroup,
      new PacketResponder(replyOut, mirrIn, downstreams));
  responder.start();
}

// Loop over receivePacket() to receive and forward every packet of the block
while (receivePacket() >= 0) { /* Receive until the last packet */ }

if (responder != null) {
  // Once the block write is complete, stop the PacketResponder thread
  ((PacketResponder) responder.getRunnable()).close();
  responderClosed = true;
}

/**
 * Receives a packet from the upstream node or the client. After the data
 * has been received, receivePacket():
 * 1. reads the next packet's header information;
 * 2. if this is not the last packet, enqueues the ack immediately;
 * 3. forwards the packet to the downstream DataNode;
 * 4. if the full packet was received and syncBlock is set, syncs the data
 *    to local disk;
 * 5. if this is the last node in the pipeline, verifies the checksum.
 */

private int receivePacket() throws IOException {
  // Read the next packet
  packetReceiver.receiveNextPacket(in);

  /*
   * Get the packet header, which gives the packet's position within the
   * block and tells us whether it is the last packet and whether a sync
   * to disk is requested.
   */
  PacketHeader header = packetReceiver.getHeader();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Receiving one packet for block " + block +
        ": " + header);
  }

  // Offset of this packet's data within the block
  long offsetInBlock = header.getOffsetInBlock();
  long seqno = header.getSeqno();
  // Is this the last packet of the block?
  boolean lastPacketInBlock = header.isLastPacketInBlock();
  // Length of the data carried in this packet
  final int len = header.getDataLen();
  boolean syncBlock = header.getSyncBlock();

  // avoid double sync'ing on close
  if (syncBlock && lastPacketInBlock) {
    this.syncOnClose = false;
  }

  // update received bytes
  final long firstByteInBlock = offsetInBlock;
  offsetInBlock += len;
  if (replicaInfo.getNumBytes() < offsetInBlock) {
    replicaInfo.setNumBytes(offsetInBlock);
  }

  // If this is not the last DataNode in the pipeline, enqueue the ack
  // immediately
  if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
    ((PacketResponder) responder.getRunnable()).enqueue(seqno,
        lastPacketInBlock, offsetInBlock, Status.SUCCESS);
  }

  // If there is a downstream node, forward the packet to it
  if (mirrorOut != null && !mirrorError) {
    try {
      long begin = Time.monotonicNow();
      packetReceiver.mirrorPacketTo(mirrorOut);
      mirrorOut.flush();
      long duration = Time.monotonicNow() - begin;
    } catch (IOException e) {
      handleMirrorOutError(e);
    }
  }

  ByteBuffer dataBuf = packetReceiver.getDataSlice();
  ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();

  if (lastPacketInBlock || len == 0) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Receiving an empty packet or the end of the block " + block);
    }
    // For an empty or final packet, sync the data to disk if syncBlock
    // is set
    if (syncBlock) {
      flushOrSync(true);
    }
  } else {
    // If this node is the last one in the pipeline, verify the packet's
    // checksum
    final int checksumLen = diskChecksum.getChecksumSize(len);
    final int checksumReceivedLen = checksumBuf.capacity();

    if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
      throw new IOException("Invalid checksum length: received length is "
          + checksumReceivedLen + " but expected length is " + checksumLen);
    }

    if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
      try {
        // Verify the packet's checksum
        verifyChunks(dataBuf, checksumBuf);
      } catch (IOException ioe) {
        // checksum error detected locally. there is no reason to continue.
        if (responder != null) {
          try {
            ((PacketResponder) responder.getRunnable()).enqueue(seqno,
                lastPacketInBlock, offsetInBlock,
                Status.ERROR_CHECKSUM);
            // Wait until the responder sends back the response
            // and interrupt this thread.
            Thread.sleep(3000);
          } catch (InterruptedException e) { }
        }
        throw new IOException("Terminating due to a checksum error." + ioe);
      }
    }

    if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
      // checksum is missing, need to calculate it
      checksumBuf = ByteBuffer.allocate(checksumLen);
      diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
    }

    // by this point, the data in the buffer uses the disk checksum
    final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
        && streams.isTransientStorage();

    try {
      long onDiskLen = replicaInfo.getBytesOnDisk();
      if (onDiskLen < offsetInBlock) {
        // finally write to the disk:
        if (onDiskLen % bytesPerChecksum != 0) {
          // prepare to overwrite last checksum
          adjustCrcFilePosition();
        }

        // If this is a partial chunk, then read in pre-existing checksum
        Checksum partialCrc = null;
        if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("receivePacket for " + block
                + ": bytesPerChecksum=" + bytesPerChecksum
                + " does not divide firstByteInBlock=" + firstByteInBlock);
          }
          long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
              onDiskLen / bytesPerChecksum * checksumSize;
          partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
        }

        int startByteToDisk = (int) (onDiskLen - firstByteInBlock)
            + dataBuf.arrayOffset() + dataBuf.position();
        int numBytesToDisk = (int) (offsetInBlock - onDiskLen);

        long begin = Time.monotonicNow();
        // Write the packet data to disk (the checksum is written below)
        out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
        long duration = Time.monotonicNow() - begin;
        if (duration > datanodeSlowLogThresholdMs) {
          LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
              + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
        }

        final byte[] lastCrc;
        if (shouldNotWriteChecksum) {
          lastCrc = null;
        } else if (partialCrc != null) {
          // If this is a partial chunk, then verify that this is the only
          // chunk in the packet. Calculate new crc for this chunk.
          if (len > bytesPerChecksum) {
            throw new IOException("Unexpected packet data length for "
                + block + " from " + inAddr + ": a partial chunk must be "
                + " sent in an individual packet (data length = " + len
                + " > bytesPerChecksum = " + bytesPerChecksum + ")");
          }
          partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
          byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
          lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
          checksumOut.write(buf);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Writing out partial crc for data len " + len);
          }
          partialCrc = null;
        } else {
          // write checksum
          final int offset = checksumBuf.arrayOffset() +
              checksumBuf.position();
          final int end = offset + checksumLen;
          lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
              end);
          checksumOut.write(checksumBuf.array(), offset, checksumLen);
        }

        /// flush entire packet, sync if requested
        flushOrSync(syncBlock);

        replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
        datanode.metrics.incrBytesWritten(len);
        datanode.metrics.incrTotalWriteTime(duration);

        manageWriterOsCache(offsetInBlock);
      }
    } catch (IOException iex) {
      datanode.checkDiskErrorAsync();
      throw iex;
    }
  }

  // if sync was requested, put in queue for pending acks here
  // (after the fsync finished)
  if (responder != null && (syncBlock || shouldVerifyChecksum())) {
    ((PacketResponder) responder.getRunnable()).enqueue(seqno,
        lastPacketInBlock, offsetInBlock, Status.SUCCESS);
  }

  /*
   * Send in-progress responses for the replaceBlock() calls back to caller
   * to avoid timeouts due to balancer throttling. HDFS-6247
   */
  if (isReplaceBlock
      && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
    BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
        .setStatus(Status.IN_PROGRESS);
    response.build().writeDelimitedTo(replyOut);
    replyOut.flush();
    lastResponseTime = Time.monotonicNow();
  }

  if (throttler != null) { // throttle I/O
    throttler.throttle(len);
  }

  return lastPacketInBlock ? -1 : len;
}