mapreduce-自定义outputformat
程序员文章站
2022-06-30 09:10:18
...
图片加载不了看这
常见的outputformat实现类
使用场景:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lPyRQCi4-1605151404282)(https://s1.ax1x.com/2020/11/02/BrGyK1.png)]
实例操作:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MmT3uXeL-1605151404284)(https://s1.ax1x.com/2020/11/02/BrGqVf.png)]
Mapper:
package com.out.io;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class outMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String k = value.toString() + "\n";
Text kk = new Text();
kk.set(k);
context.write(kk,NullWritable.get());
}
}
Reducer:
package com.out.io;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class outReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
for (NullWritable i:values) {
context.write(key,NullWritable.get());
}
}
}
outformat:
package com.out.io;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.FilterOutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
//import javax.xml.soap.Text;
public class outformat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return new FilterRecord(taskAttemptContext);
}
}
文件读写类,实现主要逻辑:RecordWriter
package com.out.io;
//import org.apache.commons.lang.ObjectUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class FilterRecord extends RecordWriter<Text, NullWritable> {
FSDataOutputStream fosatguigu = null;
FSDataOutputStream fosother = null;
public FilterRecord(TaskAttemptContext job) {
FileSystem fs;
try{
fs = FileSystem.get(job.getConfiguration());
fosatguigu = fs.create(new Path("d:/mapreduceoutput/out5/atguidu.log"));
fosother = fs.create(new Path("d:/mapreduceoutput/out5/other.log"));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
if (text.toString().contains("atguigu")){
fosatguigu.write(text.toString().getBytes());
} else {
fosother.write(text.toString().getBytes());
}
}
@Override
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
//fosatguigu.close();
//fosother.close();
IOUtils.closeStream(fosatguigu);
IOUtils.closeStream(fosother);
}
}
Driver:
package com.out.io;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class outDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"d:/mapreduceinput/input5","d:/mapreduceoutput/out5"};
Job job = Job.getInstance(new Configuration());
job.setJarByClass(outDriver.class);
job.setMapperClass(outMapper.class);
job.setReducerClass(outReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(outformat.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
上一篇: 用PS制作1寸证件照方法介绍
下一篇: 用ps简单制作一个翻页的效果