MapReduce编程模型之HBase输入、HDFS多路输出
程序员文章站
2022-03-31 18:11:01
...
import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import com.bfd.util.Const; public class IPCount { static class MyMapper extends TableMapper<Text, Text> { @Override public void map(ImmutableBytesWritable row, Result value,Context context) throws IOException, InterruptedException { for (KeyValue kv : value.raw()) { val = new String(kv.getValue(),"UTF-8"); qualifier = new String(kv.getQualifier()); if(qualifier.indexOf(">brand")==-1){ context.write(gid, outVal); } } } } static class MyReducer extends Reducer<Text, Text, Text, Text> { @SuppressWarnings("rawtypes") private MultipleOutputs multipleOutputs; protected void setup(Context context) throws IOException, InterruptedException { multipleOutputs =new MultipleOutputs<Text,Text>(context); } protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); } @SuppressWarnings("unchecked") public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { if(isTrue){ multipleOutputs.write(NullWritable.get(),gid+"\t"+value,"active_normal"); }else{ multipleOutputs.write(NullWritable.get(),gid+"\t"+value,"nonactive_normal"); } } } public static void main(String[] 
args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", Const.ZOOKEEPER_QUORAM); conf.set("zookeeper.znode.parent", Const.ZOOKEEPER_ZNODE_PARENT); Job job = new Job(conf, "IPCount"); job.setJarByClass(IPCount.class); Scan scan = new Scan(); scan.setCaching(500); scan.setCacheBlocks(false); TableMapReduceUtil.initTableMapperJob(args[0],scan,MyMapper.class,Text.class,Text.class,job); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setNumReduceTasks(10); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }