Loading a text file directly into HBase with MapReduce
hbase(main):002:0> describe 'tab1'
DESCRIPTION                                                              ENABLED
 {NAME => 'tab1', FAMILIES => [{NAME => 'f1', DATA_BLOCK_ENCODING =>     true
 'NONE', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS =>
 '3', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647',
 KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY =>
 'false', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true'}]}
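The job below writes into this existing table tab1 with its single column family f1. If the table has not been created yet, it can be set up with create 'tab1', 'f1' in the HBase shell, or programmatically. The following is only a minimal sketch using the 0.94-era HBaseAdmin API; the class name CreateTab1 is made up for illustration, and the ZooKeeper quorum is assumed to be the same one configured in THDriver below.

package com.bfd.hbase_mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTab1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // assumption: same ZooKeeper quorum as used in THDriver below
        conf.set("hbase.zookeeper.quorum", "bfdbjc2:2181,bfdbjc3:2181,bfdbjc4:2181");
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("tab1")) {
            HTableDescriptor desc = new HTableDescriptor("tab1");
            desc.addFamily(new HColumnDescriptor("f1"));  // single column family f1
            admin.createTable(desc);
        }
        admin.close();
    }
}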
package com.bfd.hbase_mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class TxtHbase {
    public static void main(String[] args) throws Exception {
        int mr = ToolRunner.run(new Configuration(), new THDriver(), args);
        System.exit(mr);
    }
}
package com.bfd.hbase_mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;

public class THDriver extends Configured implements Tool {

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "bfdbjc2:2181,bfdbjc3:2181,bfdbjc4:2181");  // do not forget this setting

        Job job = new Job(conf, "Txt-to-Hbase");
        job.setJarByClass(TxtHbase.class);

        Path in = new Path("hdfs://bfdbjc1:12000/user/work/a.txt");
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, in);

        job.setMapperClass(THMapper.class);
        job.setReducerClass(THReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        TableMapReduceUtil.initTableReducerJob("tab1", THReducer.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
package com.bfd.hbase_mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class THMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context) {
        // each input line is "rowkey value", separated by a single space
        String[] items = value.toString().split(" ");
        String k = items[0];
        String v = items[1];
        System.out.println("key:" + k + ",value:" + v);
        try {
            context.write(new Text(k), new Text(v));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package com.bfd.hbase_mr;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

public class THReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> value, Context context) {
        String k = key.toString();
        String v = value.iterator().next().toString();  // the data guarantees exactly one value per key
        Put putrow = new Put(k.getBytes());
        putrow.add("f1".getBytes(), "qualifier".getBytes(), v.getBytes());
        try {
            context.write(new ImmutableBytesWritable(k.getBytes()), putrow);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
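To run the job, package the three classes into a jar and submit it with hadoop jar, using com.bfd.hbase_mr.TxtHbase as the main class (the HBase client jars must be on the task classpath). After the job finishes, a single row can be read back to confirm the import. This is only a minimal sketch using the 0.94-era HTable/Get client API; the class name CheckTab1 is made up for illustration, and the row key is taken from the sample data below.

package com.bfd.hbase_mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckTab1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // assumption: same ZooKeeper quorum as used in THDriver
        conf.set("hbase.zookeeper.quorum", "bfdbjc2:2181,bfdbjc3:2181,bfdbjc4:2181");
        HTable table = new HTable(conf, "tab1");
        // fetch the row written for the first sample input line
        Get get = new Get(Bytes.toBytes("name1183"));
        Result result = table.get(get);
        byte[] v = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("qualifier"));
        System.out.println("f1:qualifier = " + Bytes.toString(v));  // expected: value1183
        table.close();
    }
}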
Data:
name1183 value1183
name1184 value1184
name1185 value1185
name1186 value1186
name1187 value1187
name1188 value1188
name1189 value1189
name1190 value1190
name1191 value1191
name1192 value1192
name1193 value1193
name1194 value1194
name1195 value1195
name1196 value1196
name1197 value1197
name1198 value1198
name1199 value1199
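With this input, each line becomes one HBase row: the first field is the row key and the second field is stored in column f1:qualifier, so scan 'tab1' in the HBase shell should list rows name1183 through name1199 with their corresponding values.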