
Hadoop Custom OutputFormat and InputFormat

程序员文章站 2022-07-14 16:42:31

Custom OutputFormat


The related knowledge points are shown in the figure.

To implement a custom OutputFormat, we need to:

1 Write the Mapper and Reducer classes first
2 Create FilterOutputFormat extends FileOutputFormat
  and override the getRecordWriter() method

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Pass the task context through so the writer can create its I/O streams
        return new FilterRecordWriter(taskAttemptContext);
    }
}
3 Create FilterRecordWriter extends RecordWriter

Initialize the output streams in the constructor,
then override the write() and close() methods;
the main logic runs in write()

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {

    FSDataOutputStream out1 = null;  // output stream 1
    FSDataOutputStream out2 = null;  // output stream 2

    public FilterRecordWriter(TaskAttemptContext job) {
        FileSystem fs = null;  // obtain the file system via the job context
        try {
            fs = FileSystem.get(job.getConfiguration());
            // target paths
            Path out1Path = new Path("F:/Test/baidu.txt");
            Path out2Path = new Path("F:/Test/other.txt");

            // create the output streams
            out1 = fs.create(out1Path);
            out2 = fs.create(out2Path);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        // Text.getBytes() returns the backing array, which may be longer than
        // the actual content, so bound the write with getLength();
        // also append a newline so records do not run together.
        if (text.toString().contains("baidu")) {
            out1.write(text.getBytes(), 0, text.getLength());
            out1.write('\n');
        } else {
            out2.write(text.getBytes(), 0, text.getLength());
            out2.write('\n');
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        IOUtils.closeStream(out1);
        IOUtils.closeStream(out2);
    }
}
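The branch in write() is the whole filtering policy, and it can be exercised on its own. A minimal sketch of that routing rule in plain Java (the class and method names here are hypothetical, not part of the job):

```java
public class FilterRoute {
    // Mirrors the branch in FilterRecordWriter.write():
    // lines containing "baidu" go to baidu.txt, everything else to other.txt.
    static String routeLine(String line) {
        return line.contains("baidu") ? "baidu.txt" : "other.txt";
    }

    public static void main(String[] args) {
        System.out.println(routeLine("http://www.baidu.com"));  // baidu.txt
        System.out.println(routeLine("http://www.sina.com"));   // other.txt
    }
}
```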
4 Set up the driver class


// Register the custom output format component with the job
job.setOutputFormatClass(FilterOutputFormat.class);
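For context, that line sits in an otherwise ordinary driver. A minimal sketch, assuming the Mapper and Reducer from step 1 are named FilterMapper and FilterReducer (hypothetical names) and that input/output paths come from the command line:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(FilterDriver.class);
        job.setMapperClass(FilterMapper.class);      // hypothetical, from step 1
        job.setReducerClass(FilterReducer.class);    // hypothetical, from step 1

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Register the custom output format with the job
        job.setOutputFormatClass(FilterOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Even though FilterRecordWriter writes to its own files, FileOutputFormat
        // still needs an output directory (e.g. for the _SUCCESS marker)
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```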

This completes the custom output format.

 


Custom InputFormat

The following steps are required:

1 Create a class extending FileInputFormat
    Override the isSplitable() method, which controls splitting; returning false disables splitting so each file is read whole
    Override the createRecordReader() method


public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable>{
	
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}

	@Override
	public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)	throws IOException, InterruptedException {
		
		WholeRecordReader recordReader = new WholeRecordReader();
        // Initialize the reader, passing in the split and the task context
		recordReader.initialize(split, context);
		
		return recordReader;
	}
}
2 写RecordReader类

public class WholeRecordReader extends RecordReader<Text, BytesWritable>{

	private Configuration configuration;
	private FileSplit split;
	
	private boolean isProgress= true;
	private BytesWritable value = new BytesWritable();
	private Text k = new Text();

	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		
		this.split = (FileSplit)split;
		configuration = context.getConfiguration();
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		
		if (isProgress) {

			// 1 Allocate a buffer sized to the whole split
			byte[] contents = new byte[(int) split.getLength()];
			
			FileSystem fs = null;
			FSDataInputStream fis = null;
			
			try {
				// 2 Get the file system for the split's path
				Path path = split.getPath();
				fs = path.getFileSystem(configuration);
				
				// 3 Open the file
				fis = fs.open(path);
				
				// 4 Read the entire file into the buffer
				IOUtils.readFully(fis, contents, 0, contents.length);
				
				// 5 Set the value to the file content
				value.set(contents, 0, contents.length);

				// 6 Get the file path and name
				String name = split.getPath().toString();

				// 7 Use the path as the output key
				k.set(name);

			} finally {
				// Close the stream even if reading fails; do not swallow exceptions
				IOUtils.closeStream(fis);
			}
			
			isProgress = false;
			
			return true;
		}
		
		return false;
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return k;
	}

	@Override
	public BytesWritable getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return 0;
	}

	@Override
	public void close() throws IOException {
	}
}
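The isProgress flag implements a one-shot iteration: nextKeyValue() yields exactly one record (the whole file) and then reports exhaustion. The same pattern in isolation (the class name here is hypothetical):

```java
public class OneShotCursor {
    // Mirrors the isProgress flag in WholeRecordReader:
    // the first call produces the single whole-file record, later calls end iteration.
    private boolean isProgress = true;

    boolean nextKeyValue() {
        if (isProgress) {
            // ... read the whole file into the value here ...
            isProgress = false;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        OneShotCursor cursor = new OneShotCursor();
        System.out.println(cursor.nextKeyValue());  // true: one record emitted
        System.out.println(cursor.nextKeyValue());  // false: no more records
    }
}
```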
3 Write the Mapper and Reducer classes
4 Set the custom InputFormat on the job
job.setInputFormatClass(WholeFileInputformat.class);
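Steps 3 and 4 together might look like the following. This is a sketch under assumptions: SequenceFileMapper and WholeFileDriver are hypothetical names, and the job packs each whole file into a SequenceFile (a common use of this InputFormat), emitting (file path, file bytes) pairs; no Reducer is strictly needed for that.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class WholeFileDriver {

    // Step 3: the Mapper simply forwards the (path, bytes) pair from the reader
    public static class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
        @Override
        protected void map(Text key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WholeFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);

        // Step 4: plug in the custom input format
        job.setInputFormatClass(WholeFileInputformat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```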

 

With these steps, both a custom InputFormat and a custom OutputFormat are implemented.

 

 

 

Tags: Big Data, Hadoop