
Loading a text file directly into HBase with MapReduce

hbase(main):002:0> describe 'tab1'
DESCRIPTION                                                                      ENABLED
 {NAME => 'tab1', FAMILIES => [{NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE',    true
  BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3',
  COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647',
  KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false',
  ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true'}]}
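
The MapReduce job writes into an existing table, so tab1 with column family f1 has to be created before the job runs (TableOutputFormat does not create it for you); in the HBase shell that is simply create 'tab1', 'f1'. Below is a minimal sketch of doing the same from Java with the 0.94-era client API used throughout this post; the class name CreateTab1 is just for illustration.

package com.bfd.hbase_mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTab1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Same ZooKeeper quorum as in the driver below.
        conf.set("hbase.zookeeper.quorum", "bfdbjc2:2181,bfdbjc3:2181,bfdbjc4:2181");
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("tab1")) {
            HTableDescriptor desc = new HTableDescriptor("tab1");  // table name
            desc.addFamily(new HColumnDescriptor("f1"));           // column family the job writes to
            admin.createTable(desc);
        }
        admin.close();
    }
}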

  

 

package com.bfd.hbase_mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class TxtHbase {
    public static void main(String[] args) throws Exception {
        // Hand the driver to ToolRunner so generic Hadoop options are parsed before run() is called.
        int mr = ToolRunner.run(new Configuration(), new THDriver(), args);
        System.exit(mr);
    }
}

 

package com.bfd.hbase_mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;

public class THDriver extends Configured implements Tool{

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // ZooKeeper quorum of the HBase cluster; don't forget to set this.
        conf.set("hbase.zookeeper.quorum", "bfdbjc2:2181,bfdbjc3:2181,bfdbjc4:2181");

        Job job = new Job(conf, "Txt-to-Hbase");
        job.setJarByClass(TxtHbase.class);

        // Input: a plain text file on HDFS, one "rowkey value" pair per line.
        Path in = new Path("hdfs://bfdbjc1:12000/user/work/a.txt");
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, in);

        job.setMapperClass(THMapper.class);
        job.setReducerClass(THReducer.class);  // also set by initTableReducerJob below
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Wires the reducer to table 'tab1' and sets TableOutputFormat as the output format.
        TableMapReduceUtil.initTableReducerJob("tab1", THReducer.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }
    
}
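
For reference, TableMapReduceUtil.initTableReducerJob is what ties the reduce side to HBase: it sets the reducer class, makes TableOutputFormat the job's output format, and records the target table name in the job configuration (it also copies the HBase/ZooKeeper settings into the job and adds dependency jars). The lines below are only a rough illustration of what the helper configures; they would sit inside run() and are not needed in addition to the single helper call above.

        // Roughly the output-side configuration performed by
        // initTableReducerJob("tab1", THReducer.class, job)
        // (TableOutputFormat is org.apache.hadoop.hbase.mapreduce.TableOutputFormat):
        job.setReducerClass(THReducer.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "tab1");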

 

 

package com.bfd.hbase_mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class THMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line looks like "name1183 value1183": the rowkey, a space, then the value.
        String[] items = value.toString().split(" ");
        if (items.length < 2) {
            return;  // skip empty or malformed lines
        }
        String k = items[0];
        String v = items[1];
        System.out.println("key:" + k + "," + "value:" + v);
        context.write(new Text(k), new Text(v));
    }

}

 

package com.bfd.hbase_mr;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class THReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        String k = key.toString();
        String v = value.iterator().next().toString();  // the data has exactly one value per key
        // Row key = first column of the input line; the value goes into f1:qualifier.
        Put putrow = new Put(Bytes.toBytes(k));
        putrow.add(Bytes.toBytes("f1"), Bytes.toBytes("qualifier"), Bytes.toBytes(v));
        context.write(new ImmutableBytesWritable(Bytes.toBytes(k)), putrow);
    }
}

 

Input data (a.txt):

name1183 value1183
name1184 value1184
name1185 value1185
name1186 value1186
name1187 value1187
name1188 value1188
name1189 value1189
name1190 value1190
name1191 value1191
name1192 value1192
name1193 value1193
name1194 value1194
name1195 value1195
name1196 value1196
name1197 value1197
name1198 value1198
name1199 value1199
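
Once the job has finished, an easy check is to read one of the loaded rows back with the client API (the HBase shell equivalent is get 'tab1', 'name1183'). A minimal sketch, again with the 0.94-era API; the class name CheckTab1 and the row chosen are just for illustration, and for the data above it should print value1183.

package com.bfd.hbase_mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckTab1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "bfdbjc2:2181,bfdbjc3:2181,bfdbjc4:2181");
        HTable table = new HTable(conf, "tab1");
        // Read back one of the rows loaded from a.txt.
        Get get = new Get(Bytes.toBytes("name1183"));
        Result result = table.get(get);
        byte[] v = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("qualifier"));
        System.out.println("f1:qualifier = " + (v == null ? "<not found>" : Bytes.toString(v)));
        table.close();
    }
}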