【Hadoop】之 实验一(过滤、合并文件)
程序员文章站
2024-03-20 22:40:22
...
实验步骤
① 从该目录中过滤出所有后缀名不为”.abc”的文件
② 对过滤之后的文件进行读取
③ 将这些文件的内容合并到文件”hdfs://localhost:9000/user/hadoop/merge.txt”
预备
重要:配置1 ,配置2
因为凡凡使用virtualbox虚拟机,本地想访问虚拟机中Ubuntu。
(1)配置
(2)在Ubuntu中启动shell,输入:ifconfig
(3)最后访问
API 简介
org.apache.hadoop.fs.PathFilter accept(Path path)
对path指代的文件进行过滤
FileSystem.listStatus(Path path, PathFilter filer)
方法获得目录path中所有文件经过过滤器后的状态对象数组。
FileSystem.open(Path path)
获得与路径path相关的FSDataInputStream对象,并利用该对象读取文件的内容。
FileSystem.create(Path path)
方法获得与路径path相关的FSDataOutputStream对象,并利用该对象将字节数组输出到字节。
FileSystem.get(URI uri, Configuration conf)
根据资源表示符uri和文件系统配置信息conf获得对应的文件系统。
操作
首先:提醒
(1) fsSource.open( sta.getPath() ), 如果不是文件,而是文件夹,则会报错
(2)可以面向对象,面向接口开发
实验代码
更改①,只过滤 .abc 后缀的
class MyPathFilter implements PathFilter {
String reg = null;
MyPathFilter(String reg) {
this.reg = reg;
}
public boolean accept(Path path) {
//①
if(path.toString().matches(reg)) {
return true;
}
return false;
}
}
public class Merge {
Path inputPath = null;
Path outputPath = null;
public Merge(String input, String output) {
this.inputPath = new Path(input);
this.outputPath = new Path(output);
}
public void doMerge() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);
FileStatus[] sourceStatus = fsSource.listStatus(inputPath, new MyPathFilter(".*\\.abc"));
FSDataOutputStream fsdos = fsDst.create(outputPath);
for (FileStatus sta : sourceStatus) {
System.out.println("path : " + sta.getPath() + " file size : " + sta.getLen() +
" auth: " + sta.getPermission() + " content: ");
File file = new File(sta.getPath() + "");
if (!file.isFile()) {
continue;
}
System.out.println("next ");
FSDataInputStream fsdis = fsSource.open(sta.getPath());
byte[] data = new byte[1024];
int read = -1;
PrintStream ps = new PrintStream(System.out);
while ((read = fsdis.read(data)) > 0) {
ps.write(data, 0, read);
fsdos.write(data, 0 ,read);
}
fsdis.close();
ps.close();
}
fsdos.close();
}
public static void main(String[] args) throws IOException{
Merge merge = new Merge(
"hdfs://localhost:9000/user/hadoop",
"hdfs://localhost:9000/user/hadoop/merge.txt"
);
merge.doMerge();
}
}
改进代码
这里写代码片
上一篇: count(*)和count(1)的区别
下一篇: 创建自己的CocoaPods库并提交指南