Iceberg Flink read path

This post traces the call chain of a Flink read against an Iceberg table: FlinkInputFormat.open builds a RowDataIterator over the split's scan tasks, DataIterator drives one per-file iterator at a time, and ParquetReader materializes each row column by column.

org.apache.iceberg.flink.source.FlinkInputFormat.open

@Override
public void open(FlinkInputSplit split) {
    // split.getTask() is the CombinedScanTask for this split; its files()
    // is a Collection<FileScanTask> (SplitScanTask instances here)
    // encryption: a PlaintextEncryptionManager, i.e. the files are not encrypted
    // tableSchema and context.project() are both org.apache.iceberg.Schema;
    // project() holds only the columns the query selects
    // context.caseSensitive(): false in this trace
    this.iterator = new RowDataIterator(
            split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
            context.caseSensitive());
}
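
For context, this open() is reached from a Flink job that builds an Iceberg source. A minimal sketch, assuming the FlinkSource builder API from the same Iceberg release as the code traced here (the warehouse path is hypothetical):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class IcebergBatchRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // hypothetical table location; adjust to your environment
        TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");

        // each parallel reader calls FlinkInputFormat.open(split)
        // once per FlinkInputSplit assigned to it
        DataStream<RowData> stream = FlinkSource.forRowData()
                .env(env)
                .tableLoader(loader)
                .streaming(false)   // bounded, batch-style scan
                .build();

        stream.print();
        env.execute("iceberg-batch-read");
    }
}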

org.apache.iceberg.flink.source.DataIterator.DataIterator

DataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption) {
    this.tasks = task.files().iterator();

    Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
    task.files().stream()
        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
    Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
        .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));

    // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
    Iterable<InputFile> decryptedFiles = encryption.decrypt(encrypted::iterator);

    Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
    decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));

    // paths of all the Parquet files this task needs to scan
    this.inputFiles = Collections.unmodifiableMap(files);

    this.currentIterator = CloseableIterator.empty();
}
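
The constructor gathers every data file and delete file referenced by the combined task into one path-keyed map, decrypting them in a single batch. The tasks themselves come from Iceberg's scan planner; a minimal sketch of inspecting them with the core table API (independent of Flink):

import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class ScanTaskInspector {
    // prints the files a DataIterator built from each task would scan
    static void dumpTasks(Table table) throws Exception {
        try (CloseableIterable<CombinedScanTask> tasks = table.newScan().planTasks()) {
            for (CombinedScanTask task : tasks) {
                for (FileScanTask fileTask : task.files()) {
                    // the same paths that end up as keys of DataIterator.inputFiles
                    System.out.println(fileTask.file().path());
                    fileTask.deletes().forEach(d -> System.out.println("  delete: " + d.path()));
                }
            }
        }
    }
}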

io.newInputFile(entry.getKey()) above dispatches to the table's FileIO implementation; with an HDFS-backed table that is:

org.apache.iceberg.hadoop.HadoopFileIO.newInputFile {...}

org.apache.iceberg.flink.source.RowDataIterator.RowDataIterator

RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
        Schema projectedSchema, String nameMapping, boolean caseSensitive) {
    super(task, io, encryption);
    // both tableSchema and projectedSchema are plain org.apache.iceberg.Schema
    this.tableSchema = tableSchema;
    this.projectedSchema = projectedSchema;
    this.nameMapping = nameMapping;
    // false in this trace
    this.caseSensitive = caseSensitive;
}
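
projectedSchema is the column subset the query actually selects, while tableSchema is the table's full schema. A small sketch of how such a projection can be produced with the core Schema API:

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class ProjectionExample {
    public static void main(String[] args) {
        Schema tableSchema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "name", Types.StringType.get()),
                Types.NestedField.optional(3, "ts", Types.TimestampType.withZone()));

        // select() keeps only the named columns -- the kind of Schema
        // that arrives here as projectedSchema / context.project()
        Schema projected = tableSchema.select("id", "name");
        System.out.println(projected);
    }
}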

org.apache.iceberg.flink.source.DataIterator.hasNext

@Override
public boolean hasNext() {
    updateCurrentIterator();
    return currentIterator.hasNext();
}

org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator

private void updateCurrentIterator() {
    try {
        while (!currentIterator.hasNext() && tasks.hasNext()) {
            currentIterator.close();
            currentIterator = openTaskIterator(tasks.next());
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
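
This is a classic flattening iterator: it closes the exhausted per-file iterator, opens the next one, and keeps skipping files that yield no rows. A self-contained sketch of the same pattern in plain Java (no Iceberg types; all names hypothetical):

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.function.Function;

// flattens an iterator of keys into one stream of values, closing each
// sub-iterator before opening the next -- the same shape as DataIterator
class ConcatIterator<K, T, I extends Iterator<T> & Closeable> implements Iterator<T> {
    private final Iterator<K> keys;
    private final Function<K, I> open;
    private I current;

    ConcatIterator(Iterator<K> keys, Function<K, I> open, I empty) {
        this.keys = keys;
        this.open = open;
        this.current = empty;
    }

    private void advance() {
        try {
            while (!current.hasNext() && keys.hasNext()) {
                current.close();                   // release the finished file
                current = open.apply(keys.next()); // open the next one lazily
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);     // same wrapping as DataIterator
        }
    }

    @Override public boolean hasNext() { advance(); return current.hasNext(); }
    @Override public T next() { advance(); return current.next(); }
}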

org.apache.iceberg.parquet.ParquetReader.iterator

@Override
public CloseableIterator<T> iterator() {
    FileIterator<T> iter = new FileIterator<>(init());
    addCloseable(iter);
    return iter;
}
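
ParquetReader extends Iceberg's CloseableGroup, and addCloseable(iter) registers the new FileIterator so that closing the reader also closes every iterator it handed out. A minimal sketch of that ownership pattern (plain Java, names hypothetical):

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// the CloseableGroup idea in miniature: a parent resource remembers every
// child it hands out and closes them when it is closed itself
class MiniCloseableGroup implements Closeable {
    private final List<Closeable> children = new ArrayList<>();

    <C extends Closeable> C track(C child) {
        children.add(child);  // what addCloseable(iter) does in ParquetReader
        return child;
    }

    @Override
    public void close() throws IOException {
        for (Closeable c : children) {
            c.close();
        }
    }
}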

org.apache.iceberg.flink.source.DataIterator.next

@Override
public T next() {
    updateCurrentIterator();
    return currentIterator.next();
}

next() goes through the same updateCurrentIterator() shown above before returning currentIterator.next(), so both hasNext() and next() can advance to the next file.

org.apache.iceberg.parquet.ParquetReader.FileIterator.next

@Override
public T next() {
    // on the first call, valuesRead and nextRowGroupStart are both 0,
    // so advance() positions the reader at the first row group
    if (valuesRead >= nextRowGroupStart) {
        advance();
    }

    if (reuseContainers) {
        this.last = model.read(last);
    } else {
        this.last = model.read(null);
    }
    valuesRead += 1;

    return last;
}
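
Note the reuseContainers branch: when enabled, model.read(last) may overwrite and return the same object on every call, so a caller that buffers rows must copy them first. A plain-Java sketch of the pitfall:

import java.util.ArrayList;
import java.util.List;

public class ReuseContainerPitfall {
    // stand-in for a reader that recycles one container, like reuseContainers = true
    static final int[] SHARED = new int[1];

    static int[] read(int value) {
        SHARED[0] = value;  // overwrite in place, return the same array
        return SHARED;
    }

    public static void main(String[] args) {
        List<int[]> buffered = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            buffered.add(read(i));           // WRONG: all three entries alias SHARED
            // buffered.add(read(i).clone()); // correct: copy before buffering
        }
        buffered.forEach(row -> System.out.println(row[0]));  // prints 2 2 2, not 0 1 2
    }
}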

org.apache.iceberg.parquet.ParquetValueReaders.StructReader.read

@Override
public final T read(T reuse) {
    I intermediate = newStructData(reuse);

    for (int i = 0; i < readers.length; i += 1) {
        set(intermediate, i, readers[i].read(get(intermediate, i)));
    }

    return buildStruct(intermediate);
}
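
StructReader composes one reader per column and assembles their results positionally into an intermediate container (in the Flink binding, likely a GenericRowData) before buildStruct finalizes it. The same compose-by-position idea in a plain-Java sketch (hypothetical ColumnReader interface):

import java.util.Arrays;

public class PositionalStructReader {
    interface ColumnReader { Object read(); }

    // reads one "struct" by asking each column reader in field order,
    // mirroring the readers[i].read(...) loop above
    static Object[] readStruct(ColumnReader[] readers) {
        Object[] row = new Object[readers.length];  // the "intermediate"
        for (int i = 0; i < readers.length; i++) {
            row[i] = readers[i].read();
        }
        return row;  // buildStruct(intermediate)
    }

    public static void main(String[] args) {
        ColumnReader[] readers = { () -> 42L, () -> "alice" };
        System.out.println(Arrays.toString(readStruct(readers)));  // [42, alice]
    }
}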

org.apache.iceberg.flink.data.FlinkParquetReaders.StringReader.read

@Override
public StringData read(StringData ignored) {
    Binary binary = column.nextBinary();
    ByteBuffer buffer = binary.toByteBuffer();
    if (buffer.hasArray()) {
        return StringData.fromBytes(
                buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
    } else {
        return StringData.fromBytes(binary.getBytes());
    }
}
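
The branch exists because a heap-backed ByteBuffer exposes its underlying array, so StringData can wrap the UTF-8 bytes without copying; a direct buffer forces the binary.getBytes() copy instead. A runnable sketch of the hasArray()/arrayOffset() arithmetic:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HeapBufferSlice {
    public static void main(String[] args) {
        byte[] backing = "##hello##".getBytes(StandardCharsets.UTF_8);
        // a slice over the middle of a heap array: hasArray() is true and
        // the real start of the data is arrayOffset() + position()
        ByteBuffer buffer = ByteBuffer.wrap(backing, 2, 5).slice();

        if (buffer.hasArray()) {
            String s = new String(buffer.array(),
                    buffer.arrayOffset() + buffer.position(),
                    buffer.remaining(), StandardCharsets.UTF_8);
            System.out.println(s);  // hello -- indexes straight into the shared backing array
        }
    }
}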
