DataNode DataXceiverServer readBlock详解
鉴于在hdfs客户端读取hdfs文件过程中,其在获取到数据块所在的DataNode之后,会构造blockReader对象用来从指定数据节点上读取数据块;其中RemoteBlockReader2是使用socket连接从datanode读取数据块的实现类,其reader.read()方法用于从socket stream中读取对应的数据包。接下来将分析一下该处对应的DataNode节点上是如何响应该请求的:
在介绍如何响应之前,先简单介绍一下DataNode进程在启动中所开启的一些基本服务如下(其启动源码在DataNode.main()方法中,不过多赘述,后续会写一篇blog来介绍具体的启动过程):
- DataNode.startDataNode():
- 初始化DataStorage对象
- 初始化DataXceiverServer对象
- 启动HttpInfoServer对象
- 初始化DataNode的IPC Server对象
- 创建BlockPoolManager对象
- DataNode.runDatanodeDaemon():
- 启动BlockPoolManager所管理的所有线程
- 启动dataXceiverServer线程
- 启动DataNode的IPC Server
其中主要用于响应客户端流式接口请求的服务就是DataXceiverServer服务线程;其基本逻辑接口图如下:
接下来一步步来看DataXceiverServer是如何启动并对外提供服务的:
1、DataXceiverServer的初始化
在数据节点DataNode进程启动的startDataNode()方法中,会调用initDataXceiver()方法,完成DataXceiverServer的初始化;首先其会创建tcpPeerServer对象(对ServerSocket的封装),其能够通过accept()方法返回Peer对象(对Socket的封装)用于提供输入输出流。并针对短路读提供domainPeerServer对于本地短路读请求;其源码如下:
private void initDataXceiver(Configuration conf) throws IOException {
// find free port or use privileged port provided
// 对ServerSocket的封装
TcpPeerServer tcpPeerServer;
if (secureResources != null) {
tcpPeerServer = new TcpPeerServer(secureResources);
}
// .........
// 设置tcp接受缓冲区 并绑定对应InetSocketAddress
tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
streamingAddr = tcpPeerServer.getStreamingAddr();
LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer");
// 构造DataXceiverServer对象
xserver = new DataXceiverServer(tcpPeerServer, conf, this);
this.dataXceiverServer = new Daemon(threadGroup, xserver);
this.threadGroup.setDaemon(true); // auto destroy when empty
// 针对短路读情况,构造localDataXceiverServer 用于本地进程通信
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
DomainPeerServer domainPeerServer =
getDomainPeerServer(conf, streamingAddr.getPort());
if (domainPeerServer != null) {
this.localDataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(domainPeerServer, conf, this));
LOG.info("Listening on UNIX domain socket: " +
domainPeerServer.getBindPath());
}
}
this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
}
之后其便会通过DataNode.runDatanodeDaemon();启动dataXceiverServer线程;其run()方法的基本逻辑和源码如下:
- while循环,等待阻塞TcpPeerServer(也就是ServerSocket)的accept()方法,直到接收到客户端或者其他DataNode的连接请求;
- 获得peer,即Socket的封装;
- 判断当前DataNode上DataXceiver线程数量是否超过阈值,如果超过的话,直接抛出IOException,利用IOUtils的cleanup()方法关闭peer后继续循环,否则继续4;
- 创建一个后台线程DataXceiver,并将其加入到datanode的线程组threadGroup中,并启动该线程,响应数据读写请求;
通过该方法可以知道dataXceiverServer只负责连接的建立以及构造并启动DataXceiver,流式接口的请求则是由DataXceiver响应的,所有的输入输出流的操作都是由DataXceiver来执行的;(这种连接建立和响应分离的设计方式在hadoop rpc处也出现过)
@Override
// 核心方法
public void run() {
Peer peer = null;
// 如果标志位shouldRun为true,且没有为升级而执行shutdown
while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
try {
// 阻塞,直到接收到客户端或者其他DataNode的连接请求
peer = peerServer.accept();
// Make sure the xceiver count is not exceeded
// 确保DataXceiver数目没有超过最大限制
/**
* DataNode的getXceiverCount方法计算得到,返回线程组的活跃线程数目
* threadGroup == null ? 0 : threadGroup.activeCount();
*/
int curXceiverCount = datanode.getXceiverCount();
if (curXceiverCount > maxXceiverCount) {
throw new IOException("Xceiver count " + curXceiverCount
+ " exceeds the limit of concurrent xcievers: "
+ maxXceiverCount);
}
// 创建一个后台线程,DataXceiver,并加入到线程组datanode.threadGroup
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
// 等待唤醒看看是否能够继续运行
} catch (AsynchronousCloseException ace) {// 异步的关闭异常
// 正如我们所预料的,只有在关机的过程中,通过其他线程关闭我们的侦听套接字,其他情况下则不会发生
if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
}
} catch (IOException ie) {
IOUtils.cleanup(null, peer);
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
} catch (OutOfMemoryError ie) {
IOUtils.cleanup(null, peer);
// 数据节点可能由于存在太多的数据传输导致内存溢出,记录该事件,并等待30秒,其他的数据传输可能到时就完成了
LOG.warn("DataNode is out of memory. Will retry in 30 seconds.", ie);
try {
Thread.sleep(30 * 1000);
} catch (InterruptedException e) {
// ignore
}
} catch (Throwable te) {
LOG.error(datanode.getDisplayName()
+ ":DataXceiverServer: Exiting due to: ", te);
datanode.shouldRun = false;
}
}
// Close the server to stop reception of more requests.
// 关闭服务器停止接收更多请求
try {
peerServer.close();
closed = true;
} catch (IOException ie) {
LOG.warn(datanode.getDisplayName()
+ " :DataXceiverServer: close exception", ie);
}
// if in restart prep stage, notify peers before closing them.
// 如果在重新启动前准备阶段,在关闭前通知peers
if (datanode.shutdownForUpgrade) {
restartNotifyPeers();
LOG.info("Shutting down DataXceiverServer before restart");
// Allow roughly up to 2 seconds.
for (int i = 0; getNumPeers() > 0 && i < 10; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// ignore
}
}
}
// Close all peers.
// 关闭所有的peers
closeAllPeers();
}
2、DataXceiver流式请求的读写
DataXceiver是一个线程类,其运行的run方法的主要功能是读取请求的类型,根据类型执行相应的操作,我们主要分析读和写数据块相关的方法:
/**
* Read/write data from/to the DataXceiverServer.
*/
@Override
public void run() {
int opsProcessed = 0;
Op op = null;
try {
// 先根据传入的peer对象获取对应的输入/输出流,并对流进行装饰
dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
try {
IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
socketIn, datanode.getXferAddress().getPort(),
datanode.getDatanodeId());
input = new BufferedInputStream(saslStreams.in,
HdfsConstants.SMALL_BUFFER_SIZE);
socketOut = saslStreams.out;
} catch (InvalidMagicNumberException imne) {
// ......
return;
}
super.initialize(new DataInputStream(input));
// We process requests in a loop, and stay around for a short timeout.
// This optimistic behaviour allows the other end to reuse connections.
// Setting keepalive timeout to 0 disable this behavior.
// DataXceiver主体循环
do {
updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
try {
if (opsProcessed != 0) {
assert dnConf.socketKeepaliveTimeout > 0;
peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
} else {
peer.setReadTimeout(dnConf.socketTimeout);
}
// 调用Receiver.readOp()从输入流中解析操作符
op = readOp();
} catch (InterruptedIOException ignored) {
// ......
}
// ......
opStartTime = now();
processOp(op); // 调用processOp()处理该流式请求操作
++opsProcessed;
} while ((peer != null) &&
(!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
} catch (Throwable t) {
// ......
}
}
其主要的工作流程是:从IO中读取流式接口请求并解析处对应的操作符,并调用processOp()处理该流式请求操作,该方法会根据Op调用DataXceiver对应的处理方法:
/** Read an Op. It also checks protocol version. */
protected final Op readOp() throws IOException {
final short version = in.readShort();
if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
throw new IOException( "Version Mismatch (Expected: " +
DataTransferProtocol.DATA_TRANSFER_VERSION +
", Received: " + version + " )");
}
return Op.read(in);
}
/** Process op by the corresponding method. */
protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
break;
case REPLACE_BLOCK:
opReplaceBlock(in);
break;
case COPY_BLOCK:
opCopyBlock(in);
break;
case BLOCK_CHECKSUM:
opBlockChecksum(in);
break;
case TRANSFER_BLOCK:
opTransferBlock(in);
break;
case REQUEST_SHORT_CIRCUIT_FDS:
opRequestShortCircuitFds(in);
break;
case RELEASE_SHORT_CIRCUIT_FDS:
opReleaseShortCircuitFds(in);
break;
case REQUEST_SHORT_CIRCUIT_SHM:
opRequestShortCircuitShm(in);
break;
default:
throw new IOException("Unknown op " + op + " in data stream");
}
}
3、数据块读取readBlock():
我们知道在client读取hdfs文件数据块的时候,会在构造blockReader的时候调用
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy);
向目标DataNode发送对应数据块的Op.READ_BLOCK操作码,DataReceiver在接收到来自客户端的Op.READ_BLOCK读数据块操作码后,会调用DataXceiver.readBlock()响应这个读请求。其调用流程如下:
DataXceiver.readBlock()首先会向客户端回复一个BlockOpResponseProto响应,表示当前请求DataXceiver已经成功接收,并通过BlockOpResponseProto告知Client客户端当前DataNode所使用的校验方式。接下来便会将数据块block切分成若干个数据包packet,然后依次将数据包发送至客户端。客户端在接收到每个数据包packet时会进行校验,并将校验结果发送给DataNode;其基本的读取流程如下:
DataXceiver.readBlock()方法的基本流程如下:
- 创建BlockSender对象,首先调用getOutputStream()获取DataNode连接到客户端的IO流,并创建构造BlockSender对象;
- 之后便调用writeSuccessWithChecksumInfo()向客户端发送BlockOpResponseProto响应,告知客户端读请求已经接收,并告知客户端当前节点的校验信息;
- 之后便调用blockSender.sendBlock()方法将数据块按照数据包packet的形式发送给客户端;
- 当blockSender完成发送数据块数据包后,客户端会响应一个ReadStatus状态码告知DataNode;
blockSender.sendBlock()方法会将数据块按照一定的组织格式发送到接收方,其发送数据的格式如下:
BlockSender发送的数据格式包括两部分:校验信息头(ChecksumHeader)和数据包序列(packets)
- ChecksumHeader:用于描述当前DataNode使用的校验方式等信息。一个校验头信息也包括2个部分:
- CHECKSUM_TYPE:数据校验类型:包括三种校验—空校验,CRC32以及CRC32C,在这里使用1 byte描述数据校验类型,空校验,CRC32以及CRC32C,分别对应着0,1,2
- BYTES_PER_CHECKSUM:校验块大小:也就是多少字节数据产生一个校验值。在这里CRC32为例,一把情况下是512字节数据产生一个4字节的checksum,我们把这512字节的数据叫做一个校验块(Chunk),chunk是HDFS读写数据块操作的最小单元
- 数据包序列(packets):BlockSender会将数据块切分成若干数据包对外发送,当数据发送完毕,会以一个空的数据包作为结束。每一个数据包包括一个变长的包头,校验数据和若干字节的实际数据
- 数据包头:用于描述当前数据包信息,是通过PtotoBuf序列化的包括4字节的全包长度,以及2字节的包头长度;其数据包信息如下:
- 当前数据包在整个数据块中的位置
- 数据包在管道中的***
- 当前数据包是不是数据块中的最后一个数据包
- 当前数据包数据部分的长度
- 是否需要DN同步
- 校验数据:校验数据是对实际数据做校验操作产生的,它将实际数据以校验块为单位,每一个校验块产生一个checksum,校验数据中包含了所有校验块的checksum.校验数据的大小=(实际数据长度+校验块大小)/ 校验块大小 *校验和长度
- 实际数据:数据包中的实际数据就是数据块文件中保存的数据,实际数据的传输是以校验块为单位的,一个校验块对应产生一个checksum的实际数据。在数据包中会将校验块和校验数据分开发送,首先将所有校验块的校验数据发送出去,然后再发所有的校验块
- 数据包头:用于描述当前数据包信息,是通过PtotoBuf序列化的包括4字节的全包长度,以及2字节的包头长度;其数据包信息如下:
BlockSender中的数据块发送过程主要包括:1、发送准备;2、发送数据块;3、清理工作
1、发送准备:主要是根据参数进行 是否需要验证校验数据、是否开启transferTo模式、从Meta文件中获取当前数据块的校验算法、校验和长度,以及多少字节产生一个校验值、寻找正确的offset等判断;其源码主要在BlockSender的构造函数当中;
2、发送数据块:BlockSender.sendBlock()用读取数据以及校验和,并将它们发送到接收方。整体流程步骤如下:
- 在开始读文件时,会触发一次预读取,也就是将数据缓存到操作系统缓冲区中;
- 构造pktBuf缓冲区,也即是能容纳一个数据包的缓冲区;
- 循环调用sendPacket()发送数据包序列,直到整个数据块发送完毕;
- 发送一个空的数据包来标识数据块的结束;
- 完成数据包发送过程之后,调用close()方法关闭数据块以及校验文件;
sendBlock()以及sendPacket()源码如下:
long sendBlock(DataOutputStream out, OutputStream baseStream,
DataTransferThrottler throttler) throws IOException {
// Trigger readahead of beginning of file if configured.
// 数据预读取至缓存
manageOsCache();
final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
// 构造存放数据包的缓冲区
try {
int maxChunksPerPacket;
int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream
&& blockIn instanceof FileInputStream;
if (transferTo) {
FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
blockInPosition = fileChannel.position();
streamForSendChunks = baseStream;
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
// Smaller packet size to only hold checksum when doing transferTo
pktBufSize += checksumSize * maxChunksPerPacket;
} else {
maxChunksPerPacket = Math.max(1,
numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
// Packet size includes both checksum and data
// 缓冲区存放checksum 和 data
pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
}
// 构造缓冲区pktBuf
ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
// 循环调用sendPacket()发送数据包packet
while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
manageOsCache();
long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
transferTo, throttler);
offset += len;
totalRead += len + (numberOfChunks(len) * checksumSize);
seqno++;
}
// If this thread was interrupted, then it did not send the full block.
if (!Thread.currentThread().isInterrupted()) {
try {
// send an empty packet to mark the end of the block
// 发送一个空的数据包来标识数据块的结束
sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
throttler);
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}
sentEntireByteRange = true;
}
} finally {
if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {
final long endTime = System.nanoTime();
ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
initialOffset, endTime - startTime));
}
close();
}
return totalRead;
}
private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
boolean transferTo, DataTransferThrottler throttler) throws IOException {
int dataLen = (int) Math.min(endOffset - offset, (chunkSize * (long) maxChunks));
// 数据包中包含多少校验块
int numChunks = numberOfChunks(dataLen);
// 校验数据长度
int checksumDataLen = numChunks * checksumSize;
// 数据包长度
int packetLen = dataLen + checksumDataLen + 4;
boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
// 将数据包头写入缓存
int headerLen = writePacketHeader(pkt, dataLen, packetLen);
// 数据包头在缓存中的位置
int headerOff = pkt.position() - headerLen;
// 校验数据在缓存中的位置
int checksumOff = pkt.position();
byte[] buf = pkt.array();
if (checksumSize > 0 && checksumIn != null) {
// 校验数据写入缓存
readChecksum(buf, checksumOff, checksumDataLen);
// write in progress that we need to use to get lastchecksu
if (lastDataPacket && lastChunkChecksum != null)
int start = checksumOff + checksumDataLen - checksumSize;
byte[] updatedChecksum = lastChunkChecksum.getChecksum();
if (updatedChecksum != null) {
System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
}
}
}
int dataOff = checksumOff + checksumDataLen;
// 在普通模式下下将数据写入缓存
if (!transferTo) { // normal transfer
IOUtils.readFully(blockIn, buf, dataOff, dataLen);
// 确认校验和数据
if (verifyChecksum) {
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
}
}
try {
if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream) out;
// 首先将头和校验和数据写入缓存
sockOut.write(buf, headerOff, dataOff - headerOff);
// 使用transfer方式,将数据通过0拷贝的方式写入IO流
FileChannel fileCh = ((FileInputStream) blockIn).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
sockOut.transferToFully(fileCh, blockInPosition, dataLen, waitTime, transferTime);
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
blockInPosition += dataLen;
} else {
// 普通模式下数据写入IO流
out.write(buf, headerOff, dataOff + dataLen - headerOff);
}
} catch (IOException e) {
if (e instanceof SocketTimeoutException)
} else {
String ioem = e.getMessage();
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connectionreset")) {
LOG.error("BlockSender.sendChunks()exception: ", e);
}
datanode.getBlockScanner().markSuspectBlock(
volumeRef.getVolume().getStorageID(),
block);
}
throw ioeToSocketException(e);
}
if (throttler != null) { // rebalancing so throttle
throttler.throttle(packetLen);
}
return dataLen;
}
3、清理工作:主要是关闭对应的数据流:
checksumIn.close(); // close checksum file
blockIn.close(); // close data file