欢迎您访问程序员文章站,本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页

普通文本压缩成RcFile的通用类

程序员文章站 2022-05-02 08:51:09
...
package com.tool;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.xx.CommonMapper;
import com.xx.hadoop.rcfile.RCFileOutputFormat;

/**
 * CommonRcFileCompression.
 * Compresses plain Text files into RCFile format to save storage space.
 * ${srcpath}   input path of the files to compress.
 * ${respath}   output path for the compressed result.
 * ${reducenum} number of reduce tasks.
 * ${sep}       field separator.
 * ${colsum}    total number of columns after splitting by sep.
 *
 * @author zhangk
 *
 */
public class CommonRcFileCompression extends Configured implements Tool, DataStatic {

    private static final String SRCPATH = "srcpath";
    private static final String RESPATH = "respath";
    private static final String SEP = "sep";       // field separator
    private static final String COLSUM = "colsum"; // total column count

    public static void main(String[] args) throws Exception {

        int exitcode = ToolRunner.run(new CommonRcFileCompression(), args);
        System.exit(exitcode);
    }

    /**
     * Parses the command line, configures and submits the compression job.
     *
     * @param args command-line arguments (see class javadoc for the options)
     * @return 0 on success, 1 on bad arguments or job failure
     */
    @Override
    public int run(String[] args) throws Exception {

        Options options = new Options();
        options.addOption(DATE, true, DATE);
        options.addOption(SRCPATH, true, SRCPATH);
        options.addOption(RESPATH, true, RESPATH);
        options.addOption(SEP, true, "eg: 001");
        options.addOption(COLSUM, true, "eg:column sum");
        options.addOption(REDUCENUM, true, "eg:100");
        CommandLineParser parser = new GnuParser();
        HelpFormatter helper = new HelpFormatter();
        CommandLine line;
        try {
            line = parser.parse(options, args);
            // Every option is mandatory and must be non-empty.
            if (!(line.hasOption(DATE)
                    && line.hasOption(SRCPATH)
                    && line.hasOption(RESPATH)
                    && line.hasOption(SEP)
                    && line.hasOption(COLSUM)
                    && line.hasOption(REDUCENUM))
                    || "".equals(line.getOptionValue(DATE))
                    || "".equals(line.getOptionValue(SRCPATH))
                    || "".equals(line.getOptionValue(RESPATH))
                    || "".equals(line.getOptionValue(SEP))
                    || "".equals(line.getOptionValue(COLSUM))
                    || "".equals(line.getOptionValue(REDUCENUM))) {
                helper.printHelp("DistributeOrder", options);
                return 1;
            }
        } catch (Exception e) {
            helper.printHelp("DistributeOrder", options);
            e.printStackTrace();
            // BUG FIX: previously fell through with line == null,
            // causing a NullPointerException on getOptionValue below.
            return 1;
        }

        String srcpath = line.getOptionValue(SRCPATH);
        String sep = line.getOptionValue(SEP);
        String colsum = line.getOptionValue(COLSUM);
        String respath = line.getOptionValue(RESPATH);
        String reducenum = line.getOptionValue(REDUCENUM);

        Job job = new Job();
        Configuration conf = job.getConfiguration();
        // RCFileOutputFormat must know the column count up front.
        RCFileOutputFormat.setColumnNumber(conf, Tools.stringToInteger(colsum, 0));
        job.setJarByClass(CommonRcFileCompression.class);

        FileInputFormat.setInputPaths(job, new Path(srcpath));
        RCFileOutputFormat.setOutputPath(job, new Path(respath));

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(RCFileOutputFormat.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(BytesRefArrayWritable.class);
        job.setNumReduceTasks(Integer.parseInt(reducenum));
        job.setMapperClass(CommonMapper.class);

        // Pass separator and column count through to the mapper.
        conf.set("sep", sep);
        conf.set("colsum", colsum);
        conf.setBoolean("mapred.output.compress", true);
        conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");

        return job.waitForCompletion(true) ? 0 : 1;
    }
}



package xxx.mapper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.tool.DataStatic;
import com.tool.Tools;

/**
 * Mapper that splits each input text line by the configured separator and
 * emits it as a {@link BytesRefArrayWritable} row for RCFile output.
 * Lines with fewer than {@code colsum} fields are silently dropped.
 *
 * @author zhangk
 *
 */
public class CommonMapper extends Mapper<LongWritable, Text, LongWritable, BytesRefArrayWritable> implements DataStatic {

    // Per-task configuration, read once in setup() instead of on every record.
    private String sep;
    private Integer colsum;

    @Override
    protected void setup(Context context) {
        sep = context.getConfiguration().get("sep");
        colsum = Tools.stringToInteger(context.getConfiguration().get("colsum"), 0);
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (!line.equals("")) {
            // NEGATIVE_ONE limit keeps trailing empty fields (String.split contract).
            String[] lines = line.split(sep, NEGATIVE_ONE);
            if (lines.length >= colsum) {

                byte[][] record = makeByte(lines, colsum);

                BytesRefArrayWritable bytes = new BytesRefArrayWritable(record.length);

                for (int i = 0; i < record.length; i++) {
                    BytesRefWritable cu = new BytesRefWritable(record[i], 0, record[i].length);
                    bytes.set(i, cu);
                }
                context.write(key, bytes);
            }
        }
    }

    /**
     * Converts the first {@code colsum} fields of a row into byte arrays.
     *
     * @param lines  the split fields of one input line (length must be >= colsum)
     * @param colsum number of columns to keep
     * @return one byte array per column
     */
    public static byte[][] makeByte(String[] lines, Integer colsum) {
        // BUG FIX: was new byte[colsum][colsum] — the inner arrays were
        // allocated only to be thrown away on the next line.
        byte[][] record = new byte[colsum][];
        for (int i = 0; i < colsum; i++) {
            // Text.toString() decodes UTF-8, so encode back with an explicit
            // charset instead of the platform default (was a latent encoding bug).
            record[i] = lines[i].getBytes(StandardCharsets.UTF_8);
        }
        return record;
    }
}

将工程打包命名:commonPress.jar
调用例子:
hadoop jar  commonPress.jar  -date=20120504 -srcpath=/xxx/xx/xx/2012/05/04/ -respath=/xxx/xx/xx/2012/05/04_1 -colsum=4 -sep=\\t -reducenum=40

参数:
日期
输入路径
输出路径
sep是分隔符
colsum是按照sep分隔后一共几列
reducenum是reduce的数量
相关标签: commons tool