Learn Hadoop with Me, Part 3

This installment collects example programs for Hadoop I/O: compression codecs (StreamCompressor, PooledStreamCompressor, FileDecompressor, and the MaxTemperature drivers with compressed output), SequenceFile and MapFile demos, and custom Writable implementations (IntPair, TextPair, TextArrayWritable, TextIterator). Each public class below belongs in its own source file.
// FileDecompressor: a program to decompress a compressed file using a codec
// inferred from the file's extension
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class FileDecompressor {

  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);

    Path inputPath = new Path(uri);
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(inputPath);
    if (codec == null) {
      System.err.println("No codec found for " + uri);
      System.exit(1);
    }

    String outputUri =
        CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());

    InputStream in = null;
    OutputStream out = null;
    try {
      in = codec.createInputStream(fs.open(inputPath));
      out = fs.create(new Path(outputUri));
      IOUtils.copyBytes(in, out, conf);
    } finally {
      IOUtils.closeStream(in);
      IOUtils.closeStream(out);
    }
  }
}

// IntPair: a WritableComparable for a pair of ints
import java.io.*;

import org.apache.hadoop.io.*;

public class IntPair implements WritableComparable<IntPair> {

  private int first;
  private int second;

  public IntPair() {
  }

  public IntPair(int first, int second) {
    set(first, second);
  }

  public void set(int first, int second) {
    this.first = first;
    this.second = second;
  }

  public int getFirst() {
    return first;
  }

  public int getSecond() {
    return second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(first);
    out.writeInt(second);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first = in.readInt();
    second = in.readInt();
  }

  @Override
  public int hashCode() {
    return first * 163 + second;
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof IntPair) {
      IntPair ip = (IntPair) o;
      return first == ip.first && second == ip.second;
    }
    return false;
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }

  @Override
  public int compareTo(IntPair ip) {
    int cmp = compare(first, ip.first);
    if (cmp != 0) {
      return cmp;
    }
    return compare(second, ip.second);
  }

  /**
   * Convenience method for comparing two ints.
   */
  public static int compare(int a, int b) {
    return (a < b ? -1 : (a == b ? 0 : 1));
  }
}

// MapFileFixer: re-creates the index for a MapFile
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;

public class MapFileFixer {

  public static void main(String[] args) throws Exception {
    String mapUri = args[0];

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(mapUri), conf);
    Path map = new Path(mapUri);
    Path mapData = new Path(map, MapFile.DATA_FILE_NAME);

    // Get key and value types from data sequence file
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf);
    Class keyClass = reader.getKeyClass();
    Class valueClass = reader.getValueClass();
    reader.close();

    // Create the map file index file
    long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf);
    System.out.printf("Created MapFile %s with %d entries\n", map, entries);
  }
}

// MapFileWriteDemo: writing a MapFile
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileWriteDemo {

  private static final String[] DATA = {
    "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen"
  };

  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);

    IntWritable key = new IntWritable();
    Text value = new Text();
    MapFile.Writer writer = null;
    try {
      writer = new MapFile.Writer(conf, fs, uri,
          key.getClass(), value.getClass());

      for (int i = 0; i < 1024; i++) {
        key.set(i + 1);
        value.set(DATA[i % DATA.length]);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}

// MaxTemperatureWithCompression: application to run the maximum temperature
// job producing compressed output
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperatureWithCompression {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperatureWithCompression <input path> " +
          "<output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

// MaxTemperatureWithMapOutputCompression
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperatureWithMapOutputCompression {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperatureWithMapOutputCompression " +
          "<input path> <output path>");
      System.exit(-1);
    }

    Configuration conf = new Configuration();
    conf.setBoolean("mapred.compress.map.output", true);
    conf.setClass("mapred.map.output.compression.codec", GzipCodec.class,
        CompressionCodec.class);
    Job job = new Job(conf);

    job.setJarByClass(MaxTemperature.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

// PooledStreamCompressor: a program to compress data read from standard input
// and write it to standard output using a pooled compressor
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;

public class PooledStreamCompressor {

  public static void main(String[] args) throws Exception {
    String codecClassname = args[0];
    Class<?> codecClass = Class.forName(codecClassname);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec)
        ReflectionUtils.newInstance(codecClass, conf);

    Compressor compressor = null;
    try {
      compressor = CodecPool.getCompressor(codec);
      CompressionOutputStream out =
          codec.createOutputStream(System.out, compressor);
      IOUtils.copyBytes(System.in, out, 4096, false);
      out.finish();
    } finally {
      CodecPool.returnCompressor(compressor);
    }
  }
}

// SequenceFileReadDemo: reading a SequenceFile
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadDemo {

  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);

    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(fs, path, conf);
      Writable key = (Writable)
          ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable)
          ReflectionUtils.newInstance(reader.getValueClass(), conf);
      long position = reader.getPosition();
      while (reader.next(key, value)) {
        String syncSeen = reader.syncSeen() ? "*" : "";
        System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
        position = reader.getPosition(); // beginning of next record
      }
    } finally {
      IOUtils.closeStream(reader);
    }
  }
}

// SequenceFileWriteDemo: writing a SequenceFile
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {

  private static final String[] DATA = {
    "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen"
  };

  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);

    IntWritable key = new IntWritable();
    Text value = new Text();
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(fs, conf, path,
          key.getClass(), value.getClass());

      for (int i = 0; i < 100; i++) {
        key.set(100 - i);
        value.set(DATA[i % DATA.length]);
        System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}

// StreamCompressor: a program to compress data read from standard input and
// write it to standard output
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamCompressor {

  public static void main(String[] args) throws Exception {
    String codecClassname = args[0];
    Class<?> codecClass = Class.forName(codecClassname);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec)
        ReflectionUtils.newInstance(codecClass, conf);

    CompressionOutputStream out = codec.createOutputStream(System.out);
    IOUtils.copyBytes(System.in, out, 4096, false);
    out.finish();
  }
}

// TextArrayWritable
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;

public class TextArrayWritable extends ArrayWritable {
  public TextArrayWritable() {
    super(Text.class);
  }
}

// TextIterator: iterating over the characters in a Text object
import java.nio.ByteBuffer;

import org.apache.hadoop.io.Text;

public class TextIterator {

  public static void main(String[] args) {
    Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");

    ByteBuffer buf = ByteBuffer.wrap(t.getBytes(), 0, t.getLength());
    int cp;
    while (buf.hasRemaining() && (cp = Text.bytesToCodePoint(buf)) != -1) {
      System.out.println(Integer.toHexString(cp));
    }
  }
}

// TextPair: a Writable implementation that stores a pair of Text objects,
// with a RawComparator for comparing TextPair byte representations and a
// custom RawComparator for comparing the first field only
import java.io.*;

import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {

  private Text first;
  private Text second;

  public TextPair() {
    set(new Text(), new Text());
  }

  public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
  }

  public TextPair(Text first, Text second) {
    set(first, second);
  }

  public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
  }

  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof TextPair) {
      TextPair tp = (TextPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }

  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }

  // A RawComparator for comparing TextPair byte representations
  public static class Comparator extends WritableComparator {

    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

    public Comparator() {
      super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      try {
        int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
        int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
        int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
        if (cmp != 0) {
          return cmp;
        }
        return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                       b2, s2 + firstL2, l2 - firstL2);
      } catch (IOException e) {
        throw new IllegalArgumentException(e);
      }
    }
  }

  static {
    WritableComparator.define(TextPair.class, new Comparator());
  }

  // A custom RawComparator for comparing the first field of TextPair byte
  // representations
  public static class FirstComparator extends WritableComparator {

    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

    public FirstComparator() {
      super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      try {
        int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
        int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
        return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
      } catch (IOException e) {
        throw new IllegalArgumentException(e);
      }
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      if (a instanceof TextPair && b instanceof TextPair) {
        return ((TextPair) a).first.compareTo(((TextPair) b).first);
      }
      return super.compare(a, b);
    }
  }
}
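The two compression drivers above reference MaxTemperature, MaxTemperatureMapper, and MaxTemperatureReducer, which were introduced earlier in this series and are not repeated in this post. For convenience, here is a minimal sketch of the mapper and reducer, assuming the fixed-width NCDC weather record layout used by the original MaxTemperature example (year at offsets 15-19, signed temperature at 87-92, quality code at 92). Each public class would live in its own source file.

// Minimal sketch of the mapper/reducer referenced by the drivers above.
// Assumes the NCDC record layout from the earlier installments.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}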
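MapFileWriteDemo and MapFileFixer show writing and re-indexing a MapFile, but not looking up an entry. The sketch below shows one way to read a value back by key with MapFile.Reader, using the same pre-2.x constructors as the demos above; the class name MapFileLookupDemo is hypothetical, not part of Hadoop.

// Hypothetical example: random access into the MapFile written by MapFileWriteDemo
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class MapFileLookupDemo {

  public static void main(String[] args) throws Exception {
    String uri = args[0];                    // MapFile directory
    int wanted = Integer.parseInt(args[1]);  // key to look up

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);

    MapFile.Reader reader = new MapFile.Reader(fs, uri, conf);
    try {
      IntWritable key = new IntWritable(wanted);
      Text value = new Text();
      // get() uses the in-memory index to seek, then scans the data file;
      // it returns null if the key is not present
      Writable entry = reader.get(key, value);
      if (entry == null) {
        System.out.println("Key " + wanted + " not found");
      } else {
        System.out.println(key + "\t" + value);
      }
    } finally {
      reader.close();
    }
  }
}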