hadoop mr file2hfile2hbase
程序员文章站
2024-01-23 16:35:46
...
写了个简单的mr操作file到hfile,再把hfile导入hbase的例子,在此记录一下:
File2HFile2HBase.java代码:
package com.lyq.study.example; import java.io.IOException; import java.security.PrivilegedAction; import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.security.UserGroupInformation; import com.lyq.study.lib.HFileOutputFormatBase; import com.lyq.study.util.HBaseConfigUtils; public class File2HFile2HBase { private static final Log LOG = LogFactory.getLog(File2HFile2HBase.class); private String tableName = "testtable1"; private static class MapperClass extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { public byte[] family = Bytes.toBytes("info"); public String[] columns = { "card", "type", "amount", "time", "many" }; public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] columnVals = value.toString().split(","); String rowkey = columnVals[0] + columnVals[3]; Put put = new Put(Bytes.toBytes(rowkey)); for (int i = 0; i < columnVals.length; i++) { put.add(family, Bytes.toBytes(columns[i]), Bytes.toBytes(columnVals[i])); } context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put); } } public int run(final String[] args) throws Exception { if (args.length < 2) { System.err.printf("Usage: %s [generic 
options] <input> <output>\n", getClass().getSimpleName()); return -1; } LOG.info("Usage command args: " + Arrays.toString(args)); final String hfile = args[1]; UserGroupInformation ugi = UserGroupInformation .createRemoteUser("hadoop"); ugi.doAs(new PrivilegedAction<Void>() { @Override public Void run() { try { // 1.获取Configuration Configuration conf = HBaseConfigUtils.getHBaseConfig(1); conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); // 2.获取Job Job job = getJob(conf, args); // 3.执行Job if (!job.waitForCompletion(true)) { throw new IOException("【初始化数据】失败"); } // 4.把hfile文件导入到HBase LoadIncrementalHFiles loader = new LoadIncrementalHFiles( conf); HTable htable = new HTable(conf, tableName); loader.doBulkLoad(new Path(hfile), htable); // 5.清理hfile目录 deleteByDir(conf, new Path(hfile), true); } catch (IOException e) { // TODO: handle exception } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } }); return 0; } private Job getJob(Configuration conf, String[] args) throws IOException { Job job = Job.getInstance(conf); job.setJobName("File2HFile"); job.setJarByClass(File2HFile2HBase.class); job.setMapperClass(MapperClass.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); HTable htable = new HTable(conf, tableName); HFileOutputFormatBase.configureIncrementalLoad(job, htable, HFileOutputFormatBase.class); return job; } public static boolean deleteByDir(Configuration conf, Path path, Boolean recursive) throws IOException { FileSystem fs = FileSystem.get(conf); boolean success = fs.delete(path, recursive); LOG.info("删除[" + path + "]成功? 
" + success); return success; } public static void main(String[] args) throws Exception { args = new String[] { "hdfs://master129:9000/test/input/data.txt", "hdfs://master129:9000/test/output" }; File2HFile2HBase f2hf2hb = new File2HFile2HBase(); System.exit(f2hf2hb.run(args)); } }
HFileOutputFormatBase.java代码:
/** * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.lyq.study.lib; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.compress.Compression; import 
org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization; import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; import org.apache.hadoop.hbase.mapreduce.MutationSerialization; import org.apache.hadoop.hbase.mapreduce.PutSortReducer; import org.apache.hadoop.hbase.mapreduce.ResultSerialization; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TextSortReducer; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; /** * Writes HFiles. Passed Cells must arrive in order. Writes current time as the * sequence id for the file. Sets the major compacted attribute on created * hfiles. Calling write(null,null) will forceably roll all HFiles being * written. * <p> * Using this class as part of a MapReduce job is best done using * {@link #configureIncrementalLoad(Job, HTable)}. 
* * @see KeyValueSortReducer */ @InterfaceAudience.Public @InterfaceStability.Stable public class HFileOutputFormatBase extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {// 把Cell改成了KeyValue static Log LOG = LogFactory.getLog(HFileOutputFormatBase.class); static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression"; private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype"; private static final String DATABLOCK_ENCODING_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding"; private static final String BLOCK_SIZE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize"; public static final String _deleteRowkey = "_deleteRowkey";// 新增了该行 public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(// 把cell改成了KeyValue //去掉了static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter() //把其内容直接拿到了getRecordWriter()中 final TaskAttemptContext context) throws IOException, InterruptedException { // Get the path of the temporary output file final Path outputPath = FileOutputFormat.getOutputPath(context); final Path outputdir = new FileOutputCommitter(outputPath, context) .getWorkPath(); final Path ignoreOutputPath = getDeleteRowKeyFile(outputPath);// 新增了该行 final Configuration conf = context.getConfiguration(); final FileSystem fs = outputdir.getFileSystem(conf); // These configs. are from hbase-*.xml final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE); // Invented config. Add to hbase-*.xml if other than default // compression. 
final String defaultCompression = conf.get("hfile.compression", Compression.Algorithm.NONE.getName()); final boolean compactionExclude = conf.getBoolean( "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); // create a map from column family to the compression algorithm final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf); final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf); final Map<byte[], String> blockSizeMap = createFamilyBlockSizeMap(conf); String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY); final HFileDataBlockEncoder encoder; if (dataBlockEncodingStr == null) { encoder = NoOpDataBlockEncoder.INSTANCE; } else { try { encoder = new HFileDataBlockEncoderImpl( DataBlockEncoding.valueOf(dataBlockEncodingStr)); } catch (IllegalArgumentException ex) { throw new RuntimeException( "Invalid data block encoding type configured for the param " + DATABLOCK_ENCODING_CONF_KEY + " : " + dataBlockEncodingStr); } } return new RecordWriter<ImmutableBytesWritable, KeyValue>() {// 把V改成了KeyValue // Map of families to writers and how much has been output on the // writer. 
private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>( Bytes.BYTES_COMPARATOR); private final FSDataOutputStream dos = fs.create(ignoreOutputPath); private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY; private final byte[] now = Bytes .toBytes(System.currentTimeMillis()); private boolean rollRequested = false; public void write(ImmutableBytesWritable row, KeyValue kv)// 把V cell改成了KeyValue kv throws IOException { // KeyValue kv = KeyValueUtil.ensureKeyValue(cell);//注释掉了该行 // null input == user explicitly wants to flush if (row == null && kv == null) { rollWriters(); return; } byte[] rowKey = kv.getRow(); long length = kv.getLength(); byte[] family = kv.getFamily(); if (ignore(kv)) {// 新增了该if条件判断 byte[] readBuf = rowKey; dos.write(readBuf, 0, readBuf.length); dos.write(Bytes.toBytes("\n")); return; } WriterLength wl = this.writers.get(family); // If this is a new column family, verify that the directory // exists if (wl == null) { fs.mkdirs(new Path(outputdir, Bytes.toString(family))); } // If any of the HFiles for the column families has reached // maxsize, we need to roll all the writers if (wl != null && wl.written + length >= maxsize) { this.rollRequested = true; } // This can only happen once a row is finished though if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { rollWriters(); } // create a new HLog writer, if necessary if (wl == null || wl.writer == null) { wl = getNewWriter(family, conf); } // we now have the proper HLog writer. full steam ahead kv.updateLatestStamp(this.now); wl.writer.append(kv); wl.written += length; // Copy the row so we know when a row transition. this.previousRow = rowKey; } private void rollWriters() throws IOException { for (WriterLength wl : this.writers.values()) { if (wl.writer != null) { LOG.info("Writer=" + wl.writer.getPath() + ((wl.written == 0) ? 
"" : ", wrote=" + wl.written)); close(wl.writer); } wl.writer = null; wl.written = 0; } this.rollRequested = false; } /* * Create a new StoreFile.Writer. * * @param family * * @return A WriterLength, containing a new StoreFile.Writer. * * @throws IOException */ private WriterLength getNewWriter(byte[] family, Configuration conf) throws IOException { WriterLength wl = new WriterLength(); Path familydir = new Path(outputdir, Bytes.toString(family)); String compression = compressionMap.get(family); compression = compression == null ? defaultCompression : compression; String bloomTypeStr = bloomTypeMap.get(family); BloomType bloomType = BloomType.NONE; if (bloomTypeStr != null) { bloomType = BloomType.valueOf(bloomTypeStr); } String blockSizeString = blockSizeMap.get(family); int blockSize = blockSizeString == null ? HConstants.DEFAULT_BLOCKSIZE : Integer.parseInt(blockSizeString); Configuration tempConf = new Configuration(conf); tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig( tempConf), fs, blockSize) .withOutputDir(familydir) .withCompression( AbstractHFileWriter .compressionByName(compression)) .withBloomType(bloomType) .withComparator(KeyValue.COMPARATOR) .withDataBlockEncoder(encoder) .withChecksumType(HStore.getChecksumType(conf)) .withBytesPerChecksum(HStore.getBytesPerChecksum(conf)) .build(); this.writers.put(family, wl); return wl; } private void close(final StoreFile.Writer w) throws IOException { if (w != null) { w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis())); w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, Bytes .toBytes(context.getTaskAttemptID().toString())); w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true)); w.appendFileInfo( StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude)); w.appendTrackedTimestampsToMetadata(); w.close(); } } public void close(TaskAttemptContext c) throws IOException, 
InterruptedException { dos.flush();// 新增了该行 dos.close();// 新增了该行 for (WriterLength wl : this.writers.values()) { close(wl.writer); } } }; } /* * Data structure to hold a Writer and amount of data written on it. */ static class WriterLength { long written = 0; StoreFile.Writer writer = null; } /** * Return the start keys of all of the regions in this table, as a list of * ImmutableBytesWritable. */ private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table) throws IOException { byte[][] byteKeys = table.getStartKeys(); ArrayList<ImmutableBytesWritable> ret = new ArrayList<ImmutableBytesWritable>( byteKeys.length); for (byte[] byteKey : byteKeys) { ret.add(new ImmutableBytesWritable(byteKey)); } return ret; } /** * Write out a {@link SequenceFile} that can be read by * {@link TotalOrderPartitioner} that contains the split points in * startKeys. */ @SuppressWarnings("deprecation") private static void writePartitions(Configuration conf, Path partitionsPath, List<ImmutableBytesWritable> startKeys) throws IOException { LOG.info("Writing partition information to " + partitionsPath); if (startKeys.isEmpty()) { throw new IllegalArgumentException("No regions passed"); } // We're generating a list of split points, and we don't ever // have keys < the first region (which has an empty start key) // so we need to remove it. Otherwise we would end up with an // empty reducer with index 0 TreeSet<ImmutableBytesWritable> sorted = new TreeSet<ImmutableBytesWritable>( startKeys); ImmutableBytesWritable first = sorted.first(); if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) { throw new IllegalArgumentException( "First region of table should have empty start key. 
Instead has: " + Bytes.toStringBinary(first.get())); } sorted.remove(first); // Write the actual file FileSystem fs = partitionsPath.getFileSystem(conf); SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class); try { for (ImmutableBytesWritable startKey : sorted) { writer.append(startKey, NullWritable.get()); } } finally { writer.close(); } } /** * Configure a MapReduce Job to perform an incremental load into the given * table. This * <ul> * <li>Inspects the table to configure a total order partitioner</li> * <li>Uploads the partitions file to the cluster and adds it to the * DistributedCache</li> * <li>Sets the number of reduce tasks to match the current number of * regions</li> * <li>Sets the output key/value class to match HFileOutputFormat2's * requirements</li> * <li>Sets the reducer up to perform the appropriate sorting (either * KeyValueSortReducer or PutSortReducer)</li> * </ul> * The user should be sure to set the map output value class to either * KeyValue or Put before running this function. */ public static void configureIncrementalLoad(Job job, HTable table) throws IOException { configureIncrementalLoad(job, table, HFileOutputFormatBase.class); } public static void configureIncrementalLoad(Job job, HTable table, Class<? extends OutputFormat<?, ?>> cls) throws IOException { Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setOutputFormatClass(HFileOutputFormatBase.class); // Based on the configured map output class, set the correct reducer to // properly // sort the incoming values. // TODO it would be nice to pick one or the other of these formats. 
if (KeyValue.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(KeyValueSortReducer.class); } else if (Put.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(PutSortReducer.class); } else if (Text.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(TextSortReducer.class); } else { LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); } conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), KeyValueSerialization.class.getName()); // Use table's region boundaries for TOP split points. LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName())); List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table); LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count"); job.setNumReduceTasks(startKeys.size()); configurePartitioner(job, startKeys); // Set compression algorithms based on column families configureCompression(table, conf); configureBloomType(table, conf); configureBlockSize(table, conf); // TableMapReduceUtil.addDependencyJars(job);// 注释掉了该行 TableMapReduceUtil.initCredentials(job); LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured."); } private static void configureBlockSize(HTable table, Configuration conf) throws IOException { StringBuilder blockSizeConfigValue = new StringBuilder(); HTableDescriptor tableDescriptor = table.getTableDescriptor(); if (tableDescriptor == null) { // could happen with mock table instance return; } Collection<HColumnDescriptor> families = tableDescriptor.getFamilies(); int i = 0; for (HColumnDescriptor familyDescriptor : families) { if (i++ > 0) { blockSizeConfigValue.append('&'); } blockSizeConfigValue.append(URLEncoder.encode( familyDescriptor.getNameAsString(), "UTF-8")); blockSizeConfigValue.append('='); blockSizeConfigValue.append(URLEncoder.encode( 
String.valueOf(familyDescriptor.getBlocksize()), "UTF-8")); } // Get rid of the last ampersand conf.set(BLOCK_SIZE_CONF_KEY, blockSizeConfigValue.toString()); } /** * Run inside the task to deserialize column family to compression algorithm * map from the configuration. * * Package-private for unit tests only. * * @return a map from column family to the name of the configured * compression algorithm */ static Map<byte[], String> createFamilyCompressionMap(Configuration conf) { return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY); } private static Map<byte[], String> createFamilyBloomMap(Configuration conf) { return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY); } private static Map<byte[], String> createFamilyBlockSizeMap( Configuration conf) { return createFamilyConfValueMap(conf, BLOCK_SIZE_CONF_KEY); } /** * Run inside the task to deserialize column family to given conf value map. * * @param conf * @param confName * @return a map of column family to the given configuration value */ private static Map<byte[], String> createFamilyConfValueMap( Configuration conf, String confName) { Map<byte[], String> confValMap = new TreeMap<byte[], String>( Bytes.BYTES_COMPARATOR); String confVal = conf.get(confName, ""); for (String familyConf : confVal.split("&")) { String[] familySplit = familyConf.split("="); if (familySplit.length != 2) { continue; } try { confValMap .put(URLDecoder.decode(familySplit[0], "UTF-8") .getBytes(), URLDecoder.decode(familySplit[1], "UTF-8")); } catch (UnsupportedEncodingException e) { // will not happen with UTF-8 encoding throw new AssertionError(e); } } return confValMap; } /** * Configure <code>job</code> with a TotalOrderPartitioner, partitioning * against <code>splitPoints</code>. Cleans up the partitions file after job * exists. 
*/ static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints) throws IOException { // create the partitions file FileSystem fs = FileSystem.get(job.getConfiguration()); Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID()); fs.makeQualified(partitionsPath); fs.deleteOnExit(partitionsPath); writePartitions(job.getConfiguration(), partitionsPath, splitPoints); // configure job to use it job.setPartitionerClass(TotalOrderPartitioner.class); TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath); } /** * Serialize column family to compression algorithm map to configuration. * Invoked while configuring the MR job for incremental load. * * Package-private for unit tests only. * * @throws IOException * on failure to read column family descriptors */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") static void configureCompression(HTable table, Configuration conf) throws IOException { StringBuilder compressionConfigValue = new StringBuilder(); HTableDescriptor tableDescriptor = table.getTableDescriptor(); if (tableDescriptor == null) { // could happen with mock table instance return; } Collection<HColumnDescriptor> families = tableDescriptor.getFamilies(); int i = 0; for (HColumnDescriptor familyDescriptor : families) { if (i++ > 0) { compressionConfigValue.append('&'); } compressionConfigValue.append(URLEncoder.encode( familyDescriptor.getNameAsString(), "UTF-8")); compressionConfigValue.append('='); compressionConfigValue.append(URLEncoder.encode(familyDescriptor .getCompression().getName(), "UTF-8")); } // Get rid of the last ampersand conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString()); } /** * Serialize column family to bloom type map to configuration. Invoked while * configuring the MR job for incremental load. 
* * @throws IOException * on failure to read column family descriptors */ static void configureBloomType(HTable table, Configuration conf) throws IOException { HTableDescriptor tableDescriptor = table.getTableDescriptor(); if (tableDescriptor == null) { // could happen with mock table instance return; } StringBuilder bloomTypeConfigValue = new StringBuilder(); Collection<HColumnDescriptor> families = tableDescriptor.getFamilies(); int i = 0; for (HColumnDescriptor familyDescriptor : families) { if (i++ > 0) { bloomTypeConfigValue.append('&'); } bloomTypeConfigValue.append(URLEncoder.encode( familyDescriptor.getNameAsString(), "UTF-8")); bloomTypeConfigValue.append('='); String bloomType = familyDescriptor.getBloomFilterType().toString(); if (bloomType == null) { bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER; } bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8")); } conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString()); } // 新增了ignore() @SuppressWarnings("deprecation") public boolean ignore(KeyValue kv) { boolean ignore = Bytes.toString(kv.getValue()).indexOf("Del") >= 0; return ignore; } // 新增了getDeleteRowKeyPath() public static Path getDeleteRowKeyPath(Path outputPath) { return new Path(outputPath + HFileOutputFormatBase._deleteRowkey); } // 新增了getDeleteRowKeyFile() public static Path getDeleteRowKeyFile(Path outputPath) { return new Path(getDeleteRowKeyPath(outputPath) + "/" + UUID.randomUUID().toString()); } }
data.txt文件的内容如下:
6222020405006,typeA,100000,201408081225,2000
6222020405006,typeA,100000,201408112351,1000
6222020405006,typeA,100000,201408140739,4000
6222020405008,typeB,50000,201408150932,5000
6222020405009,typeC,30000,201408181212,10000
在这里解说一下:HFileOutputFormatBase.java 是在 hbase-server-0.96.2-hadoop2.jar 里的 HFileOutputFormat2.java 基础上改写而来的。在 File2HFile2HBase.java 的 getJob() 方法里,return 的前一行是:
HFileOutputFormatBase.configureIncrementalLoad(job, htable, HFileOutputFormatBase.class);
刚开始写了HFileOutputFormat.configureIncrementalLoad(job, htable);但是老是报如下错误:
2014-08-26 22:31:47,183 INFO [main] example.File2HFile2HBase (File2HFile2HBase.java:run(61)) - Usage command args: [hdfs://master129:9000/test/input/data.txt, hdfs://master129:9000/test/output] SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/D:/app/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/D:/app/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 2014-08-26 22:31:51,256 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT 2014-08-26 22:31:51,258 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:host.name=LiuYQ-PC 2014-08-26 22:31:51,258 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.version=1.6.0_45 2014-08-26 22:31:51,258 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.vendor=Sun Microsystems Inc. 
2014-08-26 22:31:51,261 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.home=C:\Program Files\Java\jdk1.6.0_45\jre 2014-08-26 22:31:51,261 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.class.path=E:\workspace\.metadata\.plugins\org.apache.hadoop.eclipse\hadoop-conf-2965312243669964400;E:\workspace\hadoop_study\bin;D:\app\lib\activation-1.1.jar;D:\app\lib\annotations-api.jar;D:\app\lib\aopalliance-1.0.jar;D:\app\lib\asm-3.1.jar;D:\app\lib\asm-3.2.jar;D:\app\lib\avro-1.7.4.jar;D:\app\lib\catalina.jar;D:\app\lib\catalina-ant.jar;D:\app\lib\catalina-ha.jar;D:\app\lib\catalina-tribes.jar;D:\app\lib\commons-beanutils-1.7.0.jar;D:\app\lib\commons-beanutils-core-1.8.0.jar;D:\app\lib\commons-cli-1.2.jar;D:\app\lib\commons-codec-1.4.jar;D:\app\lib\commons-codec-1.7.jar;D:\app\lib\commons-collections-3.2.1.jar;D:\app\lib\commons-compress-1.4.1.jar;D:\app\lib\commons-configuration-1.6.jar;D:\app\lib\commons-daemon-1.0.13.jar;D:\app\lib\commons-digester-1.8.jar;D:\app\lib\commons-el-1.0.jar;D:\app\lib\commons-httpclient-3.1.jar;D:\app\lib\commons-io-2.1.jar;D:\app\lib\commons-io-2.4.jar;D:\app\lib\commons-lang-2.5.jar;D:\app\lib\commons-lang-2.6.jar;D:\app\lib\commons-logging-1.1.1.jar;D:\app\lib\commons-math-2.1.jar;D:\app\lib\commons-net-3.1.jar;D:\app\lib\ecj-3.7.2.jar;D:\app\lib\el-api.jar;D:\app\lib\findbugs-annotations-1.3.9-1.jar;D:\app\lib\gmbal-api-only-3.0.0-b023.jar;D:\app\lib\grizzly-framework-2.1.2.jar;D:\app\lib\grizzly-http-2.1.2.jar;D:\app\lib\grizzly-http-server-2.1.2.jar;D:\app\lib\grizzly-http-servlet-2.1.2.jar;D:\app\lib\grizzly-rcm-2.1.2.jar;D:\app\lib\guava-11.0.2.jar;D:\app\lib\guava-12.0.1.jar;D:\app\lib\guice-3.0.jar;D:\app\lib\guice-servlet-3.0.jar;D:\app\lib\hadoop-annotations-2.2.0.jar;D:\app\lib\hadoop-archives-2.2.0.jar;D:\app\lib\hadoop-auth-2.2.0.jar;D:\app\lib\hadoop-client-2.2.0.jar;D:\app\lib\hadoop-common-2.2.0.jar;D:\app\lib\hadoop-common-2.2.0-tests.jar;D:\ap
p\lib\hadoop-datajoin-2.2.0.jar;D:\app\lib\hadoop-distcp-2.2.0.jar;D:\app\lib\hadoop-extras-2.2.0.jar;D:\app\lib\hadoop-gridmix-2.2.0.jar;D:\app\lib\hadoop-hdfs-2.2.0.jar;D:\app\lib\hadoop-hdfs-2.2.0-tests.jar;D:\app\lib\hadoop-hdfs-nfs-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-app-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-common-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-core-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-hs-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-hs-plugins-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-jobclient-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-jobclient-2.2.0-tests.jar;D:\app\lib\hadoop-mapreduce-client-shuffle-2.2.0.jar;D:\app\lib\hadoop-mapreduce-examples-2.2.0.jar;D:\app\lib\hadoop-nfs-2.2.0.jar;D:\app\lib\hadoop-rumen-2.2.0.jar;D:\app\lib\hadoop-streaming-2.2.0.jar;D:\app\lib\hadoop-yarn-api-2.2.0.jar;D:\app\lib\hadoop-yarn-applications-distributedshell-2.2.0.jar;D:\app\lib\hadoop-yarn-applications-unmanaged-am-launcher-2.2.0.jar;D:\app\lib\hadoop-yarn-client-2.2.0.jar;D:\app\lib\hadoop-yarn-common-2.2.0.jar;D:\app\lib\hadoop-yarn-server-common-2.2.0.jar;D:\app\lib\hadoop-yarn-server-nodemanager-2.2.0.jar;D:\app\lib\hadoop-yarn-server-resourcemanager-2.2.0.jar;D:\app\lib\hadoop-yarn-server-tests-2.2.0.jar;D:\app\lib\hadoop-yarn-server-tests-2.2.0-tests.jar;D:\app\lib\hadoop-yarn-server-web-proxy-2.2.0.jar;D:\app\lib\hadoop-yarn-site-2.2.0.jar;D:\app\lib\hamcrest-core-1.1.jar;D:\app\lib\hamcrest-core-1.3.jar;D:\app\lib\hbase-client-0.96.2-hadoop2.jar;D:\app\lib\hbase-common-0.96.2-hadoop2.jar;D:\app\lib\hbase-common-0.96.2-hadoop2-tests.jar;D:\app\lib\hbase-examples-0.96.2-hadoop2.jar;D:\app\lib\hbase-hadoop2-compat-0.96.2-hadoop2.jar;D:\app\lib\hbase-hadoop-compat-0.96.2-hadoop2.jar;D:\app\lib\hbase-it-0.96.2-hadoop2.jar;D:\app\lib\hbase-it-0.96.2-hadoop2-tests.jar;D:\app\lib\hbase-prefix-tree-0.96.2-hadoop2.jar;D:\app\lib\hbase-protocol-0.96.2-hadoop2.jar;D:\app\lib\hbase-server-0.96.2-hadoop2.jar;D:\app\lib\hbase-serve
r-0.96.2-hadoop2-tests.jar;D:\app\lib\hbase-shell-0.96.2-hadoop2.jar;D:\app\lib\hbase-testing-util-0.96.2-hadoop2.jar;D:\app\lib\hbase-thrift-0.96.2-hadoop2.jar;D:\app\lib\hsqldb-2.0.0.jar;D:\app\lib\htrace-core-2.04.jar;D:\app\lib\httpclient-4.1.3.jar;D:\app\lib\httpcore-4.1.3.jar;D:\app\lib\jackson-core-asl-1.8.8.jar;D:\app\lib\jackson-jaxrs-1.8.8.jar;D:\app\lib\jackson-mapper-asl-1.8.8.jar;D:\app\lib\jackson-xc-1.8.8.jar;D:\app\lib\jamon-runtime-2.3.1.jar;D:\app\lib\jasper.jar;D:\app\lib\jasper-compiler-5.5.23.jar;D:\app\lib\jasper-el.jar;D:\app\lib\jasper-runtime-5.5.23.jar;D:\app\lib\javax.inject-1.jar;D:\app\lib\javax.servlet-3.1.jar;D:\app\lib\javax.servlet-api-3.0.1.jar;D:\app\lib\jaxb-api-2.2.2.jar;D:\app\lib\jaxb-impl-2.2.3-1.jar;D:\app\lib\jersey-client-1.9.jar;D:\app\lib\jersey-core-1.8.jar;D:\app\lib\jersey-core-1.9.jar;D:\app\lib\jersey-grizzly2-1.9.jar;D:\app\lib\jersey-guice-1.9.jar;D:\app\lib\jersey-json-1.8.jar;D:\app\lib\jersey-json-1.9.jar;D:\app\lib\jersey-server-1.8.jar;D:\app\lib\jersey-server-1.9.jar;D:\app\lib\jersey-test-framework-core-1.9.jar;D:\app\lib\jersey-test-framework-grizzly2-1.9.jar;D:\app\lib\jets3t-0.6.1.jar;D:\app\lib\jettison-1.1.jar;D:\app\lib\jettison-1.3.1.jar;D:\app\lib\jetty-6.1.26.jar;D:\app\lib\jetty-sslengine-6.1.26.jar;D:\app\lib\jetty-util-6.1.26.jar;D:\app\lib\jruby-complete-1.6.8.jar;D:\app\lib\jsch-0.1.42.jar;D:\app\lib\jsp-2.1-6.1.14.jar;D:\app\lib\jsp-api.jar;D:\app\lib\jsp-api-2.1.jar;D:\app\lib\jsp-api-2.1-6.1.14.jar;D:\app\lib\jsr305-1.3.9.jar;D:\app\lib\junit-4.8.2.jar;D:\app\lib\junit-4.10.jar;D:\app\lib\junit-4.11.jar;D:\app\lib\libthrift-0.9.0.jar;D:\app\lib\log4j-1.2.17.jar;D:\app\lib\management-api-3.0.0-b012.jar;D:\app\lib\metrics-core-2.1.2.jar;D:\app\lib\mockito-all-1.8.5.jar;D:\app\lib\netty-3.6.2.Final.jar;D:\app\lib\netty-3.6.6.Final.jar;D:\app\lib\paranamer-2.3.jar;D:\app\lib\protobuf-java-2.5.0.jar;D:\app\lib\servlet-api.jar;D:\app\lib\servlet-api-2.5.jar;D:\app\lib\servlet-api-2.5-6.1.14.jar;D:
\app\lib\slf4j-api-1.6.4.jar;D:\app\lib\slf4j-api-1.7.5.jar;D:\app\lib\slf4j-log4j12-1.6.4.jar;D:\app\lib\slf4j-log4j12-1.7.5.jar;D:\app\lib\snappy-java-1.0.4.1.jar;D:\app\lib\stax-api-1.0.1.jar;D:\app\lib\tomcat-coyote.jar;D:\app\lib\tomcat-dbcp.jar;D:\app\lib\tomcat-i18n-es.jar;D:\app\lib\tomcat-i18n-fr.jar;D:\app\lib\tomcat-i18n-ja.jar;D:\app\lib\xmlenc-0.52.jar;D:\app\lib\xz-1.0.jar;D:\app\lib\zookeeper-3.4.5.jar 2014-08-26 22:31:51,261 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.library.path=C:\Program Files\Java\jdk1.6.0_45\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:/Program Files (x86)/Java/jdk1.6.0_35/bin/../jre/bin/client;C:/Program Files (x86)/Java/jdk1.6.0_35/bin/../jre/bin;C:/Program Files (x86)/Java/jdk1.6.0_35/bin/../jre/lib/i386;D:\app\hadoop-2.2.0\bin;C:\Program Files (x86)\Java\jdk1.6.0_35\bin;C:\Program Files (x86)\NVIDIA Corporation\PhysX\Common;D:\app\Oracle11g\product\11.1.0\db_1\bin;C:\Program Files (x86)\Intel\iCLS Client\;C:\Program Files\Intel\iCLS Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\Intel\WiFi\bin\;C:\Program Files\Common Files\Intel\WirelessCommon\;C:\Program Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\IPT;D:\Program Files\TortoiseSVN\bin;D:\Program Files\MySQL\MySQL Server 5.5\bin;C:\Program Files (x86)\Intel\OpenCL SDK\3.0\bin\x86;C:\Program Files (x86)\Intel\OpenCL SDK\3.0\bin\x64;C:\Program Files\Intel\WiFi\bin\;C:\Program Files\Common Files\Intel\WirelessCommon\;D:\Program Files\SSH Communications Security\SSH Secure Shell;.;;D:\app\eclipse;;. 
2014-08-26 22:31:51,261 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.io.tmpdir=C:\Users\LiuYQ\AppData\Local\Temp\ 2014-08-26 22:31:51,261 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.compiler=<NA> 2014-08-26 22:31:51,263 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.name=Windows 7 2014-08-26 22:31:51,263 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.arch=amd64 2014-08-26 22:31:51,264 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.version=6.1 2014-08-26 22:31:51,264 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.name=LiuYQ 2014-08-26 22:31:51,264 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.home=C:\Users\LiuYQ 2014-08-26 22:31:51,264 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.dir=E:\workspace\hadoop_study 2014-08-26 22:31:51,272 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=slave130:2181,master129:2181,slave132:2181,slave131:2181 sessionTimeout=180000 watcher=hconnection-0x76b20352, quorum=slave130:2181,master129:2181,slave132:2181,slave131:2181, baseZNode=/hbase 2014-08-26 22:31:51,389 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(120)) - Process identifier=hconnection-0x76b20352 connecting to ZooKeeper ensemble=slave130:2181,master129:2181,slave132:2181,slave131:2181 2014-08-26 22:31:51,404 INFO [main-SendThread(master129:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(966)) - Opening socket connection to server master129/192.168.24.129:2181. 
Will not attempt to authenticate using SASL (无法定位登录配置) 2014-08-26 22:31:51,419 INFO [main-SendThread(master129:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(849)) - Socket connection established to master129/192.168.24.129:2181, initiating session 2014-08-26 22:31:51,464 INFO [main-SendThread(master129:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1207)) - Session establishment complete on server master129/192.168.24.129:2181, sessionid = 0x48129e3b750006, negotiated timeout = 150000 2014-08-26 22:31:53,018 INFO [main] mapreduce.HFileOutputFormat2 (HFileOutputFormat2.java:configureIncrementalLoad(366)) - Looking up current regions for table testtable1 2014-08-26 22:31:53,046 INFO [main] mapreduce.HFileOutputFormat2 (HFileOutputFormat2.java:configureIncrementalLoad(368)) - Configuring 1 reduce partitions to match current region count 2014-08-26 22:31:53,164 INFO [main] mapreduce.HFileOutputFormat2 (HFileOutputFormat2.java:writePartitions(287)) - Writing partition information to /tmp/partitions_8f008fe0-9170-48ac-940a-83c2813f1378 2014-08-26 22:31:53,356 WARN [main] zlib.ZlibFactory (ZlibFactory.java:<clinit>(50)) - Failed to load/initialize native-zlib library 2014-08-26 22:31:53,359 INFO [main] compress.CodecPool (CodecPool.java:getCompressor(150)) - Got brand-new compressor [.deflate] 2014-08-26 22:31:53,904 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available 2014-08-26 22:31:53,964 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available 2014-08-26 22:31:54,006 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. 
Instead, use io.native.lib.available 2014-08-26 22:31:54,059 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available 2014-08-26 22:31:54,110 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available 2014-08-26 22:31:54,310 INFO [main] mapreduce.HFileOutputFormat2 (HFileOutputFormat2.java:configureIncrementalLoad(380)) - Incremental table testtable1 output configured. 2014-08-26 22:31:54,362 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - session.id is deprecated. Instead, use dfs.metrics.session-id 2014-08-26 22:31:54,365 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(76)) - Initializing JVM Metrics with processName=JobTracker, sessionId= 2014-08-26 22:31:54,998 WARN [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(149)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 2014-08-26 22:31:55,083 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(439)) - Cleaning up the staging area file:/tmp/hadoop-LiuYQ/mapred/staging/hadoop5469487/.staging/job_local5469487_0001 java.lang.IllegalArgumentException: Pathname /D:/app/lib/hadoop-mapreduce-client-core-2.2.0.jar from hdfs://master129:9000/D:/app/lib/hadoop-mapreduce-client-core-2.2.0.jar is not a valid DFS filename. 
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:184) at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:92) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102) at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288) at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:224) at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestamps(ClientDistributedCacheManager.java:93) at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(ClientDistributedCacheManager.java:57) at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:264) at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:300) at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:387) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:396) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491) at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265) at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1286) at com.lyq.study.example.File2HFile2HBase$1.run(File2HFile2HBase.java:76) at com.lyq.study.example.File2HFile2HBase$1.run(File2HFile2HBase.java:1) at java.security.AccessController.doPrivileged(Native Method) at 
javax.security.auth.Subject.doAs(Subject.java:337) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1471) at com.lyq.study.example.File2HFile2HBase.run(File2HFile2HBase.java:65) at com.lyq.study.example.File2HFile2HBase.main(File2HFile2HBase.java:135)
最终将 HFileOutputFormat2 重写为 HFileOutputFormatBase,并在作业中改用 HFileOutputFormatBase,至此程序成功运行。
附件是
File2HFile2HBase.java
HFileOutputFormatBase.java
data.txt
和源码HFileOutputFormat2.java