欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce 编程模型之 HBase 输入、HDFS 多路输出

程序员文章站 2022-03-31 18:11:01
...
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import com.bfd.util.Const;


public class IPCount {
	
	/**
	 * Mapper fed by an HBase table scan. For every cell of the incoming row it
	 * decodes the cell value and column qualifier, and emits one record for each
	 * cell whose qualifier does not contain {@code ">brand"}.
	 *
	 * <p>NOTE(review): {@code gid} and {@code outVal} are written to the context
	 * but are neither declared nor assigned anywhere in this class as listed. The
	 * code that builds them appears to have been omitted from the listing
	 * (presumably they are derived from the row key and the decoded cell value --
	 * TODO confirm) and must be restored before this class compiles.
	 */
	static class MyMapper extends TableMapper<Text, Text> {
		/**
		 * Processes one HBase row.
		 *
		 * @param row     key of the current row (not read by the visible code)
		 * @param value   scan result holding the cells of the current row
		 * @param context task context that receives the emitted records
		 * @throws IOException          if writing to the context fails or the
		 *                              "UTF-8" charset is unavailable
		 * @throws InterruptedException if the task is interrupted while writing
		 */
		@Override
		public void map(ImmutableBytesWritable row, Result value, Context context)
				throws IOException, InterruptedException {
			for (KeyValue kv : value.raw()) {
				// Decode both byte arrays with the same explicit charset; the
				// original decoded the qualifier with the platform default, which
				// can differ between cluster nodes.
				String val = new String(kv.getValue(), "UTF-8");
				String qualifier = new String(kv.getQualifier(), "UTF-8");

				// Cells whose qualifier contains ">brand" are skipped.
				if (!qualifier.contains(">brand")) {
					// NOTE(review): gid / outVal are undefined in the listing; see class comment.
					context.write(gid, outVal);
				}
			}
		}
	}
	
	static class MyReducer extends Reducer<Text, Text, Text, Text> {
		@SuppressWarnings("rawtypes")
		private MultipleOutputs multipleOutputs; 
		protected void setup(Context context) throws IOException, InterruptedException {
			multipleOutputs =new MultipleOutputs<Text,Text>(context);
		}
		
		protected void cleanup(Context context) throws IOException, InterruptedException {
			multipleOutputs.close();
		}
		@SuppressWarnings("unchecked")
		public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
			if(isTrue){
				multipleOutputs.write(NullWritable.get(),gid+"\t"+value,"active_normal");
			}else{
				multipleOutputs.write(NullWritable.get(),gid+"\t"+value,"nonactive_normal");
			}
		}			
    }
	
	/**
	 * Configures and runs the job: an HBase table scan feeds {@code MyMapper},
	 * and {@code MyReducer} runs as 10 reduce tasks with file output rooted at
	 * the given path. The JVM exits with 0 if the job succeeds, 1 if it fails,
	 * and 2 if the required arguments are missing.
	 *
	 * @param args {@code args[0]} is the name of the HBase table to scan;
	 *             {@code args[1]} is the output directory path
	 * @throws Exception if job setup or submission fails
	 */
	public static void main(String[] args) throws Exception {
		// Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
		if (args == null || args.length < 2) {
			System.err.println("Usage: IPCount <hbase-table> <output-dir>");
			System.exit(2);
		}

		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", Const.ZOOKEEPER_QUORAM);
		conf.set("zookeeper.znode.parent", Const.ZOOKEEPER_ZNODE_PARENT);

		// NOTE(review): this constructor is deprecated on newer Hadoop releases in
		// favour of Job.getInstance(conf, name); kept because the target Hadoop
		// version is not visible from this file.
		Job job = new Job(conf, "IPCount");
		job.setJarByClass(IPCount.class);

		Scan scan = new Scan();
		scan.setCaching(500);        // scanner caching of 500 rows
		scan.setCacheBlocks(false);  // block caching disabled for this full-table scan

		// Wire the table scan to the mapper; the mapper emits <Text, Text>.
		TableMapReduceUtil.initTableMapperJob(args[0], scan, MyMapper.class, Text.class, Text.class, job);

		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setNumReduceTasks(10);

		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}