Iceberg Flink 写操作
程序员文章站
2022-03-08 08:05:43
...
org.apache.iceberg.io.PartitionedFanoutWriter#write
/**
 * Routes a row to the rolling file writer for its partition, creating that writer on first use.
 *
 * @param row the incoming row (in the Flink integration, a {@code RowData})
 * @throws IOException if the underlying writer fails to write the row
 */
public void write(T row) throws IOException {
  // Derive the partition for this row (RowData -> PartitionKey).
  PartitionKey key = partition(row);

  RollingFileWriter target = writers.get(key);
  if (target == null) {
    // Store a copy rather than `key` itself. Per the original note, using the key returned by
    // partition(row) directly could mess up the keys already held in `writers`.
    PartitionKey stableKey = key.copy();
    target = new RollingFileWriter(stableKey);
    writers.put(stableKey, target);
  }

  target.write(row);
}
org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#write(T)
/**
 * Writes one record to the current file, then rolls to a new file if
 * {@code shouldRollToNewFile()} reports that it is time to do so.
 *
 * @param record the record to append
 * @throws IOException if closing the current file or opening the next one fails
 */
public void write(T record) throws IOException {
  // Dispatches to the subclass hook (RollingFileWriter#write in this walkthrough).
  write(currentWriter, record);
  currentRows += 1;

  // NOTE(review): shouldRollToNewFile() is defined elsewhere. The original note suggests it
  // checks `currentRows % ROWS_DIVISOR == 0`; it may also consider file size. Confirm.
  if (!shouldRollToNewFile()) {
    return;
  }
  closeCurrent();
  openCurrent();
}
org.apache.iceberg.io.BaseTaskWriter.RollingFileWriter#write
/**
 * Appends a single record to the given data writer.
 *
 * @param writer the active data writer (an {@code org.apache.iceberg.io.DataWriter})
 * @param record the record to append
 */
@Override
void write(final DataWriter<T> writer, final T record) {
  writer.add(record);
}
org.apache.iceberg.io.DataWriter#add
/**
 * Forwards a row to the file appender (a {@code ParquetWriter} in this walkthrough).
 *
 * @param row the row to append
 */
public void add(T row) {
  this.appender.add(row);
}
org.apache.iceberg.parquet.ParquetWriter#add
/**
 * Writes one value as a Parquet record.
 *
 * <p>Order matters: the counter is bumped first, then the value model writes the fields, then
 * the record boundary is marked on the column store, and finally {@code checkSize()} runs.
 *
 * @param value the value to write (in the Flink integration, a {@code RowData})
 */
@Override
public void add(T value) {
  this.recordCount++;
  // Value model: FlinkParquetWriters$RowDataWriter in this walkthrough.
  this.model.write(0, value);
  // Column store: ColumnWriteStoreV1 in this walkthrough.
  this.writeStore.endRecord();
  this.checkSize();
}
org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#close
/**
 * Closes this rolling writer by finalizing whatever file is currently open.
 *
 * @throws IOException if closing the current file fails
 */
@Override
public void close() throws IOException {
  this.closeCurrent();
}
org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#closeCurrent
/**
 * Closes the currently open data writer, if there is one.
 *
 * <p>If no rows were written to the file, the file is deleted. Otherwise the closed writer is
 * handed to {@code complete(...)}. In every case, including when closing, deleting or completing
 * throws, the current file, writer and row count are cleared. A later call (for example from
 * {@code close()}) therefore never re-closes or re-completes the same writer.
 *
 * @throws IOException if closing the writer or finalizing the file fails
 */
private void closeCurrent() throws IOException {
  if (currentWriter == null) {
    return;
  }

  try {
    currentWriter.close();

    if (currentRows == 0L) {
      // Nothing was written: drop the empty file instead of reporting it.
      io.deleteFile(currentFile.encryptingOutputFile());
    } else {
      complete(currentWriter);
    }
  } finally {
    // Reset even on failure so a half-closed writer is not left behind for a second close().
    this.currentFile = null;
    this.currentWriter = null;
    this.currentRows = 0;
  }
}
下一篇: Java8 特性-stream流使用