Spark Development Workflow, Part 2: How to Write a Parquet File?
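The parquet-mr example API (ExampleParquetWriter together with the Group object model) lets you write a Parquet file directly from Java, without going through Spark. The standalone example below writes one record with four columns to a GZIP-compressed file. Note that the schema string is reconstructed here from the fields the snippet appends, since its original definition was elided.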
package App;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import java.io.IOException;
public class TestParquetWriter {

    // Schema reconstructed from the fields appended below (the original definition was elided):
    // one int64 column and three UTF8 string columns.
    private static final String schemaStr =
            "message record {\n"
            + "  required int64 system_utc_second;\n"
            + "  required binary msisdn (UTF8);\n"
            + "  required binary imsi (UTF8);\n"
            + "  required binary ci (UTF8);\n"
            + "}";
    private static final MessageType schema = MessageTypeParser.parseMessageType(schemaStr);

    // Output file (the writer needs a file path, not a directory).
    private static final String filePath = "E:/test/test.parquet";

    // A row group is flushed to disk once the buffered data exceeds this size.
    private static final int ROW_GROUP_SIZE = 1024 * 1024 * 8;

    public static void main(String[] args) throws IOException {
        Path path = new Path(filePath);
        ExampleParquetWriter.Builder builder = ExampleParquetWriter
                .builder(path)
                .withWriteMode(ParquetFileWriter.Mode.CREATE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withCompressionCodec(CompressionCodecName.GZIP)
                .withRowGroupSize(ROW_GROUP_SIZE)
                .withType(schema);

        SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
        ParquetWriter<Group> writer = builder.build();
        try {
            long val1 = 100;
            String val2 = "200";
            writer.write(groupFactory.newGroup()
                    .append("system_utc_second", val1)
                    .append("msisdn", val2)
                    .append("imsi", "300")
                    .append("ci", "400"));
        } finally {
            // close() flushes the last row group and writes the file footer.
            writer.close();
        }
    }
}
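To check the output, the same example API can read the file back. Below is a minimal sketch, assuming the file written above; the class name TestParquetReader is just an illustration.

package App;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
public class TestParquetReader {
    public static void main(String[] args) throws Exception {
        // Path of the file produced by TestParquetWriter above.
        Path path = new Path("E:/test/test.parquet");
        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).build()) {
            Group group;
            while ((group = reader.read()) != null) {
                // Group.toString() prints each field name and its value.
                System.out.println(group);
            }
        }
    }
}

Each call to read() returns one record as a Group, or null once the end of the file is reached.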