欢迎您访问程序员文章站！本站旨在为大家提供、分享程序员计算机编程知识！
您现在的位置是: 首页

Iceberg Flink 写操作

程序员文章站 2022-03-08 08:05:43
...

org.apache.iceberg.io.PartitionedFanoutWriter#write

/**
 * Routes a row to the rolling writer that owns the row's partition, creating
 * that writer the first time the partition is seen.
 *
 * @param row the row to write (org.apache.flink.table.data.RowData in the Flink path)
 * @throws IOException if the underlying rolling writer fails to write the row
 */
public void write(T row) throws IOException {
    // Derive the org.apache.iceberg.PartitionKey for this row.
    PartitionKey lookupKey = partition(row);

    // Fast path: a rolling writer (BaseTaskWriter$RollingFileWriter) already exists.
    RollingFileWriter existing = writers.get(lookupKey);
    if (existing != null) {
        existing.write(row);
        return;
    }

    // Slow path: the map must be keyed by a copy of the partition key, otherwise
    // the keys already stored in `writers` could get messed up.
    PartitionKey ownedKey = lookupKey.copy();
    RollingFileWriter created = new RollingFileWriter(ownedKey);
    writers.put(ownedKey, created);
    created.write(row);
}

org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#write(T)

/**
 * Writes one record to the currently open file and rolls over to a new file
 * when {@code shouldRollToNewFile()} says so.
 *
 * @param record the record to write
 * @throws IOException if closing the current file or opening the next one fails
 */
public void write(T record) throws IOException {
    // Delegates to the subclass hook (BaseTaskWriter.RollingFileWriter in this trace).
    write(currentWriter, record);
    this.currentRows += 1;

    // Nothing more to do unless the roll-over check fires.
    if (!shouldRollToNewFile()) {
        return;
    }

    closeCurrent();
    openCurrent();
}

org.apache.iceberg.io.BaseTaskWriter.RollingFileWriter#write

/**
 * Appends a single record to the supplied data writer.
 *
 * @param writer the data writer that receives the record
 * @param record the record to append
 */
@Override
void write(DataWriter<T> writer, T record) {
    // writer is an org.apache.iceberg.io.DataWriter
    writer.add(record);
}

org.apache.iceberg.io.DataWriter#add

/**
 * Adds one row by forwarding it to the underlying appender.
 *
 * @param row the row to add
 */
public void add(T row) {
    // In the write path traced here the appender is an org.apache.iceberg.parquet.ParquetWriter.
    appender.add(row);
}

org.apache.iceberg.parquet.ParquetWriter#add

/**
 * Adds one value to the Parquet output: increments the record count, writes
 * the value through the model, ends the record on the column write store and
 * then calls {@code checkSize()}.
 *
 * @param value the value to write
 */
@Override
public void add(T value) {
    recordCount += 1;
    // In this trace the model is org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter.
    model.write(0, value);
    // In this trace the write store is org.apache.parquet.column.impl.ColumnWriteStoreV1.
    writeStore.endRecord();
    // NOTE(review): checkSize() is not shown here; presumably it decides whether
    // buffered data should be flushed — confirm against ParquetWriter.
    checkSize();
}

org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#close

/**
 * Closes this rolling writer by delegating to {@code closeCurrent()}.
 *
 * @throws IOException if {@code closeCurrent()} fails
 */
@Override
public void close() throws IOException {
    closeCurrent();
}

org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#closeCurrent

/**
 * Closes the currently open file writer, if there is one, and resets the
 * per-file state.
 *
 * <p>A file that received no rows is deleted; otherwise the writer is handed
 * to {@code complete(...)}. The per-file state (current file, current writer,
 * row count) is reset even when closing, deleting or completing throws, so
 * that a failed close does not leave a stale, already-closed writer behind
 * for a later {@code close()} or {@code write()} to reuse.
 *
 * @throws IOException if closing the current writer fails
 */
private void closeCurrent() throws IOException {
    if (currentWriter == null) {
        // Nothing is open; nothing to close.
        return;
    }

    try {
        // org.apache.iceberg.io.DataWriter
        currentWriter.close();

        if (currentRows == 0L) {
            // No rows were written: remove the empty file instead of reporting it.
            io.deleteFile(currentFile.encryptingOutputFile());
        } else {
            complete(currentWriter);
        }
    } finally {
        // Always drop references to the finished (or failed) file.
        this.currentFile = null;
        this.currentWriter = null;
        this.currentRows = 0;
    }
}

 

相关标签: Iceberg