MapReduce Programming Model: From HDFS Data to an HBase Table
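This example generates HBase store files (HFiles) directly from text data on HDFS: a MapReduce job reads the input with TextInputFormat, the mapper emits (row key, count) pairs, the reducer converts each aggregated row into a KeyValue, and HFileOutputFormat writes output partitioned by the target table's regions, ready for a bulk load into the _AddCart_TEMP table. The code targets the pre-1.0 HBase API, where HTable and HFileOutputFormat were current.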

package com.bfd.util;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class CopyOfGidAddCartTemp {
	public static final String TABLE_NAME = "_AddCart_TEMP"; 
	public static final String COLUMN_FAMILY = "ci";
	private static Configuration conf = null;

	static {
		// Shared HBase client configuration; the ZooKeeper settings come
		// from the project's Const class.
		conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", Const.ZOOKEEPER_QUORAM);
		conf.set("zookeeper.znode.parent", Const.ZOOKEEPER_ZNODE_PARENT);
	}

	static class Mapper
			extends
			org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, ImmutableBytesWritable, LongWritable> {
		private ImmutableBytesWritable outKey = new ImmutableBytesWritable();
		private LongWritable outValue = new LongWritable();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// The row-key extraction logic was elided in the original; as a
			// placeholder, use the whole input line as the row key with a
			// count of 1. Writing empty ImmutableBytesWritable/LongWritable
			// objects, as the original stub did, would fail at serialization.
			outKey.set(value.getBytes(), 0, value.getLength());
			outValue.set(1L);
			context.write(outKey, outValue);
		}

	}

	static class Reducer
			extends
			org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

		@Override
		protected void reduce(ImmutableBytesWritable key,
				Iterable<LongWritable> values, Context context)
				throws IOException, InterruptedException {
			// The aggregation logic was elided in the original; as a
			// placeholder, sum the counts and store them under a hypothetical
			// "count" qualifier. An empty KeyValue, as the original stub
			// wrote, would produce an unusable cell.
			long sum = 0;
			for (LongWritable v : values) {
				sum += v.get();
			}
			context.write(key, new KeyValue(key.get(),
					Bytes.toBytes(COLUMN_FAMILY), Bytes.toBytes("count"),
					Bytes.toBytes(sum)));
		}

	}

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {

		// Reuse the shared HBase configuration from the static initializer so
		// the job and the HTable client agree on the ZooKeeper settings; the
		// original shadowed it with a fresh, empty Configuration.
		Job job = new Job(conf, TABLE_NAME);

		job.setJarByClass(CopyOfGidAddCartTemp.class);

		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(LongWritable.class);

		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(KeyValue.class);

		job.setMapperClass(CopyOfGidAddCartTemp.Mapper.class);
		job.setReducerClass(CopyOfGidAddCartTemp.Reducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(HFileOutputFormat.class);

		// Note: configureIncrementalLoad() below resets the reducer count to
		// match the table's current number of regions, overriding this value.
		job.setNumReduceTasks(4);
		/* local execution */
		// ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

		TextInputFormat.setInputPaths(job, Const.HDFS_BASE_INPUT + "/l_date="
				+ args[0] + "/*");
		HFileOutputFormat.setOutputPath(job, new Path(Const.HDFS_BASE_OUTPUT
				+ "/addcart"));

		// Pass the date argument through the job configuration so that tasks
		// can read it; the original set it on a separate HTable-only
		// configuration that map/reduce tasks never see.
		job.getConfiguration().set("date2", args[0]);

		// configureIncrementalLoad() wires up TotalOrderPartitioner against
		// the table's region boundaries, so each reducer produces HFiles that
		// fit a single region.
		HTable htable = new HTable(conf, TABLE_NAME);
		HFileOutputFormat.configureIncrementalLoad(job, htable);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}
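
The job above only writes HFiles under Const.HDFS_BASE_OUTPUT + "/addcart"; nothing has been loaded into the table yet. Below is a minimal sketch of the missing completion step, using the LoadIncrementalHFiles tool from the same HBase generation. The class name BulkLoadAddCart is made up for illustration; the Const values are the same ones the job uses.

package com.bfd.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

// Hypothetical follow-up step (not in the original article): move the HFiles
// produced by CopyOfGidAddCartTemp into the _AddCart_TEMP table.
public class BulkLoadAddCart {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", Const.ZOOKEEPER_QUORAM);
		conf.set("zookeeper.znode.parent", Const.ZOOKEEPER_ZNODE_PARENT);
		HTable htable = new HTable(conf, CopyOfGidAddCartTemp.TABLE_NAME);
		// Moves the HFiles into the table's regions; this is mostly a
		// metadata operation, not a rewrite of the data.
		new LoadIncrementalHFiles(conf).doBulkLoad(
				new Path(Const.HDFS_BASE_OUTPUT + "/addcart"), htable);
	}
}

The same step can also be run from the shell with the completebulkload tool: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile-dir> _AddCart_TEMP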