MapReduce编程模型之HDFS数据到HBase表数据
程序员文章站
2022-03-31 18:10:49
...
package com.bfd.util;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * MapReduce driver that reads text files from HDFS and writes HFiles
 * (via {@link HFileOutputFormat}) configured for the HBase table
 * {@link #TABLE_NAME}.
 *
 * <p>NOTE(review): the mapper and reducer below emit only empty placeholder
 * records; they look like a skeleton that still needs the real row-key /
 * cell construction logic. Confirm before running against production data.
 */
public class CopyOfGidAddCartTemp {

    /** Name of the target HBase table (also used as the job name). */
    public static final String TABLE_NAME = "_AddCart_TEMP";

    /** Column family name; not referenced by the code in this class. */
    public static final String COLUMN_FAMILY = "ci";

    /** Shared HBase client configuration pointing at the cluster's ZooKeeper ensemble. */
    private static final Configuration HBASE_CONF;

    static {
        HBASE_CONF = HBaseConfiguration.create();
        HBASE_CONF.set("hbase.zookeeper.quorum", Const.ZOOKEEPER_QUORAM);
        HBASE_CONF.set("zookeeper.znode.parent", Const.ZOOKEEPER_ZNODE_PARENT);
    }

    /**
     * Mapper that, for every input line, writes one record consisting of an
     * empty {@link ImmutableBytesWritable} key and a default (zero)
     * {@link LongWritable} value. The input key and value are ignored.
     */
    static class Mapper
            extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, ImmutableBytesWritable, LongWritable> {

        // Reused across map() calls; the framework serializes them on write,
        // so there is no need to allocate fresh instances per record.
        private final ImmutableBytesWritable outKey = new ImmutableBytesWritable();
        private final LongWritable outValue = new LongWritable();

        /**
         * Emits the placeholder key/value pair for one input record.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   the input line (unused)
         * @param context task context used to emit the output record
         * @throws IOException          if writing the output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(outKey, outValue);
        }
    }

    /**
     * Reducer that writes one empty {@link KeyValue} per incoming key. The
     * grouped values are not inspected.
     */
    static class Reducer
            extends org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

        /**
         * Emits {@code (key, new KeyValue())} for the given key.
         *
         * @param key     the grouped map-output key
         * @param values  the values for {@code key} (unused)
         * @param context task context used to emit the output record
         * @throws IOException          if writing the output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new KeyValue());
        }
    }

    /**
     * Configures and runs the job, then exits the JVM with status 0 on
     * success, 1 on job failure, or 2 when the required argument is missing.
     *
     * @param args {@code args[0]} is the date used to select the input
     *             directory {@code Const.HDFS_BASE_INPUT + "/l_date=" + args[0]}
     * @throws IOException            if job setup, HBase access, or submission fails
     * @throws InterruptedException   if interrupted while waiting for the job
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 1) {
            System.err.println("Usage: CopyOfGidAddCartTemp <l_date>");
            System.exit(2);
            return;
        }
        String date = args[0];

        Configuration jobConf = new Configuration();
        Job job = new Job(jobConf, TABLE_NAME);
        job.setJarByClass(CopyOfGidAddCartTemp.class);

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);

        job.setMapperClass(com.bfd.util.CopyOfGidAddCartTemp.Mapper.class);
        job.setReducerClass(com.bfd.util.CopyOfGidAddCartTemp.Reducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat.class);
        // NOTE(review): configureIncrementalLoad() below is documented to set
        // the reduce-task count to the table's region count, which would
        // supersede this value. Confirm against the HBase version in use.
        job.setNumReduceTasks(4);

        // For local execution, the job jar can be set explicitly, e.g.:
        // ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

        TextInputFormat.setInputPaths(job, Const.HDFS_BASE_INPUT + "/l_date=" + date + "/*");
        HFileOutputFormat.setOutputPath(job, new Path(Const.HDFS_BASE_OUTPUT + "/addcart"));

        // Derive the table configuration from the shared HBase settings rather
        // than rebuilding the ZooKeeper properties by hand.
        Configuration tableConf = HBaseConfiguration.create(HBASE_CONF);
        // NOTE(review): "date2" is set only on the HBase client configuration,
        // not on the job configuration; confirm whether tasks need to read it.
        tableConf.set("date2", date);

        HTable htable = new HTable(tableConf, TABLE_NAME);
        try {
            HFileOutputFormat.configureIncrementalLoad(job, htable);
        } finally {
            // Release the client connection; the table handle is only needed
            // while the job is being configured.
            htable.close();
        }

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}