Hadoop Custom OutputFormat and InputFormat
Custom OutputFormat
To create a custom OutputFormat, we need to:
1. First write the two MR classes (Mapper and Reducer); a sketch follows below.
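For reference, here is a minimal sketch of what those two classes could look like, assuming the job simply forwards each input line as the key with a NullWritable value. The class names FilterMapper and FilterReducer and the newline appended in the reducer are assumptions for illustration, not details from the original.

// FilterMapper.java (illustrative sketch)
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Forward every line unchanged; the actual filtering happens in the RecordWriter
        context.write(value, NullWritable.get());
    }
}

// FilterReducer.java (illustrative sketch)
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    private Text outKey = new Text();

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Append a newline so each record ends on its own line, since the RecordWriter below writes raw bytes
        outKey.set(key.toString() + "\n");
        // Emit once per occurrence so duplicate lines are preserved
        for (NullWritable value : values) {
            context.write(outKey, NullWritable.get());
        }
    }
}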
2. Write a FilterOutputFormat class that extends FileOutputFormat and
override its getRecordWriter() method:
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Pass the task context along so the RecordWriter can open its output streams
        return new FilterRecordWriter(taskAttemptContext);
    }
}
3. Write a FilterRecordWriter class that extends RecordWriter.
Initialize the output streams in the constructor.
Override the write() and close() methods; the main filtering logic runs in write():
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
    FSDataOutputStream out1 = null; // output stream 1
    FSDataOutputStream out2 = null; // output stream 2

    public FilterRecordWriter(TaskAttemptContext job) {
        FileSystem fs = null;
        try {
            // Get the file system from the job configuration
            fs = FileSystem.get(job.getConfiguration());
            // Target paths for the two output files
            Path out1Path = new Path("F:/Test/baidu.txt");
            Path out2Path = new Path("F:/Test/other.txt");
            // Create the output streams
            out1 = fs.create(out1Path);
            out2 = fs.create(out2Path);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        // Records containing "baidu" go to out1, everything else to out2;
        // only the first getLength() bytes of the Text buffer hold valid data
        if (text.toString().contains("baidu")) {
            out1.write(text.getBytes(), 0, text.getLength());
        } else {
            out2.write(text.getBytes(), 0, text.getLength());
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        IOUtils.closeStream(out1);
        IOUtils.closeStream(out2);
    }
}
4. Configure the driver class:
// Set the custom output format component on the job
job.setOutputFormatClass(FilterOutputFormat.class);
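For completeness, a sketch of a full driver, assuming the FilterMapper/FilterReducer names sketched earlier and input/output paths passed on the command line; only the setOutputFormatClass(...) call above is taken from the original.

// FilterDriver.java (illustrative sketch)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(FilterDriver.class);
        job.setMapperClass(FilterMapper.class);
        job.setReducerClass(FilterReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Plug the custom output format into the job
        job.setOutputFormatClass(FilterOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat still needs an output directory (e.g. for the _SUCCESS marker),
        // even though the data files themselves are written by FilterRecordWriter
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}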
This completes the custom OutputFormat.
Custom InputFormat
The following steps are required:
1. Write a custom class that extends FileInputFormat.
Override its isSplitable() method, which controls input splitting; returning false means each file is read whole and never split.
Also override the createRecordReader() method:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Never split: each file is read as a single record
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeRecordReader recordReader = new WholeRecordReader();
        // Initialize the reader with the split and the task context
        recordReader.initialize(split, context);
        return recordReader;
    }
}
2. Write the RecordReader class:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {
    private Configuration configuration;
    private FileSplit split;
    private boolean isProgress = true; // true until the single record has been read
    private BytesWritable value = new BytesWritable();
    private Text k = new Text();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProgress) {
            // 1 Buffer sized to the whole file (the split covers the entire file)
            byte[] contents = new byte[(int) split.getLength()];
            FileSystem fs = null;
            FSDataInputStream fis = null;
            try {
                // 2 Get the file system
                Path path = split.getPath();
                fs = path.getFileSystem(configuration);
                // 3 Open the file
                fis = fs.open(path);
                // 4 Read the entire file content into the buffer
                IOUtils.readFully(fis, contents, 0, contents.length);
                // 5 Set the value to the file content
                value.set(contents, 0, contents.length);
                // 6 Get the file path and name
                String name = split.getPath().toString();
                // 7 Use it as the output key
                k.set(name);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                IOUtils.closeStream(fis);
            }
            isProgress = false;
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
    }
}
3. Write the MR classes; a minimal Mapper sketch follows below.
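A minimal sketch of such a Mapper, assuming the job just forwards the (file path, file bytes) pairs produced by WholeRecordReader; the class name SequenceFileMapper is an assumption. A Reducer can forward the pairs in the same way, or reduces can be skipped for a map-only job.

// SequenceFileMapper.java (illustrative sketch)
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // key = file path, value = whole file content, both set by WholeRecordReader
        context.write(key, value);
    }
}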
4. Set the custom InputFormat on the job:
job.setInputFormatClass(WholeFileInputformat.class);
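Putting the pieces together, a sketch of a possible driver, assuming the SequenceFileMapper above and SequenceFile output (a common pairing for a whole-file reader, not something stated in the original); only the setInputFormatClass(...) line above comes from the article.

// WholeFileDriver.java (illustrative sketch)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class WholeFileDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(WholeFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);

        // Use the custom InputFormat so each small file becomes a single record
        job.setInputFormatClass(WholeFileInputformat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}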
With these steps, both a custom InputFormat and a custom OutputFormat are in place.