Deleting HBase data with MapReduce by reading an HBase table
package foo.bar.MR;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import foo.bar.validate.Const;

public class DropRowByTimeStampMapReduce {

    public static Configuration configuration;

    static {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", Const.ZOOKEEPER_QUORAM);
        configuration.set("hbase.rootdir", Const.HBASE_ROOTDIR);
    }

    static class MyMapper extends TableMapper<Text, LongWritable> {

        @Override
        public void map(ImmutableBytesWritable row, Result r, Context context)
                throws InterruptedException, IOException {
            // The table name is passed to the mapper through the job configuration in main().
            String tableName = context.getConfiguration().get("tableName");
            HTable htbl = new HTable(configuration, tableName);
            List<Delete> lists = new ArrayList<Delete>();
            // For every cell returned by the scan, delete exactly that version
            // (row key + column family + qualifier + timestamp).
            for (KeyValue kv : r.raw()) {
                Delete dlt = new Delete(kv.getRow());
                dlt.deleteColumn(kv.getFamily(), kv.getQualifier(), kv.getTimestamp());
                lists.add(dlt);
                System.out.println("delete -- rowkey:" + Bytes.toString(kv.getRow())
                        + ",family:" + Bytes.toString(kv.getFamily())
                        + ",qualifier:" + Bytes.toString(kv.getQualifier())
                        + ",timestamp:" + kv.getTimestamp());
            }
            htbl.delete(lists);
            htbl.flushCommits();
            htbl.close();
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            return;
        }
        String tableName = args[0];
        String timeStamp = args[1];

        Configuration config = HBaseConfiguration.create();
        // Make the table name available to the mapper via the job configuration.
        config.set("tableName", tableName);

        Job job = new Job(config, "ExampleRead");
        job.setJarByClass(DropRowByTimeStampMapReduce.class); // class that contains the mapper

        Scan scan = new Scan();
        scan.setCaching(500);       // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        scan.setTimeStamp(Long.parseLong(timeStamp)); // only return cells written at exactly this timestamp

        TableMapReduceUtil.initTableMapperJob(
                tableName,       // input HBase table name
                scan,            // Scan instance to control CF and attribute selection
                MyMapper.class,  // mapper
                null,            // mapper output key
                null,            // mapper output value
                job);
        job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from the mapper
        job.setNumReduceTasks(0); // map-only job, no reducer is needed

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
    }
}
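The code above imports foo.bar.validate.Const, which the post does not show. Below is a minimal sketch of what that helper might look like; only the class name and the two constant names (ZOOKEEPER_QUORAM, HBASE_ROOTDIR) come from the code above, and the values are placeholders that would need to be replaced with your cluster's actual ZooKeeper quorum and HBase root directory.

package foo.bar.validate;

// Hypothetical constants holder assumed by DropRowByTimeStampMapReduce;
// the values below are placeholders, not real cluster addresses.
public class Const {
    public static final String ZOOKEEPER_QUORAM = "zk1.example.com,zk2.example.com,zk3.example.com";
    public static final String HBASE_ROOTDIR = "hdfs://namenode.example.com:8020/hbase";
}

The job takes exactly two arguments: the name of the HBase table to scan and a single timestamp (by default HBase cell timestamps are milliseconds since the epoch). scan.setTimeStamp() restricts the scan to cells written at exactly that timestamp, and the mapper then issues a Delete for each matching cell, so only versions with that timestamp are removed.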