
Deleting HBase data with MapReduce by scanning an HBase table

The job scans a table for cells written at a given timestamp and, in the mapper, issues a Delete for each matching cell.

package foo.bar.MR;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import foo.bar.validate.Const;

public class DropRowByTimeStampMapReduce {

	public static Configuration configuration;
	public static List<String> rowkeyList = new ArrayList<String>();
	public static List<String> qualifierList = new ArrayList<String>();

	// Shared HBase connection settings; the quorum and rootdir come from foo.bar.validate.Const.
	static {
		configuration = HBaseConfiguration.create();
		configuration.set("hbase.zookeeper.quorum", Const.ZOOKEEPER_QUORAM);
		configuration.set("hbase.rootdir", Const.HBASE_ROOTDIR);
	}
	    
	static class MyMapper extends TableMapper<Text, LongWritable> {
		// Each input Result holds the cells that matched the scan's timestamp filter;
		// build a Delete per cell and apply the batch to the table.
		public void map(ImmutableBytesWritable row, Result r, Context context)
				throws InterruptedException, IOException {
			String tableName = context.getConfiguration().get("tableName");
			HTable htbl = new HTable(configuration, tableName);
			List<Delete> deletes = new ArrayList<Delete>();
			for (KeyValue kv : r.raw()) {
				Delete dlt = new Delete(kv.getRow());
				dlt.deleteColumn(kv.getFamily(), kv.getQualifier(), kv.getTimestamp());
				deletes.add(dlt);
				System.out.println("delete -- row:" + Bytes.toString(kv.getRow())
						+ ",family:" + Bytes.toString(kv.getFamily())
						+ ",qualifier:" + Bytes.toString(kv.getQualifier())
						+ ",timestamp:" + kv.getTimestamp());
			}
			htbl.delete(deletes);
			htbl.flushCommits();
			htbl.close();
		}
	}
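
	// Variant of MyMapper, sketched here for comparison only: it opens the HTable once
	// per map task in setup() and flushes/closes it in cleanup(), instead of reconnecting
	// for every scanned row. The job in main() below still wires in MyMapper; to use this
	// one, swap the class name passed to initTableMapperJob.
	static class MyPooledMapper extends TableMapper<Text, LongWritable> {
		private HTable htbl;

		@Override
		protected void setup(Context context) throws IOException {
			String tableName = context.getConfiguration().get("tableName");
			htbl = new HTable(configuration, tableName);
		}

		@Override
		public void map(ImmutableBytesWritable row, Result r, Context context)
				throws InterruptedException, IOException {
			List<Delete> deletes = new ArrayList<Delete>();
			for (KeyValue kv : r.raw()) {
				Delete dlt = new Delete(kv.getRow());
				dlt.deleteColumn(kv.getFamily(), kv.getQualifier(), kv.getTimestamp());
				deletes.add(dlt);
			}
			htbl.delete(deletes);
		}

		@Override
		protected void cleanup(Context context) throws IOException {
			htbl.flushCommits();
			htbl.close();
		}
	}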
		   
	   
	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage: DropRowByTimeStampMapReduce <tableName> <timestamp>");
			return;
		}
		String tableName = args[0];
		String timeStamp = args[1];
		Configuration config = HBaseConfiguration.create();
		config.set("tableName", tableName);   // handed to the mapper through the job configuration
		Job job = new Job(config, "DropRowByTimeStamp");
		job.setJarByClass(DropRowByTimeStampMapReduce.class);   // class that contains the mapper

		Scan scan = new Scan();
		scan.setCaching(500);        // 1 is the default in Scan, which is bad for MapReduce jobs
		scan.setCacheBlocks(false);  // don't set to true for MR jobs
		scan.setTimeStamp(Long.parseLong(timeStamp));   // only return cells written at exactly this timestamp

		TableMapReduceUtil.initTableMapperJob(
			tableName,        // input HBase table name
			scan,             // Scan instance to control CF and attribute selection
			MyMapper.class,   // mapper
			null,             // mapper output key
			null,             // mapper output value
			job);
		job.setOutputFormatClass(NullOutputFormat.class);   // the mapper emits no key/value output

		boolean b = job.waitForCompletion(true);
		if (!b) {
			throw new IOException("error with job!");
		}
	}
}
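
The listing references foo.bar.validate.Const but never shows it; a minimal sketch of what that class could contain is below. The quorum and rootdir values are placeholders, not taken from the original code.

package foo.bar.validate;

// Hypothetical sketch of the Const class used above; replace the placeholder
// values with your cluster's ZooKeeper quorum and HBase root directory.
public class Const {
	public static final String ZOOKEEPER_QUORAM = "zk1.example.com,zk2.example.com,zk3.example.com";
	public static final String HBASE_ROOTDIR = "hdfs://namenode.example.com:8020/hbase";
}

Once packaged into a jar with an HBase client classpath, the job can be submitted with something like hadoop jar <your-jar> foo.bar.MR.DropRowByTimeStampMapReduce <tableName> <timestamp>, where <timestamp> is the exact cell timestamp (in milliseconds) whose cells should be deleted.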