DataNode Streaming Interface Explained
The most important function of the DataNode is to manage the data blocks on physical storage and to communicate with the NameNode and clients to carry out block read and write operations. These operations involve transferring large amounts of data, for example a DFSClient writing a data block to a DataNode, a DFSClient reading a data block from a DataNode, or a DataNode copying a data block to another DataNode. All of these operations are I/O intensive.
For these read and write operations, the DataNode implementation provides a TCP-stream-based data access interface: DataTransferProtocol.
1 DataTransferProtocol
DataTransferProtocol is another interface for describing reads and writes of data on a DataNode (separate from the RPC interfaces). Its methods include (an abbreviated sketch of the interface follows this list):
readBlock: read a data block from the current DataNode
writeBlock: write a data block to the current DataNode
transferBlock: transfer a data block
releaseShortCircuitFds: release the file descriptors used by a short-circuit read
requestShortCircuitShm: request a shared memory segment for short-circuit reads
copyBlock: copy a data block
blockChecksum: get the checksum of a given data block
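To make the method list concrete, here is an abbreviated sketch of the interface; only readBlock's full signature is spelled out, the other methods follow the same style (this is a simplified rendering, not the complete source):

public interface DataTransferProtocol {
  // Read a data block (or part of it) from the DataNode that receives this request
  void readBlock(ExtendedBlock blk,
      Token<BlockTokenIdentifier> blockToken,
      String clientName,
      long blockOffset,
      long length,
      boolean sendChecksum,
      CachingStrategy cachingStrategy) throws IOException;

  // writeBlock, transferBlock, requestShortCircuitFds, releaseShortCircuitFds,
  // requestShortCircuitShm, replaceBlock, copyBlock and blockChecksum follow
  // the same pattern: plain Java arguments that Sender/Receiver (de)serialize.
}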
DataTransferProtocol has two implementing classes, Sender and Receiver.
Sender encapsulates the calling side of DataTransferProtocol and is used to issue streaming-interface requests.
Receiver encapsulates the executing side of DataTransferProtocol and is used to respond to streaming-interface requests.
Suppose a DFSClient initiates a DataTransferProtocol.readBlock operation. The DFSClient calls the Sender class to serialize the request and transmit it to the remote Receiver. After the remote Receiver receives the request, it deserializes it and then invokes the code that actually executes the read.
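As a rough illustration of that flow, a client could drive a readBlock request over a plain TCP connection roughly as follows. This is only a minimal sketch, not the real DFSClient code: datanodeHost, datanodeXferPort, block and blockToken are assumed to have been obtained from the NameNode beforehand.

Socket sock = new Socket(datanodeHost, datanodeXferPort);
DataOutputStream out = new DataOutputStream(
    new BufferedOutputStream(sock.getOutputStream()));
DataInputStream in = new DataInputStream(sock.getInputStream());

// Sender implements DataTransferProtocol: this call only serializes the
// request and writes it to "out"; nothing is read back at this point
new Sender(out).readBlock(block, blockToken, "exampleClient",
    0L,                  // blockOffset: start at the beginning of the block
    block.getNumBytes(), // length: ask for the whole block
    true,                // sendChecksum
    CachingStrategy.newDefaultStrategy());

// The DataNode's Receiver/DataXceiver answers on "in" with a
// BlockOpResponseProto followed by the block data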
2 Sender
Sender first serializes the parameters with ProtoBuf, then uses the enum class Op to describe which method is being called, and finally sends the Op together with the serialized parameters to the receiving side.
@Override
public void readBlock(final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final long blockOffset,
    final long length,
    final boolean sendChecksum,
    final CachingStrategy cachingStrategy) throws IOException {
  // Serialize all the parameters with ProtoBuf
  OpReadBlockProto proto = OpReadBlockProto.newBuilder()
      .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
      .setOffset(blockOffset)
      .setLen(length)
      .setSendChecksums(sendChecksum)
      .setCachingStrategy(getCachingStrategy(cachingStrategy))
      .build();
  // Call send() with Op.READ_BLOCK to indicate that readBlock is being invoked,
  // together with the serialized parameters proto
  send(out, Op.READ_BLOCK, proto);
}

private static void send(final DataOutputStream out, final Op opcode,
    final Message proto) throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
        + ": " + proto);
  }
  // Call op() to write the protocol version and the opcode
  op(out, opcode);
  // Write the serialized parameters
  proto.writeDelimitedTo(out);
  out.flush();
}
When we use Sender to issue a readBlock() request, Sender serializes the readBlock request with ProtoBuf and sends it through the I/O stream to the remote DataNode.
When the DataNode reads this request, it calls the corresponding Receiver method, opReadBlock(), to deserialize the request, and then performs the readBlock operation.
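Putting the two methods above together, the bytes that end up on the wire for a readBlock request are framed as a 2-byte protocol version, a 1-byte opcode, and then the varint-length-delimited OpReadBlockProto. A minimal sketch of that framing, assuming a plain (non-SASL, non-encrypted) stream and an already built proto:

DataOutputStream out = new DataOutputStream(socket.getOutputStream());
out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // 2-byte protocol version
Op.READ_BLOCK.write(out);                                   // 1-byte opcode
proto.writeDelimitedTo(out);                                // varint length + serialized parameters
out.flush();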
3 Receiver
Receiver is an abstract class used to execute streaming-interface requests initiated by remote nodes. It provides readOp(), which parses the opcode out of a Sender request, and processOp(), which takes the Op parsed by readOp() as a parameter and dispatches to a different handler for each opcode. For readBlock, for example, it calls opReadBlock(), which reads the serialized request parameters from the I/O stream, deserializes them, and then invokes the corresponding method of the subclass DataXceiver to actually execute the operation.
/** Read the version number and the Op opcode from the I/O stream */
protected final Op readOp() throws IOException {
  // Read the version number from the I/O stream
  final short version = in.readShort();
  if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
    throw new IOException("Version Mismatch (Expected: " +
        DataTransferProtocol.DATA_TRANSFER_VERSION +
        ", Received: " + version + " )");
  }
  // Then read the Op from the I/O stream
  return Op.read(in);
}
/** Take the Op opcode parsed by readOp() as a parameter and perform a different operation for each opcode. */
protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
break;
case REPLACE_BLOCK:
opReplaceBlock(in);
break;
case COPY_BLOCK:
opCopyBlock(in);
break;
case BLOCK_CHECKSUM:
opBlockChecksum(in);
break;
case TRANSFER_BLOCK:
opTransferBlock(in);
break;
case REQUEST_SHORT_CIRCUIT_FDS:
opRequestShortCircuitFds(in);
break;
case RELEASE_SHORT_CIRCUIT_FDS:
opReleaseShortCircuitFds(in);
break;
case REQUEST_SHORT_CIRCUIT_SHM:
opRequestShortCircuitShm(in);
break;
default:
throw newIOException("Unknown op " +op + "in data stream");
}
}
/** Receive OP_READ_BLOCK */
private void opReadBlock() throws IOException {
  // Read the serialized readBlock parameters from the I/O stream
  OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
  TraceScope traceScope = continueTraceSpan(proto.getHeader(),
      proto.getClass().getSimpleName());
  try {
    // Deserialize the parameters, then call the readBlock() method of the
    // subclass DataXceiver to perform the read
    readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        proto.getOffset(),
        proto.getLen(),
        proto.getSendChecksums(),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
            CachingStrategy.newDefaultStrategy()));
  } finally {
    if (traceScope != null) traceScope.close();
  }
}
4 DataXceiverServer
As we know, in a Java Socket implementation you first create a ServerSocket object bound to a given port, and then call ServerSocket.accept() to listen for connection requests arriving on that port. When a connection request comes in, accept() returns a Socket object, and the server then uses that Socket to communicate with the client.
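In plain java.net terms, the pattern just described looks roughly like this (a minimal, self-contained sketch; the port number and the handle() body are only placeholders):

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class AcceptLoop {
  public static void main(String[] args) throws IOException {
    ServerSocket server = new ServerSocket(50010);   // bind to a port
    while (true) {
      Socket socket = server.accept();               // block until a client connects
      new Thread(() -> handle(socket)).start();      // serve each connection on its own thread
    }
  }

  private static void handle(Socket socket) {
    // read the request from socket.getInputStream(),
    // write the reply to socket.getOutputStream(), then close the socket
  }
}

DataXceiverServer plays the role of this accept loop, with PeerServer/Peer in place of ServerSocket/Socket and a Daemon thread wrapping each DataXceiver.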
The DataNode's streaming interface follows this Socket pattern and defines DataXceiver and DataXceiverServer. DataXceiverServer listens for streaming-interface requests on the DataNode; every time a client issues a streaming request through Sender, DataXceiverServer accepts the connection and creates a DataXceiver object to respond to the request and perform the corresponding operation.
4.1 DataXceiverServer Initialization
During DataNode initialization, a DataXceiverServer object is created to listen for all streaming requests; the DataNode calls DataNode.initDataXceiver() to construct the DataXceiverServer object.
>> Create a TcpPeerServer object, which wraps the ServerSocket. TcpPeerServer listens on the address given by the dfs.datanode.address configuration option.
private void initDataXceiver(Configuration conf) throws IOException {
  // TcpPeerServer wraps the ServerSocket and listens on dfs.datanode.address
  TcpPeerServer tcpPeerServer;
  if (secureResources != null) {
    tcpPeerServer = new TcpPeerServer(secureResources);
  } else {
    tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
        DataNode.getStreamingAddr(conf));
  }
  // Set the TCP receive buffer size
  tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
  streamingAddr = tcpPeerServer.getStreamingAddr();
  LOG.info("Opened streaming server at " + streamingAddr);
  this.threadGroup = new ThreadGroup("dataXceiverServer");
  // Construct the DataXceiverServer instance
  xserver = new DataXceiverServer(tcpPeerServer, conf, this);
  // Run the dataXceiverServer thread group as daemon threads
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty

  // Short-circuit read case
  if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
      conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
          DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
    // Construct a DomainPeerServer, based on DomainSocket, for local inter-process communication
    DomainPeerServer domainPeerServer =
        getDomainPeerServer(conf, streamingAddr.getPort());
    if (domainPeerServer != null) {
      // Construct localDataXceiverServer
      this.localDataXceiverServer = new Daemon(threadGroup,
          new DataXceiverServer(domainPeerServer, conf, this));
      LOG.info("Listening on UNIX domain socket: " +
          domainPeerServer.getBindPath());
    }
  }
  // Construct the shortCircuitRegistry object
  this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
}
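The configuration keys referenced above can also be set programmatically. A small illustrative sketch, using the key constants from DFSConfigKeys that appear in the code; the values shown here are only examples:

Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:50010");           // dfs.datanode.address, the streaming address
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);       // dfs.client.read.shortcircuit
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false); // dfs.client.domain.socket.data.traffic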
4.2 The run() Method
All of DataXceiverServer's work is done in its run() method. It repeatedly calls peerServer.accept() to listen; when a new connection request arrives, it creates a Peer object and constructs a DataXceiver thread to serve that streaming request. In other words, the streaming-interface request itself is answered by a DataXceiver, and the real work is done by the DataXceiver.
public void run() {
  Peer peer = null;
  // As long as the DataNode is running, keep looping and listening for new requests
  while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
    try {
      // When a new request arrives, peerServer.accept() returns and a Peer object is created
      peer = peerServer.accept();

      // Make sure the xceiver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > maxXceiverCount) {
        throw new IOException("Xceiver count " + curXceiverCount
            + " exceeds the limit of concurrent xcievers: "
            + maxXceiverCount);
      }

      // Construct a DataXceiver thread to serve this streaming request. DataXceiverServer
      // only establishes the connection and constructs/starts the DataXceiver;
      // everything else is handed over to the DataXceiver
      new Daemon(datanode.threadGroup,
          DataXceiver.create(peer, datanode, this))
          .start();
    } catch (SocketTimeoutException ignored) {
      // wake up to see if should continue to run
    } catch (AsynchronousCloseException ace) {
      // another thread closed our listener socket - that's expected during shutdown,
      // but not in other circumstances
      if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
        LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
      }
    } catch (IOException ie) {
      IOUtils.cleanup(null, peer);
      LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
    } catch (OutOfMemoryError ie) {
      IOUtils.cleanup(null, peer);
      // DataNode can run out of memory if there is too many transfers.
      // Log the event, Sleep for 30 seconds, other transfers may complete by
      // then.
      LOG.error("DataNode is out of memory. Will retry in 30 seconds.", ie);
      try {
        Thread.sleep(30 * 1000);
      } catch (InterruptedException e) {
        // ignore
      }
    } catch (Throwable te) {
      // Any other exception shuts the DataNode down
      LOG.error(datanode.getDisplayName()
          + ":DataXceiverServer: Exiting due to: ", te);
      datanode.shouldRun = false;
    }
  }

  // Out of the main loop: clean up and close the peerServer
  try {
    peerServer.close();
    closed = true;
  } catch (IOException ie) {
    LOG.warn(datanode.getDisplayName()
        + " :DataXceiverServer: close exception", ie);
  }

  // if in restart prep stage, notify peers before closing them.
  if (datanode.shutdownForUpgrade) {
    restartNotifyPeers();
    // Each thread needs some time to process it. If a thread needs
    // to send an OOB message to the client, but blocked on network for
    // long time, we need to force its termination.
    LOG.info("Shutting down DataXceiverServer before restart");
    // Allow roughly up to 2 seconds.
    for (int i = 0; getNumPeers() > 0 && i < 10; i++) {
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        // ignore
      }
    }
  }
  // Close all remaining connections of this server as well
  closeAllPeers();
}
5 DataXceiver
DataXceiver is a subclass of Receiver; the actual responses to DataTransferProtocol operations are carried out here.
5.1 The run() Method
public void run() {
  int opsProcessed = 0;
  Op op = null;

  try {
    dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
    peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
    // Get the underlying input stream
    InputStream input = socketIn;
    try {
      IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
          socketIn, datanode.getXferAddress().getPort(),
          datanode.getDatanodeId());
      // Wrap the input stream
      input = new BufferedInputStream(saslStreams.in,
          HdfsConstants.SMALL_BUFFER_SIZE);
      // Get the underlying output stream
      socketOut = saslStreams.out;
    } catch (InvalidMagicNumberException imne) {
      LOG.info("Failed to read expected encryption handshake from client " +
          "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
          "is running an older version of Hadoop which does not support " +
          "encryption");
      return;
    }
    // Call the parent class's initialize() method to finish initialization
    super.initialize(new DataInputStream(input));
    do {
      updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
      try {
        if (opsProcessed != 0) {
          assert dnConf.socketKeepaliveTimeout > 0;
          peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
        } else {
          peer.setReadTimeout(dnConf.socketTimeout);
        }
        op = readOp(); // parse the Op opcode from the input stream
      } catch (InterruptedIOException ignored) {
        break;
      } catch (IOException err) {
        // Since we optimistically expect the next op, it's quite normal to get EOF here.
        if (opsProcessed > 0 &&
            (err instanceof EOFException || err instanceof ClosedChannelException)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
          }
        } else {
          incrDatanodeNetworkErrors();
          throw err;
        }
        break;
      }

      // restore normal timeout
      if (opsProcessed != 0) {
        peer.setReadTimeout(dnConf.socketTimeout);
      }

      opStartTime = now();
      // Call the parent class's processOp() to handle the streaming request
      processOp(op);
      ++opsProcessed;
    } while ((peer != null) &&
        (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
  } catch (Throwable t) {
    String s = datanode.getDisplayName() + ":DataXceiver error processing "
        + ((op == null) ? "unknown" : op.name()) + " operation "
        + " src: " + remoteAddress + " dst: " + localAddress;
    if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {
      // For WRITE_BLOCK, it is okay if the replica already exists since
      // client and replication may write the same block to the same datanode
      // at the same time.
      if (LOG.isTraceEnabled()) {
        LOG.trace(s, t);
      } else {
        LOG.info(s + "; " + t);
      }
    } else if (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) {
      String s1 =
          "Likely the client has stopped reading, disconnecting it";
      s1 += " (" + s + ")";
      if (LOG.isTraceEnabled()) {
        LOG.trace(s1, t);
      } else {
        LOG.info(s1 + "; " + t);
      }
    } else {
      LOG.error(s, t);
    }
  } finally {
    if (LOG.isDebugEnabled()) {
      LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
          + datanode.getXceiverCount());
    }
    updateCurrentThreadName("Cleaning up");
    if (peer != null) {
      dataXceiverServer.closePeer(peer);
      IOUtils.closeStream(in);
    }
  }
}
6 Reading Data
A client calls Sender.readBlock() to read a data block from a specific DataNode. After the request reaches the DataNode over the I/O stream, the DataNode's DataXceiverServer creates a DataXceiver object to respond to it.
readBlock() receives several parameters:
ExtendedBlock block: which data block to read
Token<BlockTokenIdentifier> blockToken: the access token for the block
String clientName: which client is reading
long blockOffset: the position in the block to start reading from
long length: how many bytes to read
CachingStrategy cachingStrategy: the caching strategy
The overall flow is as follows:
>> Create the output stream
>> Create a BlockSender object
>> Send a BlockOpResponseProto response to the client, telling it that the request was received successfully and reporting this DataNode's checksum information
>> Send the data block to the client, producing a status code
>> Close the streams
public void readBlock(final ExtendedBlock block,   // the data block to read
    final Token<BlockTokenIdentifier> blockToken,  // the block access token
    final String clientName,                       // the client name
    final long blockOffset,                        // the position in the block to read from
    final long length,                             // how many bytes to read
    final boolean sendChecksum,                    // whether to send checksum data; read-side checksum
                                                   // verification is done by the client, which reports
                                                   // the result back to the DataNode
    final CachingStrategy cachingStrategy          // the caching strategy
    ) throws IOException {
  previousOpClientName = clientName;
  long read = 0;
  OutputStream baseStream = getOutputStream();
  DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
      baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
  checkAccess(out, true, block, blockToken,
      Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
  // send the block
  BlockSender blockSender = null;
  DatanodeRegistration dnR =
      datanode.getDNRegistrationForBP(block.getBlockPoolId());
  final String clientTraceFmt =
      clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
        ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
            "%d", "HDFS_READ", clientName, "%d",
            dnR.getDatanodeUuid(), block, "%d")
        : dnR + " Served block " + block + " to " +
            remoteAddress;

  updateCurrentThreadName("Sending block " + block);
  // Create the BlockSender object
  try {
    try {
      blockSender = new BlockSender(block, blockOffset, length,
          true, false, sendChecksum, datanode, clientTraceFmt,
          cachingStrategy);
    } catch (IOException e) {
      String msg = "opReadBlock " + block + " received exception " + e;
      LOG.info(msg);
      sendResponse(ERROR, msg);
      throw e;
    }

    // Send a BlockOpResponseProto response to the client, telling it that the
    // request was received successfully and reporting this DataNode's checksum information
    writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));

    long beginRead = Time.monotonicNow();
    // Send the data block to the client; this produces a status code that the DataNode parses below
    read = blockSender.sendBlock(out, baseStream, null); // send data
    long duration = Time.monotonicNow() - beginRead;
    if (blockSender.didSendEntireByteRange()) {
      // If we sent the entire range, then we should expect the client
      // to respond with a Status enum.
      try {
        ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
            PBHelper.vintPrefixed(in));
        if (!stat.hasStatus()) {
          LOG.warn("Client " + peer.getRemoteAddressString() +
              " did not send a valid status code after reading. " +
              "Will close connection.");
          IOUtils.closeStream(out);
        }
      } catch (IOException ioe) {
        LOG.debug("Error reading client status response. Will close connection.", ioe);
        IOUtils.closeStream(out);
        incrDatanodeNetworkErrors();
      }
    } else {
      IOUtils.closeStream(out);
    }
    datanode.metrics.incrBytesRead((int) read);
    datanode.metrics.incrBlocksRead();
    datanode.metrics.incrTotalReadTime(duration);
  } catch (SocketException ignored) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(dnR + ":Ignoring exception while serving " + block + " to " +
          remoteAddress, ignored);
    }
    // Its ok for remote side to close the connection anytime.
    datanode.metrics.incrBlocksRead();
    IOUtils.closeStream(out);
  } catch (IOException ioe) {
    /* What exactly should we do here?
     * Earlier version shutdown() datanode if there is disk error.
     */
    if (!(ioe instanceof SocketTimeoutException)) {
      LOG.warn(dnR + ":Got exception while serving " + block + " to "
          + remoteAddress, ioe);
      datanode.metrics.incrDatanodeNetworkErrors();
    }
    throw ioe;
  } finally {
    IOUtils.closeStream(blockSender);
  }

  // update metrics
  datanode.metrics.addReadBlockOp(elapsed());
  datanode.metrics.incrReadsFromClient(peer.isLocal(), read);
}
7 Writing Data
HDFS writes data using a data-stream pipeline. The DFSClient calls Sender.writeBlock() to trigger a write-block request; the request is passed to every DataNode in the pipeline, and the last DataNode replies with an acknowledgement that travels back along the pipeline to the DFSClient. Once the DFSClient receives it, it splits the data to be written into packets and sends them one after another down the pipeline.
A packet is first sent to the first DataNode. After the first DataNode has received the packet successfully, it writes the packet to disk and forwards it to the second DataNode, and so on. When the packet reaches the last DataNode, the data is verified; if the check succeeds, a packet acknowledgement is sent back, traveling in reverse along the pipeline to the DFSClient. Once all the data of a block has been sent and acknowledged, the DFSClient sends an empty packet to mark the end of the block, and the whole block write is complete.
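Each packet that flows down the pipeline carries a small header whose fields drive the logic in receivePacket() further below. A simplified sketch of those fields (the names follow the accessors used in the code; the class itself is only illustrative):

class PacketHeaderFields {
  long offsetInBlock;        // where this packet's data starts inside the block
  long seqno;                // sequence number, echoed back in the downstream ack
  boolean lastPacketInBlock; // true for the final, empty packet that closes the block
  int dataLen;               // number of data bytes in this packet
  boolean syncBlock;         // ask the DataNodes to sync this packet to disk
}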
The DataXceiver.writeBlock() method:
These three variables control the processing flow:
// Whether the write was initiated by a DataNode
final boolean isDatanode = clientname.length() == 0;
// Whether the write was initiated by a client
final boolean isClient = !isDatanode;
// Whether this write is a block transfer (replication) operation
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
    || stage == BlockConstructionStage.TRANSFER_FINALIZED;
……
// Output stream to the next DataNode
DataOutputStream mirrorOut = null;
// Input stream from the next DataNode
DataInputStream mirrorIn = null;
// Socket to the next DataNode
Socket mirrorSock = null;
// Name of the next DataNode
String mirrorNode = null;
// The first DataNode in the pipeline that failed
String firstBadLink = "";
……
If the write was initiated by a DataNode:
/*
 * Open a BlockReceiver to receive the data block from the upstream DataNode.
 * BlockReceiver: receives the block from the upstream node in the pipeline,
 * saves the block on the current node, and forwards it to the downstream node
 * in the pipeline; at the same time it receives acknowledgements from the
 * downstream node and passes them back to the upstream node.
 */
blockReceiver = new BlockReceiver(block, storageType, in,
    peer.getRemoteAddressString(),
    peer.getLocalAddressString(),
    stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
    clientname, srcDataNode, datanode, requestedChecksum,
    cachingStrategy, allowLazyPersist, pinning);
storageUuid = blockReceiver.getStorageUuid();
// Connect to the downstream node
if (targets.length > 0) {
  ……
  // Establish the socket connection to the downstream node
  mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
  mirrorSock = datanode.newSocket();
  try {
    ……
    // Create the output and input streams to the downstream node
    mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
        HdfsConstants.SMALL_BUFFER_SIZE));
    mirrorIn = new DataInputStream(unbufMirrorIn);

    // Send the block write request to the downstream node
    if (targetPinnings != null && targetPinnings.length > 0) {
      new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
          blockToken, clientname, targets, targetStorageTypes, srcDataNode,
          stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
          latestGenerationStamp, requestedChecksum, cachingStrategy,
          false, targetPinnings[0], targetPinnings);
    } else {
      new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
          blockToken, clientname, targets, targetStorageTypes, srcDataNode,
          stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
          latestGenerationStamp, requestedChecksum, cachingStrategy,
          false, false, targetPinnings);
    }

    mirrorOut.flush();
    DataNodeFaultInjector.get().writeBlockAfterFlush();

    // Receive the connect ack from the downstream node and record its status
    if (isClient) {
      BlockOpResponseProto connectAck =
          BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
      mirrorInStatus = connectAck.getStatus();
      firstBadLink = connectAck.getFirstBadLink();
      if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
        LOG.info("Datanode " + targets.length +
            " got response for connect ack " +
            " from downstream datanode with firstbadlink as " +
            firstBadLink);
      }
    }
  } catch (IOException e) {
    if (isClient) {
      BlockOpResponseProto.newBuilder()
          .setStatus(ERROR)
          // NB: Unconditionally using the xfer addr w/o hostname
          .setFirstBadLink(targets[0].getXferAddr())
          .build()
          .writeDelimitedTo(replyOut);
      replyOut.flush();
    }
    IOUtils.closeStream(mirrorOut);
    mirrorOut = null;
    IOUtils.closeStream(mirrorIn);
    mirrorIn = null;
    IOUtils.closeSocket(mirrorSock);
    mirrorSock = null;
    if (isClient) {
      LOG.error(datanode + ":Exception transfering block " +
          block + " to mirror " + mirrorNode + ": " + e);
      throw e;
    } else {
      LOG.info(datanode + ":Exception transfering " +
          block + " to mirror " + mirrorNode +
          "- continuing without the mirror", e);
      incrDatanodeNetworkErrors();
    }
  }
}
/*
 * Once the input/output streams to the downstream node have been established,
 * writeBlock() calls blockReceiver.receiveBlock() to receive the data block
 * from the upstream node and forward it to the downstream node. At the same
 * time the BlockReceiver object receives the packet acknowledgements for the
 * block from the downstream node and forwards them to the upstream node.
 */
if (blockReceiver != null) {
  String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
  // Receive data from the upstream node and send it to the downstream node
  blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
      mirrorAddr, null, targets, false);

  // For a transfer (replication) operation there is no need to forward the
  // block to a downstream node or to receive acknowledgements from one, so
  // once the block has been received successfully the current node replies
  // with the acknowledgement directly
  if (isTransfer) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("TRANSFER: send close-ack");
    }
    writeResponse(SUCCESS, null, replyOut);
  }
}
/*
 * After receiveBlock() has executed successfully, update the generation stamp,
 * replica file length and other information of the block replica newly written
 * on this DataNode.
 * If this is a pipeline close or a block replication operation, call closeBlock()
 * to report to the NameNode that this DataNode has received the new block.
 */
if (isClient &&
    stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
  block.setGenerationStamp(latestGenerationStamp);
  block.setNumBytes(minBytesRcvd);
}

if (isDatanode ||
    stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
  datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid);
  LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
      + localAddress + " of size " + block.getNumBytes());
}

if (isClient) {
  size = block.getNumBytes();
}
The receiveBlock() operation:
// Start the PacketResponder thread to receive and forward acknowledgement packets
if (isClient && !isTransfer) {
  responder = new Daemon(datanode.threadGroup,
      new PacketResponder(replyOut, mirrIn, downstreams));
  responder.start();
}

// Call receivePacket() in a loop to receive and forward all the packets of the block
while (receivePacket() >= 0) { /* Receive until the last packet */ }

if (responder != null) {
  // After the block write has completed, stop the PacketResponder thread
  ((PacketResponder) responder.getRunnable()).close();
  responderClosed = true;
}
}
/**
 * Receives data from the upstream node or the client. After the data has been
 * received, it does the following:
 * 1. Read the next packet's header information
 * 2. If this is not the last packet, process the response immediately
 * 3. Forward the packet to the downstream DataNode
 * 4. If a complete packet has been received and syncBlock is true, sync the data to the local disk
 * 5. If this is the last node in the pipeline, verify the checksum
 */
private int receivePacket() throws IOException {
  // Read the next packet
  packetReceiver.receiveNextPacket(in);

  /*
   * Get the packet header, work out where this packet sits inside the block,
   * and determine whether it is the last packet and whether it must be synced to disk
   */
  PacketHeader header = packetReceiver.getHeader();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Receiving one packet for block " + block +
        ": " + header);
  }

  // Offset of this packet's data inside the block
  long offsetInBlock = header.getOffsetInBlock();
  long seqno = header.getSeqno();
  // Whether this is the last packet of the block
  boolean lastPacketInBlock = header.isLastPacketInBlock();
  // Length of the packet's data
  final int len = header.getDataLen();
  boolean syncBlock = header.getSyncBlock();

  // avoid double sync'ing on close
  if (syncBlock && lastPacketInBlock) {
    this.syncOnClose = false;
  }

  // update received bytes
  final long firstByteInBlock = offsetInBlock;
  offsetInBlock += len;
  if (replicaInfo.getNumBytes() < offsetInBlock) {
    replicaInfo.setNumBytes(offsetInBlock);
  }

  // If this is not the last DataNode in the pipeline, enqueue the response immediately
  if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
    ((PacketResponder) responder.getRunnable()).enqueue(seqno,
        lastPacketInBlock, offsetInBlock, Status.SUCCESS);
  }

  // If there is a downstream node, forward the packet to the downstream DataNode
  if (mirrorOut != null && !mirrorError) {
    try {
      long begin = Time.monotonicNow();
      packetReceiver.mirrorPacketTo(mirrorOut);
      mirrorOut.flush();
      long duration = Time.monotonicNow() - begin;
    } catch (IOException e) {
      handleMirrorOutError(e);
    }
  }

  ByteBuffer dataBuf = packetReceiver.getDataSlice();
  ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
  if (lastPacketInBlock || len == 0) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Receiving an empty packet or the end of the block " + block);
    }
    // If the complete block has been received and syncBlock is true, sync the data to disk
    if (syncBlock) {
      flushOrSync(true);
    }
  } else {
    // If the current node is the last node in the pipeline, verify the packet's checksum
    final int checksumLen = diskChecksum.getChecksumSize(len);
    final int checksumReceivedLen = checksumBuf.capacity();

    if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
      throw new IOException("Invalid checksum length: received length is "
          + checksumReceivedLen + " but expected length is " + checksumLen);
    }

    if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
      try {
        // Verify the packet's checksum
        verifyChunks(dataBuf, checksumBuf);
      } catch (IOException ioe) {
        // checksum error detected locally. there is no reason to continue.
        if (responder != null) {
          try {
            ((PacketResponder) responder.getRunnable()).enqueue(seqno,
                lastPacketInBlock, offsetInBlock,
                Status.ERROR_CHECKSUM);
            // Wait until the responder sends back the response
            // and interrupt this thread.
            Thread.sleep(3000);
          } catch (InterruptedException e) { }
        }
        throw new IOException("Terminating due to a checksum error." + ioe);
      }
    }

    if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
      // checksum is missing, need to calculate it
      checksumBuf = ByteBuffer.allocate(checksumLen);
      diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
    }

    // by this point, the data in the buffer uses the disk checksum
    final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
        && streams.isTransientStorage();
    try {
      long onDiskLen = replicaInfo.getBytesOnDisk();
      if (onDiskLen < offsetInBlock) {
        // finally write to the disk :
        if (onDiskLen % bytesPerChecksum != 0) {
          // prepare to overwrite last checksum
          adjustCrcFilePosition();
        }

        // If this is a partial chunk, then read in pre-existing checksum
        Checksum partialCrc = null;
        if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("receivePacket for " + block
                + ": bytesPerChecksum=" + bytesPerChecksum
                + " does not divide firstByteInBlock=" + firstByteInBlock);
          }
          long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
              onDiskLen / bytesPerChecksum * checksumSize;
          partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
        }

        int startByteToDisk = (int) (onDiskLen - firstByteInBlock)
            + dataBuf.arrayOffset() + dataBuf.position();
        int numBytesToDisk = (int) (offsetInBlock - onDiskLen);

        long begin = Time.monotonicNow();
        // Write the data and the checksum
        out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
        long duration = Time.monotonicNow() - begin;
        if (duration > datanodeSlowLogThresholdMs) {
          LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
              + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
        }

        final byte[] lastCrc;
        if (shouldNotWriteChecksum) {
          lastCrc = null;
        } else if (partialCrc != null) {
          // If this is a partial chunk, then verify that this is the only
          // chunk in the packet. Calculate new crc for this chunk.
          if (len > bytesPerChecksum) {
            throw new IOException("Unexpected packet data length for "
                + block + " from " + inAddr + ": a partial chunk must be "
                + " sent in an individual packet (data length = " + len
                + " > bytesPerChecksum = " + bytesPerChecksum + ")");
          }
          partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
          byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
          lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
          checksumOut.write(buf);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Writing out partial crc for data len " + len);
          }
          partialCrc = null;
        } else {
          // write checksum
          final int offset = checksumBuf.arrayOffset() +
              checksumBuf.position();
          final int end = offset + checksumLen;
          lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
              end);
          checksumOut.write(checksumBuf.array(), offset, checksumLen);
        }

        /// flush entire packet, sync if requested
        flushOrSync(syncBlock);

        replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);

        datanode.metrics.incrBytesWritten(len);
        datanode.metrics.incrTotalWriteTime(duration);

        manageWriterOsCache(offsetInBlock);
      }
    } catch (IOException iex) {
      datanode.checkDiskErrorAsync();
      throw iex;
    }
  }
  // if sync was requested, put in queue for pending acks here
  // (after the fsync finished)
  if (responder != null && (syncBlock || shouldVerifyChecksum())) {
    ((PacketResponder) responder.getRunnable()).enqueue(seqno,
        lastPacketInBlock, offsetInBlock, Status.SUCCESS);
  }

  /*
   * Send in-progress responses for the replaceBlock() calls back to caller to
   * avoid timeouts due to balancer throttling. HDFS-6247
   */
  if (isReplaceBlock
      && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
    BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
        .setStatus(Status.IN_PROGRESS);
    response.build().writeDelimitedTo(replyOut);
    replyOut.flush();

    lastResponseTime = Time.monotonicNow();
  }

  if (throttler != null) { // throttle I/O
    throttler.throttle(len);
  }

  return lastPacketInBlock ? -1 : len;
}