iceberg flink 读操作
程序员文章站
2022-03-08 08:05:49
...
org.apache.iceberg.flink.source.FlinkInputFormat.open
@Override
public void open(FlinkInputSplit split) {
// split.getTask().files(): FlinkInputSplit.CombinedScanTask.Collection<FileScanTask>: SplitScanTask
// encryptionClass: org.apache.iceberg.encryption.PlaintextEncryptionManager
// tableSchema & context.project(): org.apache.iceberg.Schema
// caseSensitive: false
this.iterator = new RowDataIterator(
split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
context.caseSensitive());
}
org.apache.iceberg.flink.source.DataIterator.DataIterator
DataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption) {
this.tasks = task.files().iterator();
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
task.files().stream()
.flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
.forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
.map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
// decrypt with the batch call to avoid multiple RPCs to a key server, if possible
Iterable<InputFile> decryptedFiles = encryption.decrypt(encrypted::iterator);
Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
// 该 task 进程的所有需要 scan 的 parq 文件路径
this.inputFiles = Collections.unmodifiableMap(files);
this.currentIterator = CloseableIterator.empty();
}
org.apache.iceberg.hadoop.HadoopFileIO#newInputFile {...}
org.apache.iceberg.flink.source.RowDataIterator.RowDataIterator
RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
Schema projectedSchema, String nameMapping, boolean caseSensitive) {
super(task, io, encryption);
// tableSchema & projectedSchema: org.apache.iceberg.Schema
this.tableSchema = tableSchema;
this.projectedSchema = projectedSchema;
this.nameMapping = nameMapping;
// false
this.caseSensitive = caseSensitive;
}
org.apache.iceberg.flink.source.DataIterator.hasNext
@Override
public boolean hasNext() {
updateCurrentIterator();
return currentIterator.hasNext();
}
org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator
private void updateCurrentIterator() {
try {
while (!currentIterator.hasNext() && tasks.hasNext()) {
currentIterator.close();
currentIterator = openTaskIterator(tasks.next());
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
org.apache.iceberg.parquet.ParquetReader.iterator
@Override
public CloseableIterator<T> iterator() {
FileIterator<T> iter = new FileIterator<>(init());
addCloseable(iter);
return iter;
}
org.apache.iceberg.flink.source.DataIterator.next
@Override
public T next() {
updateCurrentIterator();
return currentIterator.next();
}
org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator
private void updateCurrentIterator() {
try {
while (!currentIterator.hasNext() && tasks.hasNext()) {
currentIterator.close();
currentIterator = openTaskIterator(tasks.next());
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
org.apache.iceberg.parquet.ParquetReader.FileIterator.next
@Override
public T next() {
// valuesRead: 0
// nextRowGroupStart: 0
if (valuesRead >= nextRowGroupStart) {
advance();
}
if (reuseContainers) {
this.last = model.read(last);
} else {
this.last = model.read(null);
}
valuesRead += 1;
return last;
}
org.apache.iceberg.parquet.ParquetValueReaders.StructReader.read
@Override
public final T read(T reuse) {
I intermediate = newStructData(reuse);
for (int i = 0; i < readers.length; i += 1) {
set(intermediate, i, readers[i].read(get(intermediate, i)));
}
return buildStruct(intermediate);
}
org.apache.iceberg.flink.data.FlinkParquetReaders.StringReader.read
@Override
public StringData read(StringData ignored) {
Binary binary = column.nextBinary();
ByteBuffer buffer = binary.toByteBuffer();
if (buffer.hasArray()) {
return StringData.fromBytes(
buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
} else {
return StringData.fromBytes(binary.getBytes());
}
}
上一篇: 什么是Iceberg分区
下一篇: 队列的基本用法