Write and Read Parquet File in Java, Query with Filter and Specify Projections

Posted on 程序员文章站, 2022-07-14 12:22:43

Links for more information

What is Parquet: http://parquet.apache.org

API reference: https://www.javadoc.io/doc/org.apache.parquet/parquet-column/1.10.0

Maven dependencies

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.7</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.9.0</version> <!-- or latest version -->
</dependency>

ParquetWrite

This example creates a Parquet file named "file.parquet" with 10 rows and 11 columns: a required INT64 column "id" and ten optional FLOAT columns, c0 through c9, filled with random values.

id c0 c1 c2 c3 c4 c5 c6 c7 c8 c9

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.io.File;
import java.io.IOException;

public class ParquetWrite {

    public static void main(String[] args) throws IOException {
        int lineNumber = 10, columnNumber = 10;
        String COLUMN_PREFIX = "c";
        PrimitiveType.PrimitiveTypeName typeName = PrimitiveType.PrimitiveTypeName.FLOAT;
        String schemaName = "Record";
        Configuration configuration = new Configuration();
        String filePath = "file.parquet";
        boolean enableDictionary = true;

        // build the schema: a required INT64 "id" plus columnNumber optional FLOAT columns
        Types.MessageTypeBuilder builder = Types.buildMessage();
        builder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "id"));
        for (int i = 0; i < columnNumber; i++)
            builder.addField(new PrimitiveType(Type.Repetition.OPTIONAL, typeName, COLUMN_PREFIX + i));
        MessageType schema = builder.named(schemaName);

        // initialize writer
        GroupWriteSupport.setSchema(schema, configuration);
        GroupWriteSupport groupWriteSupport = new GroupWriteSupport();
        groupWriteSupport.init(configuration);
        // delete any previous output file before writing
        new File(filePath).delete();

        SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema);

        // the three size arguments are block size, page size, and dictionary page size;
        // try-with-resources closes the writer, which flushes the file footer
        try (ParquetWriter<Group> writer = new ParquetWriter<>(new Path(filePath), groupWriteSupport,
                CompressionCodecName.SNAPPY,
                ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
                enableDictionary, true, ParquetProperties.WriterVersion.PARQUET_2_0)) {

            // write data: one group per row
            for (int i = 0; i < lineNumber; i++) {
                Group group = simpleGroupFactory.newGroup();
                group.add("id", (long) i);
                for (int j = 0; j < columnNumber; j++)
                    group.add(COLUMN_PREFIX + j, (float) Math.random());
                writer.write(group);
            }
        }

    }
}

ParquetRead

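I searched a lot online for how to query a Parquet file in Java, but few of the results show how to query with a filter and read only specific fields.

Here is an example. It keeps only the rows with id < 5 and reads only "id" and the first 8 float columns (c0 to c7).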

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.io.IOException;

import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.lt;

public class ParquetRead {

    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
        String COLUMN_PREFIX = "c";
        int selectNum = 8;
        String schemaName = "Record";
        String filePath = "file.parquet";

        // set filter: keep only rows with id < 5
        ParquetInputFormat.setFilterPredicate(configuration, lt(longColumn("id"), 5L));
        FilterCompat.Filter filter = ParquetInputFormat.getFilter(configuration);

        // set the requested projection: "id" plus the first selectNum float columns
        Types.MessageTypeBuilder builder = Types.buildMessage();
        builder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "id"));
        for (int i = 0; i < selectNum; i++)
            builder.addField(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.FLOAT, COLUMN_PREFIX + i));
        MessageType querySchema = builder.named(schemaName);
        configuration.set(ReadSupport.PARQUET_READ_SCHEMA, querySchema.toString());

        // set reader: withConf passes the configuration that carries the requested projection,
        // withFilter applies the filter. If withConf is omitted, all fields are read.
        ParquetReader.Builder<Group> readerBuilder = ParquetReader
                .builder(new GroupReadSupport(), new Path(filePath))
                .withConf(configuration)
                .withFilter(filter);

        // read until the reader returns null; try-with-resources closes the reader
        try (ParquetReader<Group> reader = readerBuilder.build()) {
            Group line;
            while ((line = reader.read()) != null)
                System.out.println(line.toString());
        }

    }
}
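
The projection above is handed to the reader as a string, via configuration.set(ReadSupport.PARQUET_READ_SCHEMA, querySchema.toString()). As a rough sketch, the same kind of string can also be written by hand in Parquet's textual schema syntax. The class name ProjectionSchema and its build method below are hypothetical helpers made up for illustration; they are not part of the Parquet API, and the exact whitespace may differ from what MessageType.toString() prints.

```java
// Hypothetical helper (not part of the Parquet API): builds a projection schema
// in Parquet's textual schema syntax using only the Java standard library.
final class ProjectionSchema {

    /** Returns a schema with a required int64 "id" and columnCount optional float fields. */
    static String build(String messageName, String columnPrefix, int columnCount) {
        StringBuilder sb = new StringBuilder();
        sb.append("message ").append(messageName).append(" {\n");
        sb.append("  required int64 id;\n");
        for (int i = 0; i < columnCount; i++) {
            sb.append("  optional float ").append(columnPrefix).append(i).append(";\n");
        }
        sb.append("}\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        // Print the projection used in ParquetRead: id plus c0..c7
        System.out.print(build("Record", "c", 8));
    }
}
```

Assuming the read support parses this value as a message type, the result of ProjectionSchema.build("Record", "c", 8) could be passed to configuration.set(ReadSupport.PARQUET_READ_SCHEMA, ...) in place of querySchema.toString().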

Output

id: 0
c0: 0.8837082
c1: 0.78388804
c2: 0.8812319
c3: 0.9382603
c4: 0.75977546
c5: 0.20293085
c6: 0.48019806
c7: 0.01939616

id: 1
c0: 0.82362974
c1: 0.31797537
c2: 0.45773265
c3: 0.9592361
c4: 0.9708959
c5: 0.0033301
c6: 0.28720865
c7: 0.7131074

id: 2
c0: 0.22446668
c1: 0.68124664
c2: 0.945796
c3: 0.033765428
c4: 0.72341245
c5: 0.3164981
c6: 0.932399
c7: 0.6662265

id: 3
c0: 0.9734468
c1: 0.20759688
c2: 0.5723708
c3: 0.428353
c4: 0.8211105
c5: 0.002382244
c6: 0.46220633
c7: 0.9473461

id: 4
c0: 0.7288939
c1: 0.7500735
c2: 0.10537499
c3: 0.06793926
c4: 0.7118421
c5: 0.10614249
c6: 0.43322328
c7: 0.55077094
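
To make the shape of this output concrete, here is a plain-Java sketch that applies the same predicate (id < 5) and the same projection (id plus c0 to c7) to in-memory rows. No Parquet classes are involved; the class InMemoryQuery and its methods are hypothetical names used only for illustration, and the sketch says nothing about how Parquet evaluates the filter internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: mimics the filter and projection on in-memory rows.
final class InMemoryQuery {

    /** Builds rowCount rows, each with an "id" and columnCount random float fields c0, c1, ... */
    static List<Map<String, Object>> makeRows(int rowCount, int columnCount) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (int i = 0; i < rowCount; i++) {
            Map<String, Object> row = new LinkedHashMap<>();
            row.put("id", (long) i);
            for (int j = 0; j < columnCount; j++) {
                row.put("c" + j, (float) Math.random());
            }
            rows.add(row);
        }
        return rows;
    }

    /** Keeps rows with id < maxIdExclusive and copies only "id" plus the first selectNum columns. */
    static List<Map<String, Object>> query(List<Map<String, Object>> rows, long maxIdExclusive, int selectNum) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            long id = (Long) row.get("id");
            if (id >= maxIdExclusive) {
                continue; // filter: drop rows that fail id < maxIdExclusive
            }
            Map<String, Object> projected = new LinkedHashMap<>();
            projected.put("id", id);
            for (int j = 0; j < selectNum; j++) {
                projected.put("c" + j, row.get("c" + j)); // projection: requested columns only
            }
            out.add(projected);
        }
        return out;
    }

    public static void main(String[] args) {
        for (Map<String, Object> row : query(makeRows(10, 10), 5L, 8)) {
            System.out.println(row);
        }
    }
}
```

With 10 rows and 10 float columns, this yields 5 rows of 9 fields each, matching the five records printed above.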

 
