
BlockSender: the format of the data it sends and how it is implemented

程序员文章站 2022-03-15 11:19:11

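1 The format of the data sent by BlockSender

BlockSender is responsible for reading a block from the DataNode's disk and sending it to the receiver. Note that the data it sends is organized in a defined structure.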


The data sent by BlockSender consists of two parts:

a checksum header (ChecksumHeader) and a sequence of packets (packets)

1.1 ChecksumHeader

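Describes the checksum scheme used by the current DataNode. A checksum header itself consists of two parts:

CHECKSUM_TYPE: the checksum type

Checksum type: there are three kinds, null (no checksum), CRC32, and CRC32C. One byte describes the type; null, CRC32, and CRC32C correspond to 0, 1, and 2 respectively.

BYTES_PER_CHECKSUM: the chunk size

Chunk size: the number of data bytes that produce one checksum value. Taking CRC32 as an example, every 512 bytes of data typically produce one 4-byte checksum. These 512 bytes are called a checksum chunk (chunk), and the chunk is the smallest unit of HDFS block read and write operations.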
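The two header fields can be sketched as a small encode/decode pair. This is only an illustration: the class and method names are hypothetical, and the 4-byte big-endian width used for BYTES_PER_CHECKSUM is an assumption, since the text above only states the width of the type field.

```java
import java.nio.ByteBuffer;

class ChecksumHeaderSketch {
    // Type codes as listed in the text: null = 0, CRC32 = 1, CRC32C = 2.
    static final int TYPE_NULL = 0;
    static final int TYPE_CRC32 = 1;
    static final int TYPE_CRC32C = 2;

    /** Encodes CHECKSUM_TYPE (1 byte) followed by BYTES_PER_CHECKSUM (assumed 4 bytes). */
    static byte[] encode(int type, int bytesPerChecksum) {
        return ByteBuffer.allocate(5)
            .put((byte) type)
            .putInt(bytesPerChecksum)
            .array();
    }

    /** Decodes a header back into {type, bytesPerChecksum}. */
    static int[] decode(byte[] header) {
        ByteBuffer in = ByteBuffer.wrap(header);
        int type = in.get() & 0xFF;
        int bytesPerChecksum = in.getInt();
        return new int[] {type, bytesPerChecksum};
    }

    public static void main(String[] args) {
        byte[] header = encode(TYPE_CRC32, 512);
        int[] fields = decode(header);
        System.out.println("header bytes=" + header.length
            + " type=" + fields[0] + " bytesPerChecksum=" + fields[1]);
    }
}
```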

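1.2 Packet sequence (packets)

BlockSender splits a block into a number of packets and sends them out; once all the data has been sent, it sends an empty packet to mark the end. Each packet consists of a variable-length packet header, checksum data, and some number of bytes of actual data.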

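1.2.1 Packet header

Describes the current packet. It starts with a 4-byte total packet length and a 2-byte header length, followed by these fields serialized with ProtoBuf:

=> the position of the current packet within the block

=> the sequence number of the packet in the pipeline

=> whether the current packet is the last packet of the block

=> the length of the data portion of the current packet

=> whether the DataNode needs to sync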
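As a rough illustration of the two length prefixes, the sketch below frames an already-serialized header. The ProtoBuf part is treated as opaque bytes, and the class name, method name, and sample values are all hypothetical.

```java
import java.nio.ByteBuffer;

class PacketFramingSketch {
    /**
     * Prefixes opaque, already-serialized header bytes with a 4-byte packet
     * length and a 2-byte header length (big-endian, ByteBuffer's default).
     */
    static byte[] frame(int packetLen, byte[] serializedHeader) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 2 + serializedHeader.length);
        buf.putInt(packetLen);
        buf.putShort((short) serializedHeader.length);
        buf.put(serializedHeader);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] fakeHeader = {1, 2, 3, 4, 5, 6, 7}; // stand-in for ProtoBuf output
        byte[] framed = frame(1012, fakeHeader);
        ByteBuffer in = ByteBuffer.wrap(framed);
        System.out.println("packetLen=" + in.getInt()
            + " headerLen=" + in.getShort()
            + " totalBytes=" + framed.length);
    }
}
```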

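1.2.2 Checksum data

The checksum data is produced by checksumming the actual data: the actual data is divided into chunks, each chunk yields one checksum, and the checksum data contains the checksums of all chunks. Size of the checksum data = (actual data length + chunk size - 1) / chunk size * checksum length, using integer division so that a partial final chunk still counts as one chunk.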
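A short worked example of this size calculation may help. The class and method names are illustrative; the rounding-up division is written to match the numberOfChunks calls that appear later in the BlockSender code, whose body is not shown in this article.

```java
class ChecksumSizeSketch {
    /** Number of chunks needed to cover dataLen bytes, rounding up. */
    static long numberOfChunks(long dataLen, int chunkSize) {
        return (dataLen + chunkSize - 1) / chunkSize;
    }

    /** Total bytes of checksum data for dataLen bytes of actual data. */
    static long checksumDataLen(long dataLen, int chunkSize, int checksumSize) {
        return numberOfChunks(dataLen, chunkSize) * checksumSize;
    }

    public static void main(String[] args) {
        long[] samples = {0, 1, 512, 513, 1024};
        for (long dataLen : samples) {
            System.out.println(dataLen + " data bytes -> "
                + checksumDataLen(dataLen, 512, 4) + " checksum bytes");
        }
    }
}
```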

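1.2.3 Actual data

The actual data in a packet is the data stored in the block file. It is transferred in units of chunks, where a chunk is the actual data that produces one checksum. Within a packet the chunks and the checksum data are sent separately: first the checksums of all chunks, then all the chunks.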
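The "all checksums first, then all chunks" layout can be sketched with the JDK's CRC32 class. This is not the DataNode's code path (which reads precomputed checksums from the meta file); it is a minimal sketch with hypothetical names that computes the checksums on the fly just to show the byte layout.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

class PacketBodySketch {
    static final int CHECKSUM_SIZE = 4; // a CRC32 value stored as 4 bytes

    /** Builds [checksum of chunk 0][checksum of chunk 1]...[all data bytes]. */
    static byte[] buildBody(byte[] data, int chunkSize) {
        int numChunks = (data.length + chunkSize - 1) / chunkSize;
        ByteBuffer body = ByteBuffer.allocate(numChunks * CHECKSUM_SIZE + data.length);
        CRC32 crc = new CRC32();
        for (int off = 0; off < data.length; off += chunkSize) {
            int len = Math.min(chunkSize, data.length - off);
            crc.reset();
            crc.update(data, off, len);
            body.putInt((int) crc.getValue()); // one checksum per chunk
        }
        body.put(data); // the chunks follow the checksums
        return body.array();
    }

    public static void main(String[] args) {
        byte[] data = new byte[1000];
        byte[] body = buildBody(data, 512);
        System.out.println("data=" + data.length + " body=" + body.length);
    }
}
```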

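2 Implementation of BlockSender

Sending a block is implemented mainly by BlockSender. The process has three stages:

preparation, sending the data, and cleanup

2.1 Preparing to send

BlockSender does most of its preparation for sending a block in its constructor: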

BlockSender(ExtendedBlock block, long startOffset, long length,
            boolean corruptChecksumOk, boolean verifyChecksum,
            boolean sendChecksum, DataNode datanode, String clientTraceFmt,
            CachingStrategy cachingStrategy)
    throws IOException {
  try {
    this.block = block;
    this.corruptChecksumOk = corruptChecksumOk;
    this.verifyChecksum = verifyChecksum;
    this.clientTraceFmt = clientTraceFmt;

    /*
     * If the caching strategy's dropBehind is null, initialize
     * dropCacheBehindLargeReads from the configuration key
     * dfs.datanode.drop.cache.behind.reads.
     */
    if (cachingStrategy.getDropBehind() == null) {
      this.dropCacheBehindAllReads = false;
      this.dropCacheBehindLargeReads =
          datanode.getDnConf().dropCacheBehindReads;
    } else {
      this.dropCacheBehindAllReads =
          this.dropCacheBehindLargeReads =
          cachingStrategy.getDropBehind().booleanValue();
    }

    /*
     * If the caching strategy's readahead is null, fall back to the
     * configuration key dfs.datanode.readahead.bytes, which defaults
     * to 4 MB.
     */
    if (cachingStrategy.getReadahead() == null) {
      this.alwaysReadahead = false;
      this.readaheadLength = datanode.getDnConf().readaheadLength;
    } else {
      this.alwaysReadahead = true;
      this.readaheadLength = cachingStrategy.getReadahead().longValue();
    }
    this.datanode = datanode;

    // If the checksums need to be verified
    if (verifyChecksum) {
      // To simplify implementation, callers may not specify verification
      // without sending.
      Preconditions.checkArgument(sendChecksum,
          "If verifying checksum, currently must also send it.");
    }

    final Replica replica;
    final long replicaVisibleLength;
    synchronized(datanode.data) {
      replica = getReplica(block, datanode);
      replicaVisibleLength = replica.getVisibleLength();
    }
    // if there is a write in progress
    ChunkChecksum chunkChecksum = null;
    if (replica instanceof ReplicaBeingWritten) {
      final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica;
      waitForMinLength(rbw, startOffset + length);
      chunkChecksum = rbw.getLastChecksumAndDataLen();
    }

    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
      throw new IOException("Replica gen stamp < block genstamp, block="
          + block + ", replica=" + replica);
    } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
      if (DataNode.LOG.isDebugEnabled()) {
        DataNode.LOG.debug("Bumping up the client provided"
            + " block's genstamp to latest " + replica.getGenerationStamp()
            + " for block " + block);
      }
      block.setGenerationStamp(replica.getGenerationStamp());
    }
    if (replicaVisibleLength < 0) {
      throw new IOException("Replica is not readable, block="
          + block + ", replica=" + replica);
    }
    if (DataNode.LOG.isDebugEnabled()) {
      DataNode.LOG.debug("block=" + block + ", replica=" + replica);
    }

    // Whether transferTo mode may be used
    this.transferToAllowed = datanode.getDnConf().transferToAllowed &&
        (!is32Bit || length <= Integer.MAX_VALUE);

    // Obtain a reference before reading data
    this.volumeRef = datanode.data.getVolume(block).obtainReference();

    /*
     * (corruptChecksumOK, meta_file_exist): operation
     * True,  True:  will verify checksum
     * True,  False: No verify, e.g., need to read data from a corrupted file
     * False, True:  will verify checksum
     * False, False: throws IOException file not found
     */
    // Read the checksum information: from the meta file, obtain the block's
    // checksum algorithm, the checksum length, and how many bytes produce
    // one checksum value, i.e. the chunk size
    DataChecksum csum = null;
    if (verifyChecksum || sendChecksum) {
      LengthInputStream metaIn = null;
      boolean keepMetaInOpen = false;
      try {
        metaIn = datanode.data.getMetaDataInputStream(block);
        if (!corruptChecksumOk || metaIn != null) {
          if (metaIn == null) {
            // need checksum but meta-data not found
            throw new FileNotFoundException("Meta-data not found for " +
                block);
          }

          if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) {
            checksumIn = new DataInputStream(new BufferedInputStream(
                metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));

            csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
            keepMetaInOpen = true;
          }
        } else {
          LOG.warn("Could not find metadata file for " + block);
        }
      } finally {
        if (!keepMetaInOpen) {
          IOUtils.closeStream(metaIn);
        }
      }
    }
    if (csum == null) {
      csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
    }

    /*
     * If chunkSize is very large, then the metadata file is mostly
     * corrupted. For now just truncate bytesPerchecksum to blockLength.
     */
    int size = csum.getBytesPerChecksum();
    if (size > 10*1024*1024 && size > replicaVisibleLength) {
      csum = DataChecksum.newDataChecksum(csum.getChecksumType(),
          Math.max((int)replicaVisibleLength, 10*1024*1024));
      size = csum.getBytesPerChecksum();
    }
    chunkSize = size;                           // chunk size
    checksum = csum;                            // checksum algorithm
    checksumSize = checksum.getChecksumSize();  // checksum length
    length = length < 0 ? replicaVisibleLength : length;

    // end is either last byte on disk or the length for which we have a
    // checksum
    long end = chunkChecksum != null ? chunkChecksum.getDataLength()
        : replica.getBytesOnDisk();
    if (startOffset < 0 || startOffset > end
        || (length + startOffset) > end) {
      String msg = " Offset " + startOffset + " and length " + length
          + " don't match block " + block + " (blockLen " + end + " )";
      LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) +
          ":sendBlock() : " + msg);
      throw new IOException(msg);
    }

    // Compute offset and endOffset. offset marks where in the block the read
    // starts and endOffset marks where it ends. Because the requested
    // position usually does not fall on a chunk boundary, the preparation
    // step makes sure offset sits at the start of a chunk and endOffset at
    // the end of a chunk. Data can then be read chunk by chunk, which makes
    // checksum handling simpler.
    offset = startOffset - (startOffset % chunkSize);
    if (length >= 0) {
      // Ensure endOffset points to end of chunk.
      long tmpLen = startOffset + length;
      if (tmpLen % chunkSize != 0) {
        tmpLen += (chunkSize - tmpLen % chunkSize);
      }
      if (tmpLen < end) {
        // will use on-disk checksum here since the end is a stable chunk
        end = tmpLen;
      } else if (chunkChecksum != null) {
        // last chunk is changing. flag that we need to use in-memory checksum
        this.lastChunkChecksum = chunkChecksum;
      }
    }
    endOffset = end;

    // Skip to the matching position in the checksum stream
    if (offset > 0 && checksumIn != null) {
      long checksumSkip = (offset / chunkSize) * checksumSize;
      // note blockInStream is seeked when created below
      if (checksumSkip > 0) {
        // Should we use seek() for checksum file as well?
        IOUtils.skipFully(checksumIn, checksumSkip);
      }
    }
    seqno = 0;

    if (DataNode.LOG.isDebugEnabled()) {
      DataNode.LOG.debug("replica=" + replica);
    }
    blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
    if (blockIn instanceof FileInputStream) {
      blockInFd = ((FileInputStream)blockIn).getFD();
    } else {
      blockInFd = null;
    }
  } catch (IOException ioe) {
    IOUtils.closeStream(this);
    IOUtils.closeStream(blockIn);
    throw ioe;
  }
}
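The chunk-alignment arithmetic at the end of the constructor is easier to follow with concrete numbers. The sketch below isolates it into three pure functions; the class and method names are hypothetical and the sample values are made up for illustration.

```java
class ChunkAlignSketch {
    /** Rounds the requested start down to the beginning of its chunk. */
    static long alignStart(long startOffset, int chunkSize) {
        return startOffset - (startOffset % chunkSize);
    }

    /** Rounds the requested end up to a chunk boundary, capped at the readable end. */
    static long alignEnd(long startOffset, long length, int chunkSize, long end) {
        long tmpLen = startOffset + length;
        if (tmpLen % chunkSize != 0) {
            tmpLen += chunkSize - tmpLen % chunkSize;
        }
        return Math.min(tmpLen, end);
    }

    /** Bytes to skip in the checksum stream so it lines up with the aligned offset. */
    static long checksumSkip(long alignedOffset, int chunkSize, int checksumSize) {
        return (alignedOffset / chunkSize) * checksumSize;
    }

    public static void main(String[] args) {
        // Request 100 bytes starting at byte 1000 of a 4096-byte replica.
        long offset = alignStart(1000, 512);
        long endOffset = alignEnd(1000, 100, 512, 4096);
        System.out.println("offset=" + offset + " endOffset=" + endOffset
            + " checksumSkip=" + checksumSkip(offset, 512, 4));
    }
}
```

With these inputs the read is widened from bytes 1000..1100 to the whole chunk range 512..1536, and one 4-byte checksum is skipped in the meta stream.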

2.2 Sending the block

Sending starts with read-ahead and cache dropping, both handled by manageOsCache:

/*
 * Read ahead, or drop data from the OS cache
 */
private void manageOsCache() throws IOException {
  if (blockInFd == null) return;

  // Trigger read-ahead when the conditions are met
  if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
      (alwaysReadahead || isLongRead())) {
    // The conditions hold, so call ReadaheadPool.readaheadStream to
    // trigger read-ahead
    curReadahead = datanode.readaheadPool.readaheadStream(
        clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
        curReadahead);
  }

  // Drop the data that has already been read from the cache, because it
  // is no longer needed
  if (dropCacheBehindAllReads ||
      (dropCacheBehindLargeReads && isLongRead())) {
    // The offset at which the next drop becomes due
    long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
    if (offset >= nextCacheDropOffset) {
      // The read position has reached the drop position, so drop everything
      // between the last drop position and the read position
      long dropLength = offset - lastCacheDropOffset;
      NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
          block.getBlockName(), blockInFd, lastCacheDropOffset,
          dropLength, NativeIO.POSIX.POSIX_FADV_DONTNEED);
      lastCacheDropOffset = offset;
    }
  }
}
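The drop-behind bookkeeping in manageOsCache can be simulated without any native I/O. In the sketch below the interval value is an assumption (the article does not show what CACHE_DROP_INTERVAL_BYTES is set to), and the class, method, and packet size are hypothetical; it only records where a drop would be issued as offset advances.

```java
import java.util.ArrayList;
import java.util.List;

class CacheDropSketch {
    // Assumed interval for illustration only; the real constant is not shown above.
    static final long DROP_INTERVAL = 1024 * 1024;

    /** Returns {dropStart, dropLength} pairs issued while reading [startOffset, endOffset). */
    static List<long[]> simulate(long startOffset, long endOffset, long packetLen) {
        List<long[]> drops = new ArrayList<>();
        long lastCacheDropOffset = startOffset;
        for (long offset = startOffset; offset < endOffset; offset += packetLen) {
            long nextCacheDropOffset = lastCacheDropOffset + DROP_INTERVAL;
            if (offset >= nextCacheDropOffset) {
                // Everything between the previous drop point and the read position goes.
                drops.add(new long[] {lastCacheDropOffset, offset - lastCacheDropOffset});
                lastCacheDropOffset = offset;
            }
        }
        return drops;
    }

    public static void main(String[] args) {
        for (long[] d : simulate(0, 4L * 1024 * 1024, 64 * 1024)) {
            System.out.println("drop start=" + d[0] + " length=" + d[1]);
        }
    }
}
```

The point of the interval is that the cache advice is issued once per interval of data read rather than once per packet.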

/**
 * Reads the block and its metadata, then sends the data to a client or to
 * another datanode.
 * out: the stream the data is written to
 * throttler: used to throttle the rate at which data is sent
 */
long sendBlock(DataOutputStream out, OutputStream baseStream,
    DataTransferThrottler throttler) throws IOException {
  TraceScope scope = datanode.tracer.
      newScope("sendBlock_" + block.getBlockId());
  try {
    return doSendBlock(out, baseStream, throttler);
  } finally {
    scope.close();
  }
}

private long doSendBlock(DataOutputStream out, OutputStream baseStream,
    DataTransferThrottler throttler) throws IOException {
  if (out == null) {
    throw new IOException( "out stream is null" );
  }
  initialOffset = offset;
  long totalRead = 0;
  OutputStream streamForSendChunks = out;

  lastCacheDropOffset = initialOffset;

  if (isLongRead() && blockInFd != null) {
    // Advise that this file descriptor will be accessed sequentially.
    NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
        block.getBlockName(), blockInFd, 0, 0,
        NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
  }

  // Read ahead and drop cache
  manageOsCache();

  final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;

  try {
    int maxChunksPerPacket;
    // Build a packet buffer large enough to hold one packet. The two
    // sending modes, transferTo and ioStream, need different buffer sizes.
    // In transferTo mode the block file is handed to the client by zero-copy
    // and never passes through the buffer, so the packet buffer only has to
    // hold the checksum data. In ioStream mode it has to hold both the
    // actual data and the checksum data.
    int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
    boolean transferTo = transferToAllowed && !verifyChecksum
        && baseStream instanceof SocketOutputStream
        && blockIn instanceof FileInputStream;
    if (transferTo) {
      FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
      blockInPosition = fileChannel.position();
      streamForSendChunks = baseStream;
      maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);

      // Smaller packet size to only hold checksum when doing transferTo
      pktBufSize += checksumSize * maxChunksPerPacket;
    } else {
      maxChunksPerPacket = Math.max(1,
          numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
      // Packet size includes both checksum and data
      pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
    }

    ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);

    // Call sendPacket in a loop to send the packet sequence
    while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
      // Read ahead and drop cache
      manageOsCache();
      long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
          transferTo, throttler);
      offset += len;  // advance offset
      totalRead += len + (numberOfChunks(len) * checksumSize);
      seqno++;
    }
    // If this thread was interrupted, then it did not send the full block.
    if (!Thread.currentThread().isInterrupted()) {
      try {
        // Send an empty packet to mark the end of the block
        sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
            throttler);
        out.flush();
      } catch (IOException e) { // socket error
        throw ioeToSocketException(e);
      }

      sentEntireByteRange = true;
    }
  } finally {
    if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {
      final long endTime = System.nanoTime();
      ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
          initialOffset, endTime - startTime));
    }
    close();
  }
  return totalRead;
}
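To make the buffer sizing and the packet loop concrete, here is a small sketch of both calculations. The header length and buffer sizes are passed in as parameters because the real constants (PKT_MAX_HEADER_LEN, TRANSFERTO_BUFFER_SIZE, IO_FILE_BUFFER_SIZE) are not given in this article; the values in main are made up, and the class and method names are hypothetical.

```java
class PacketBufferSketch {
    static int numberOfChunks(long len, int chunkSize) {
        return (int) ((len + chunkSize - 1) / chunkSize);
    }

    /** Packet buffer size: header plus checksums only (transferTo) or checksums and data. */
    static int bufferSize(boolean transferTo, int maxHeaderLen, int bufferBytes,
                          int chunkSize, int checksumSize) {
        int maxChunks = transferTo
            ? numberOfChunks(bufferBytes, chunkSize)
            : Math.max(1, numberOfChunks(bufferBytes, chunkSize));
        int perChunk = transferTo ? checksumSize : chunkSize + checksumSize;
        return maxHeaderLen + perChunk * maxChunks;
    }

    /** Number of packets for [offset, endOffset), including the trailing empty packet. */
    static int packetsSent(long offset, long endOffset, int chunkSize, int maxChunks) {
        int packets = 0;
        while (endOffset > offset) {
            offset += Math.min(endOffset - offset, (long) chunkSize * maxChunks);
            packets++;
        }
        return packets + 1; // the empty packet that marks the end of the block
    }

    public static void main(String[] args) {
        // Illustrative values: 33-byte header, 512-byte chunks, 4-byte checksums.
        System.out.println("transferTo buffer=" + bufferSize(true, 33, 65536, 512, 4));
        System.out.println("ioStream buffer=" + bufferSize(false, 33, 4096, 512, 4));
        System.out.println("packets=" + packetsSent(0, 10000, 512, 8));
    }
}
```

Under these assumed values the transferTo buffer is far smaller relative to the data it describes, because the data bytes never enter it.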

private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
    boolean transferTo, DataTransferThrottler throttler) throws IOException {
  int dataLen = (int) Math.min(endOffset - offset,
      (chunkSize * (long) maxChunks));

  // Number of chunks in this packet
  int numChunks = numberOfChunks(dataLen);
  // Length of the checksum data
  int checksumDataLen = numChunks * checksumSize;
  // Packet length
  int packetLen = dataLen + checksumDataLen + 4;
  boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;

  // Write the packet header into the buffer
  int headerLen = writePacketHeader(pkt, dataLen, packetLen);

  // Position of the packet header in the buffer
  int headerOff = pkt.position() - headerLen;

  // Position of the checksum data in the buffer
  int checksumOff = pkt.position();
  byte[] buf = pkt.array();

  if (checksumSize > 0 && checksumIn != null) {
    // Read the checksum data into the buffer
    readChecksum(buf, checksumOff, checksumDataLen);

    // write in progress that we need to use to get last checksum
    if (lastDataPacket && lastChunkChecksum != null) {
      int start = checksumOff + checksumDataLen - checksumSize;
      byte[] updatedChecksum = lastChunkChecksum.getChecksum();

      if (updatedChecksum != null) {
        System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
      }
    }
  }

  int dataOff = checksumOff + checksumDataLen;
  // In normal mode, read the data into the buffer
  if (!transferTo) { // normal transfer
    IOUtils.readFully(blockIn, buf, dataOff, dataLen);
    // Verify the data against the checksums
    if (verifyChecksum) {
      verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
    }
  }

  try {
    if (transferTo) {
      SocketOutputStream sockOut = (SocketOutputStream)out;
      // First write the header and the checksum data to the socket
      sockOut.write(buf, headerOff, dataOff - headerOff);

      // Then use transferTo to write the data to the socket by zero-copy
      FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
      LongWritable waitTime = new LongWritable();
      LongWritable transferTime = new LongWritable();
      sockOut.transferToFully(fileCh, blockInPosition, dataLen,
          waitTime, transferTime);
      datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
      datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
      blockInPosition += dataLen;
    } else {
      // In normal mode, write the buffered packet to the output stream
      out.write(buf, headerOff, dataOff + dataLen - headerOff);
    }
  } catch (IOException e) {
    if (e instanceof SocketTimeoutException) {
      // The write timed out; nothing is logged or marked suspect here
    } else {
      String ioem = e.getMessage();
      if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
        LOG.error("BlockSender.sendChunks() exception: ", e);
      }
      datanode.getBlockScanner().markSuspectBlock(
          volumeRef.getVolume().getStorageID(),
          block);
    }
    throw ioeToSocketException(e);
  }

  if (throttler != null) { // rebalancing so throttle
    throttler.throttle(packetLen);
  }

  return dataLen;
}
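The length arithmetic at the top of sendPacket can be checked in isolation. The sketch below mirrors those first few lines as a pure function; the PacketPlan holder class and the plan method are hypothetical names introduced for illustration.

```java
class PacketPlanSketch {
    /** Plain holder for the values sendPacket derives before touching the buffer. */
    static final class PacketPlan {
        final int dataLen;
        final int numChunks;
        final int checksumDataLen;
        final int packetLen;
        final boolean lastDataPacket;

        PacketPlan(int dataLen, int numChunks, int checksumDataLen,
                   int packetLen, boolean lastDataPacket) {
            this.dataLen = dataLen;
            this.numChunks = numChunks;
            this.checksumDataLen = checksumDataLen;
            this.packetLen = packetLen;
            this.lastDataPacket = lastDataPacket;
        }
    }

    static PacketPlan plan(long offset, long endOffset, int chunkSize,
                           int checksumSize, int maxChunks) {
        int dataLen = (int) Math.min(endOffset - offset, chunkSize * (long) maxChunks);
        int numChunks = (dataLen + chunkSize - 1) / chunkSize;
        int checksumDataLen = numChunks * checksumSize;
        int packetLen = dataLen + checksumDataLen + 4; // 4 bytes for the length field
        boolean last = offset + dataLen == endOffset && dataLen > 0;
        return new PacketPlan(dataLen, numChunks, checksumDataLen, packetLen, last);
    }

    public static void main(String[] args) {
        PacketPlan data = plan(0, 1000, 512, 4, 8);
        PacketPlan empty = plan(1000, 1000, 512, 4, 8);
        System.out.println("data packet: dataLen=" + data.dataLen
            + " packetLen=" + data.packetLen + " last=" + data.lastDataPacket);
        System.out.println("empty packet: dataLen=" + empty.dataLen
            + " packetLen=" + empty.packetLen + " last=" + empty.lastDataPacket);
    }
}
```

The second call shows why the trailing packet is "empty": once offset equals endOffset, dataLen is 0 and only the 4-byte length remains.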