BlockSender发送数据的格式及BlockSender的实现讲解
一 BlockSender发送数据的格式详解
BlockSender主要负责从DataNode的磁盘读取数据块,然后发送数据块到接收方。需要注意的是,BlockSender发送的数据是以一定的结构组织的。
BlockSender发送的数据格式包括两部分:
校验信息头(ChecksumHeader)和数据包序列(packets)
1.1 ChecksumHeader
用于描述当前DataNode使用的校验方式等信息。如下所示,一个校验头信息也包括2个部分:
CHECKSUM_TYPE:校验类型
数据校验类型:包括三种校验—空校验,CRC32以及CRC32C,在这里使用1 byte描述数据校验类型,空校验,CRC32以及CRC32C,分别对应着0,1,2
BYTES_PER_CHECKSUM:校验块大小
校验快大小:也就是多少字节数据产生一个校验值。在这里CRC32为例,一把情况下是512字节数据产生一个4字节的checksum,我们把这512字节的数据叫做一个校验块(Chunk),chunk是HDFS读写数据块操作的最小单元
1.2数据包序列(packets)
BlockSender会将数据块切分成若干数据包对外发送,当数据发送完毕,会以一个空的数据包作为结束。每一个数据包包括一个变长的包头,校验数据和若干字节的实际数据
1.2.1数据包头
用于描述当前数据包信息,是通过PtotoBuf序列化的包括4字节的全包长度,以及2字节的包头长度
=>当前数据包在整个数据块中的位置
=>数据包在管道中的序列号
=>当前数据包是不是数据块中的最后一个数据包
=>当前数据包数据部分的长度
=>是否需要DN同步
1.2.2校验数据
校验数据是对实际数据做校验操作产生的,它将实际数据以校验块为单位,每一个校验块产生一个checksum,校验数据中包含了所有校验块的checksum.校验数据的大小=(实际数据长度+校验块大小)/ 校验块大小 *校验和长度
1.2.3实际数据
数据包中的实际数据就是数据块文件中保存的数据,实际数据的传输是以校验块为单位的,一个校验块对应产生一个checksum的实际数据。在数据包中会将校验块和校验数据分开发送,首先将所有校验块的校验数据发送出去,然后再发所有的校验块
二 BlockSender的实现
数据块的发送主要是由BlockSender来实现的,其发送过程包括:
发送准备,发送数据,清理工作
2.1发送准备
BlockSender数据块发送准备工作主要是在构造过程中执行的,
BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean verifyChecksum,
boolean sendChecksum, DataNode datanode, String clientTraceFmt,
CachingStrategy cachingStrategy)
throws IOException {
try {
this.block = block;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
this.clientTraceFmt = clientTraceFmt;
/*
* 如果缓存策略readDropBehind为空,我们则按照配置文件
* dfs.datanode.drop.cache.behind.reads来初始化
* dropCacheBehindLargeReads
*/
if (cachingStrategy.getDropBehind() ==null) {
this.dropCacheBehindAllReads = false;
this.dropCacheBehindLargeReads =
datanode.getDnConf().dropCacheBehindReads;
} else {
this.dropCacheBehindAllReads =
this.dropCacheBehindLargeReads =
cachingStrategy.getDropBehind().booleanValue();
}
/*
* 如果缓存策略readahead为空,那么我们则按照配置文件
* dfs.datanode.readahead.bytes的处理,默认是
* 4Mb
*/
if (cachingStrategy.getReadahead() ==null) {
this.alwaysReadahead = false;
this.readaheadLength = datanode.getDnConf().readaheadLength;
} else {
this.alwaysReadahead = true;
this.readaheadLength = cachingStrategy.getReadahead().longValue();
}
this.datanode = datanode;
//如果需要验证校验数据
if (verifyChecksum) {
// To simplify implementation, callers maynot specify verification
// without sending.
Preconditions.checkArgument(sendChecksum,
"If verifying checksum, currently mustalso send it.");
}
final Replica replica;
final long replicaVisibleLength;
synchronized(datanode.data) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
}
// if there is a write in progress
ChunkChecksum chunkChecksum = null;
if (replica instanceof ReplicaBeingWritten) {
final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica;
waitForMinLength(rbw, startOffset + length);
chunkChecksum = rbw.getLastChecksumAndDataLen();
}
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
throw new IOException("Replica gen stamp < block genstamp, block="
+ block + ", replica=" + replica);
} else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("Bumpingup the client provided"
+ " block's genstamp to latest " + replica.getGenerationStamp()
+ " for block " + block);
}
block.setGenerationStamp(replica.getGenerationStamp());
}
if (replicaVisibleLength < 0) {
throw new IOException("Replica is not readable, block="
+ block + ", replica=" + replica);
}
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("block=" + block + ",replica=" + replica);
}
//是否开启transferTo模式
this.transferToAllowed = datanode.getDnConf().transferToAllowed &&
(!is32Bit || length <= Integer.MAX_VALUE);
// Obtain a reference before reading data
this.volumeRef = datanode.data.getVolume(block).obtainReference();
/*
* (corruptChecksumOK, meta_file_exist):operation
* True,True: will verify checksum
* True,False: No verify, e.g., need to read data from a corrupted file
* False,True: will verify checksum
* False, False: throws IOException filenot found
*/
//获取checksum信息:从Meta文件中获取当前数据块的校验算法、校验和长度,以及多少字节产生一个校验值
//也就是校验块的大小
DataChecksum csum = null;
if (verifyChecksum || sendChecksum) {
LengthInputStream metaIn = null;
boolean keepMetaInOpen = false;
try {
metaIn = datanode.data.getMetaDataInputStream(block);
if (!corruptChecksumOk || metaIn != null) {
if (metaIn == null) {
//need checksum but meta-data not found
throw new FileNotFoundException("Meta-data not found for " +
block);
}
if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()){
checksumIn = new DataInputStream(new BufferedInputStream(
metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
keepMetaInOpen = true;
}
} else {
LOG.warn("Couldnot find metadata file for " + block);
}
} finally {
if (!keepMetaInOpen) {
IOUtils.closeStream(metaIn);
}
}
}
if (csum == null) {
csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
}
/*
* If chunkSize is very large, then the metadatafile is mostly
* corrupted. For now just truncatebytesPerchecksum to blockLength.
*/
int size = csum.getBytesPerChecksum();
if (size > 10*1024*1024 && size > replicaVisibleLength) {
csum = DataChecksum.newDataChecksum(csum.getChecksumType(),
Math.max((int)replicaVisibleLength, 10*1024*1024));
size = csum.getBytesPerChecksum();
}
chunkSize = size;//校验块大小
checksum = csum;//校验算法
checksumSize = checksum.getChecksumSize();//校验和长度
length = length < 0 ? replicaVisibleLength : length;
// end is either last byte on disk or the length forwhich we have a
// checksum
long end = chunkChecksum != null ? chunkChecksum.getDataLength()
: replica.getBytesOnDisk();
if (startOffset < 0 || startOffset > end
|| (length + startOffset) > end) {
String msg = " Offset " + startOffset + "and length " + length
+ " don't match block " + block + " (blockLen " + end + " )";
LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) +
":sendBlock() : " + msg);
throw new IOException(msg);
}
//计算offset和endOffset,offset用于标识要去读取的数据在数据块的起始位置
//endOffset:用于标识结束位置。由于读取位置往往不会落在某个校验块的起始位置,
//所以在准备工作中,需要确保offset的校验块的起始位置,endOffset在校验块的
//的结束位置。这样读取时就可以校验块为单位读取,方便校验和的操作
offset = startOffset - (startOffset % chunkSize);
if (length >= 0) {
// Ensure endOffset points to end of chunk.
long tmpLen = startOffset + length;
if (tmpLen % chunkSize != 0) {
tmpLen += (chunkSize - tmpLen % chunkSize);
}
if (tmpLen < end) {
// will use on-disk checksum here since theend is a stable chunk
end = tmpLen;
} else if (chunkChecksum != null) {
// last chunk is changing. flag that weneed to use in-memory checksum
this.lastChunkChecksum = chunkChecksum;
}
}
endOffset = end;
//寻找正确的offSET
if (offset > 0 && checksumIn != null) {
long checksumSkip = (offset / chunkSize) * checksumSize;
// note blockInStream is seeked whencreated below
if (checksumSkip > 0) {
// Should we use seek() for checksum fileas well?
IOUtils.skipFully(checksumIn, checksumSkip);
}
}
seqno = 0;
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("replica=" + replica);
}
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
if (blockIn instanceof FileInputStream) {
blockInFd = ((FileInputStream)blockIn).getFD();
} else {
blockInFd = null;
}
} catch (IOException ioe) {
IOUtils.closeStream(this);
IOUtils.closeStream(blockIn);
throw ioe;
}
}
2.2发送数据块
首先会进行预读取和丢弃,调用manageOsCache操作
/*
* 预读取或者丢弃
*
*/
private void manageOsCache() throws IOException {
if (blockInFd == null) return;
//按照条件触发预读取操作
if ((readaheadLength > 0)&& (datanode.readaheadPool != null)&&
(alwaysReadahead || isLongRead())) {
//满足预读取条件,则调用ReadaheadPool.readaheadStream方法触发预读取
curReadahead = datanode.readaheadPool.readaheadStream(
clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
curReadahead);
}
//丢弃刚才从缓存中读取的数据,因为不再需要使用这些数据了
if (dropCacheBehindAllReads ||
(dropCacheBehindLargeReads && isLongRead())) {
//丢弃数据的位置
long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
if (offset >= nextCacheDropOffset) {
//如果下一次读取数据的位置大于丢弃的数据的位置,则将读取数据位置前的数据全部丢弃
long dropLength = offset - lastCacheDropOffset;
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
block.getBlockName(), blockInFd, lastCacheDropOffset,
dropLength, NativeIO.POSIX.POSIX_FADV_DONTNEED);
lastCacheDropOffset = offset;
}
}
}
/**
* 读取数据块和他的元数据,然后发送数据到啊客户端或者其他的datanode
* out:将数据写到那儿
* throttler: 用于发送数据
*/
long sendBlock(DataOutputStream out, OutputStream baseStream,
DataTransferThrottler throttler) throws IOException {
TraceScope scope = datanode.tracer.
newScope("sendBlock_" + block.getBlockId());
try {
return doSendBlock(out, baseStream, throttler);
} finally {
scope.close();
}
}
private long doSendBlock(DataOutputStream out, OutputStream baseStream,
DataTransferThrottler throttler) throws IOException {
if (out == null) {
throw new IOException( "out stream is null" );
}
initialOffset = offset;
long totalRead = 0;
OutputStream streamForSendChunks = out;
lastCacheDropOffset = initialOffset;
if (isLongRead() && blockInFd != null) {
// Advise that this file descriptor will be accessedsequentially.
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
block.getBlockName(), blockInFd, 0, 0,
NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
}
//预读取&丢弃
manageOsCache();
final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
try {
int maxChunksPerPacket;
//构造一个Packet Buffer,也就是能容纳一个数据包的大小,对于2中不同发送数据包模式:
//transferTo和ioStream,缓冲区大小是不一样的。在transferTo模式中,数据块文件
//是通过零拷贝的方式直接传输给客户端,不需要将数据块文件写入缓冲区,所以Packet Buffer
//只需要缓冲校验数据即可;而ioStream模式则需要将实际数据以及校验数据都缓存下来
int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream
&& blockIn instanceof FileInputStream;
if (transferTo) {
FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
blockInPosition = fileChannel.position();
streamForSendChunks = baseStream;
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
// Smaller packet size to only holdchecksum when doing transferTo
pktBufSize += checksumSize * maxChunksPerPacket;
} else {
maxChunksPerPacket = Math.max(1,
numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
// Packet size includes both checksum anddata
pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
}
ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
//循环调用sendPacket发送数据包序列
while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
//预读取
manageOsCache();
long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
transferTo, throttler);
offset += len;//更新offset
totalRead += len + (numberOfChunks(len) * checksumSize);
seqno++;
}
// If this thread was interrupted, then it did not sendthe full block.
if (!Thread.currentThread().isInterrupted()) {
try {
//发送一个空的数据包泳衣标志数据块的结束
sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
throttler);
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}
sentEntireByteRange = true;
}
} finally {
if ((clientTraceFmt != null)&& ClientTraceLog.isDebugEnabled()) {
final long endTime = System.nanoTime();
ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
initialOffset, endTime - startTime));
}
close();
}
return totalRead;
}
private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
boolean transferTo, DataTransferThrottler throttler) throws IOException {
int dataLen = (int) Math.min(endOffset - offset,
(chunkSize * (long) maxChunks));
//数据包中包含多少校验块
int numChunks = numberOfChunks(dataLen);
//校验数据长度
int checksumDataLen = numChunks * checksumSize;
//数据包长度
int packetLen = dataLen + checksumDataLen + 4;
boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
//将数据包头写入缓存
int headerLen = writePacketHeader(pkt, dataLen, packetLen);
//数据包头在缓存中的位置
int headerOff = pkt.position() - headerLen;
//校验数据在缓存中的位子
int checksumOff = pkt.position();
byte[] buf = pkt.array();
if (checksumSize > 0&& checksumIn != null) {
//校验数据写入缓存
readChecksum(buf, checksumOff, checksumDataLen);
// write in progress that we need to use to get lastchecksum
if (lastDataPacket && lastChunkChecksum != null) {
int start = checksumOff + checksumDataLen - checksumSize;
byte[] updatedChecksum = lastChunkChecksum.getChecksum();
if (updatedChecksum != null) {
System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
}
}
}
int dataOff = checksumOff + checksumDataLen;
//在普通模式下下将数据写入缓存
if (!transferTo) { //normal transfer
IOUtils.readFully(blockIn, buf, dataOff, dataLen);
//确认校验和数据
if (verifyChecksum) {
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
}
}
try {
if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream)out;
//首先将头和校验和数据写入缓存
sockOut.write(buf, headerOff, dataOff - headerOff);
//使用transfer方式,将数据通过0拷贝的方式写入IO流
FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
sockOut.transferToFully(fileCh, blockInPosition, dataLen,
waitTime, transferTime);
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
blockInPosition += dataLen;
} else {
//普通模式下数据写入IO流
out.write(buf, headerOff, dataOff + dataLen - headerOff);
}
} catch (IOException e) {
if (e instanceof SocketTimeoutException) {
} else {
String ioem = e.getMessage();
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connectionreset")) {
LOG.error("BlockSender.sendChunks()exception: ", e);
}
datanode.getBlockScanner().markSuspectBlock(
volumeRef.getVolume().getStorageID(),
block);
}
throw ioeToSocketException(e);
}
if (throttler != null) { // rebalancing so throttle
throttler.throttle(packetLen);
}
return dataLen;
}
推荐阅读
-
JSON的语法规则、数据格式及使用实例讲解
-
TensorFlow:四种类型数据的读取流程及API讲解和代码实现
-
BlockSender发送数据的格式及BlockSender的实现讲解
-
Ajax语法(Ajax基础、运行原理及实现、异步编程、封装、Ajax状态码、onreadystatechange事件、Ajax错误处理、服务器端响应的数据格式、请求参数)
-
用LSTM做时间序列预测的思路,tensorflow代码实现及传入数据格式
-
JSON的语法规则、数据格式及使用实例讲解
-
BlockSender发送数据的格式及BlockSender的实现讲解
-
Ajax语法(Ajax基础、运行原理及实现、异步编程、封装、Ajax状态码、onreadystatechange事件、Ajax错误处理、服务器端响应的数据格式、请求参数)