Write and Read Parquet File in Java, Query with Filter and Specify Projections
Links for more information:
What Parquet is: http://parquet.apache.org
API docs: https://www.javadoc.io/doc/org.apache.parquet/parquet-column/1.10.0
Maven dependencies:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.7</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.9.0</version> <!-- or a later version -->
</dependency>
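A note on modules: the example-level classes used below (GroupWriteSupport and GroupReadSupport) live in the parquet-hadoop artifact, which parquet-avro pulls in transitively as far as I know. If you prefer to declare it explicitly, something like this should work:
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.9.0</version>
</dependency>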
ParquetWrite
In this example, I create a Parquet file named "file.parquet" with 11 columns (an "id" column plus c0 through c9) and 10 rows:
id | c0 | c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c9 |
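In Parquet schema notation, this layout is the following message type (this is what the write program below builds with Types.buildMessage()):
message Record {
  required int64 id;
  optional float c0;
  optional float c1;
  optional float c2;
  optional float c3;
  optional float c4;
  optional float c5;
  optional float c6;
  optional float c7;
  optional float c8;
  optional float c9;
}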
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.io.File;
import java.io.IOException;

public class ParquetWrite {
    public static void main(String[] args) throws IOException {
        int rowNumber = 10, columnNumber = 10;
        String COLUMN_PREFIX = "c";
        PrimitiveType.PrimitiveTypeName typeName = PrimitiveType.PrimitiveTypeName.FLOAT;
        String schemaName = "Record";
        Configuration configuration = new Configuration();
        String filePath = "file.parquet";
        boolean enableDictionary = true;

        // Build the schema: a required int64 "id" plus optional float columns c0..c9.
        Types.MessageTypeBuilder builder = Types.buildMessage();
        builder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "id"));
        for (int i = 0; i < columnNumber; i++)
            builder.addField(new PrimitiveType(Type.Repetition.OPTIONAL, typeName, COLUMN_PREFIX + i));
        MessageType schema = builder.named(schemaName);

        // Register the schema with the write support.
        GroupWriteSupport.setSchema(schema, configuration);
        GroupWriteSupport groupWriteSupport = new GroupWriteSupport();
        groupWriteSupport.init(configuration);

        // Remove any previous output, then open the writer.
        // The three size arguments are block size, page size, and dictionary page size.
        new File(filePath).delete();
        ParquetWriter<Group> writer = new ParquetWriter<>(new Path(filePath), groupWriteSupport,
                CompressionCodecName.SNAPPY, ParquetWriter.DEFAULT_BLOCK_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
                enableDictionary, true, ParquetProperties.WriterVersion.PARQUET_2_0);
        SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema);

        // Write the rows: each row gets its index as "id" and random floats for c0..c9.
        for (int i = 0; i < rowNumber; i++) {
            Group group = simpleGroupFactory.newGroup();
            group.add("id", (long) i);
            for (int j = 0; j < columnNumber; j++)
                group.add(COLUMN_PREFIX + j, (float) Math.random());
            writer.write(group);
        }
        writer.close();
    }
}
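To sanity-check the file without writing a full reader, you can read just the footer metadata. This is a minimal sketch of mine (the class name ParquetInspect is not from the original post); ParquetFileReader.readFooter is deprecated in newer releases but present in the 1.9/1.10 line:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import java.io.IOException;

public class ParquetInspect {
    public static void main(String[] args) throws IOException {
        // Read only the footer: schema and row-group metadata, no row data.
        ParquetMetadata footer =
                ParquetFileReader.readFooter(new Configuration(), new Path("file.parquet"));
        System.out.println(footer.getFileMetaData().getSchema());
        long rows = 0;
        for (BlockMetaData block : footer.getBlocks())
            rows += block.getRowCount();
        System.out.println("rows: " + rows); // expect 10 for the file written above
    }
}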
ParquetRead
I've searched a lot online for how to query a Parquet file in Java, but few of the results show how to query with a filter and a requested projection (a subset of fields).
Here is an example.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.io.IOException;

import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.lt;

public class ParquetRead {
    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
        String COLUMN_PREFIX = "c";
        int selectNum = 8;
        String schemaName = "Record";
        String filePath = "file.parquet";

        // Set the filter: keep only rows where id < 5.
        ParquetInputFormat.setFilterPredicate(configuration, lt(longColumn("id"), 5L));
        FilterCompat.Filter filter = ParquetInputFormat.getFilter(configuration);

        // Set the projection schema: "id" plus the first 8 float columns (c0..c7).
        Types.MessageTypeBuilder builder = Types.buildMessage();
        builder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "id"));
        for (int i = 0; i < selectNum; i++)
            builder.addField(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.FLOAT, COLUMN_PREFIX + i));
        MessageType querySchema = builder.named(schemaName);
        configuration.set(ReadSupport.PARQUET_READ_SCHEMA, querySchema.toString());

        // Build the reader: withConf passes the requested projection, withFilter applies the filter.
        // If you omit withConf, all fields are read.
        ParquetReader<Group> reader = ParquetReader
                .builder(new GroupReadSupport(), new Path(filePath))
                .withConf(configuration)
                .withFilter(filter)
                .build();

        // Read and print the matching rows.
        Group line;
        while ((line = reader.read()) != null)
            System.out.println(line.toString());
        reader.close();
    }
}
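The FilterApi static methods compose, so more complex predicates are possible. As a sketch (not from the original post), this fragment would replace the setFilterPredicate call in ParquetRead above to keep rows where id < 5 and c0 > 0.5:
import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.gt;
import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.lt;

// Keep rows where id < 5 AND c0 > 0.5.
ParquetInputFormat.setFilterPredicate(configuration,
        and(lt(longColumn("id"), 5L),
            gt(floatColumn("c0"), 0.5f)));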
Output
Only rows with id < 5 pass the filter, and only "id" plus c0 through c7 appear because of the requested projection:
id: 0
c0: 0.8837082
c1: 0.78388804
c2: 0.8812319
c3: 0.9382603
c4: 0.75977546
c5: 0.20293085
c6: 0.48019806
c7: 0.01939616
id: 1
c0: 0.82362974
c1: 0.31797537
c2: 0.45773265
c3: 0.9592361
c4: 0.9708959
c5: 0.0033301
c6: 0.28720865
c7: 0.7131074
id: 2
c0: 0.22446668
c1: 0.68124664
c2: 0.945796
c3: 0.033765428
c4: 0.72341245
c5: 0.3164981
c6: 0.932399
c7: 0.6662265
id: 3
c0: 0.9734468
c1: 0.20759688
c2: 0.5723708
c3: 0.428353
c4: 0.8211105
c5: 0.002382244
c6: 0.46220633
c7: 0.9473461
id: 4
c0: 0.7288939
c1: 0.7500735
c2: 0.10537499
c3: 0.06793926
c4: 0.7118421
c5: 0.10614249
c6: 0.43322328
c7: 0.55077094