Parquet WorkaroundChunk
Parquet WorkaroundChunk implementation
/**
 * deals with a now fixed bug where compressedLength was missing a few bytes.
 */
protected class WorkaroundChunk extends Chunk {

  private final SeekableInputStream f;

  /**
   * @param descriptor the descriptor of the chunk
   * @param f the file stream positioned at the end of this chunk
   */
  WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
    super(descriptor, buffers);
    this.f = f;
  }

  protected PageHeader readPageHeader() throws IOException {
    PageHeader pageHeader;
    stream.mark(8192); // headers should not be larger than 8k
    try {
      pageHeader = Util.readPageHeader(stream);
    } catch (IOException e) {
      // this is to workaround a bug where the compressedLength
      // of the chunk is missing the size of the header of the dictionary
      // to allow reading older files (using dictionary) we need this.
      // usually 13 to 19 bytes are missing
      // if the last page is smaller than this, the page header itself is truncated in the buffer.
      stream.reset(); // resetting the buffer to the position before we got the error
      LOG.info("completing the column chunk to read the page header");
      pageHeader = Util.readPageHeader(new SequenceInputStream(stream, f)); // trying again from the buffer + remainder of the stream.
    }
    return pageHeader;
  }

  public BytesInput readAsBytesInput(int size) throws IOException {
    int available = stream.available();
    if (size > available) {
      // this is to workaround a bug where the compressedLength
      // of the chunk is missing the size of the header of the dictionary
      // to allow reading older files (using dictionary) we need this.
      // usually 13 to 19 bytes are missing
      int missingBytes = size - available;
      LOG.info("completed the column chunk with {} bytes", missingBytes);

      List<ByteBuffer> buffers = new ArrayList<>();
      buffers.addAll(stream.sliceBuffers(available));

      ByteBuffer lastBuffer = ByteBuffer.allocate(missingBytes);
      f.readFully(lastBuffer);
      buffers.add(lastBuffer);

      return BytesInput.from(buffers);
    }
    return super.readAsBytesInput(size);
  }
}
Analysis

readPageHeader() completes the missing header bytes from the file stream (a standalone sketch of this retry pattern follows below):

pageHeader = Util.readPageHeader(new SequenceInputStream(stream, f));
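A minimal, self-contained sketch of the same mark/reset-and-retry idea using only JDK streams; readHeader() is a hypothetical stand-in for Util.readPageHeader(), and the 4-byte length prefix is an assumption made just for this example, not the Thrift encoding Parquet actually uses:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;

public class HeaderRetrySketch {

  // Hypothetical stand-in for Util.readPageHeader: a 4-byte length prefix followed by the header bytes.
  static byte[] readHeader(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    int len = data.readInt();      // throws EOFException when the buffer is truncated
    byte[] header = new byte[len];
    data.readFully(header);        // also throws when bytes are missing
    return header;
  }

  // Mirrors WorkaroundChunk.readPageHeader: try the in-memory buffer first; on failure,
  // reset and retry from the buffer concatenated with the remainder of the file stream.
  static byte[] readHeaderWithWorkaround(ByteArrayInputStream buffered, InputStream file) throws IOException {
    buffered.mark(8192);           // assume headers fit in 8k, as in the original
    try {
      return readHeader(buffered);
    } catch (IOException e) {
      buffered.reset();            // rewind to where parsing started
      return readHeader(new SequenceInputStream(buffered, file));
    }
  }
}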
readAsBytesInput() completes the missing page data from the file stream (see the sketch after this snippet):

f.readFully(lastBuffer);
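A minimal sketch of the same top-up idea with plain JDK streams; readPageBytes() is a hypothetical helper written for illustration, not a parquet-mr API:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Mirrors WorkaroundChunk.readAsBytesInput: if the buffer holds fewer bytes than the
// requested page size, take what is available and read the missing tail from the file stream.
static byte[] readPageBytes(ByteArrayInputStream buffered, InputStream file, int size) throws IOException {
  byte[] page = new byte[size];
  int fromBuffer = Math.min(size, buffered.available());
  new DataInputStream(buffered).readFully(page, 0, fromBuffer);
  if (fromBuffer < size) {
    // the compressedLength recorded for the chunk was too small; top up from the stream
    new DataInputStream(file).readFully(page, fromBuffer, size - fromBuffer);
  }
  return page;
}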
Why an exception handler? When reading the pageHeader there is no size to check against up front, so the truncation can only be detected by catching the IOException; readAsBytesInput(), by contrast, knows the requested size and can simply compare it with stream.available().