欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【Hadoop】之 实验一(过滤、合并文件)

程序员文章站 2024-03-20 22:40:22
...

实验步骤


① 从该目录中过滤出所有后缀名不为”.abc”的文件

② 对过滤之后的文件进行读取

③ 将这些文件的内容合并到文件”hdfs://localhost:9000/user/hadoop/merge.txt”




预备


重要:配置1配置2

因为凡凡使用virtualbox虚拟机,本地想访问虚拟机中Ubuntu。

(1)配置
【Hadoop】之 实验一(过滤、合并文件)【Hadoop】之 实验一(过滤、合并文件)
(2)在Ubuntu中启动shell,输入:ifconfig
【Hadoop】之 实验一(过滤、合并文件)
(3)最后访问
【Hadoop】之 实验一(过滤、合并文件)




API 简介


org.apache.hadoop.fs.PathFilter accept(Path path) 对path指代的文件进行过滤

FileSystem.listStatus(Path path, PathFilter filer) 方法获得目录path中所有文件经过过滤器后的状态对象数组。

FileSystem.open(Path path) 获得与路径path相关的FSDataInputStream对象,并利用该对象读取文件的内容。

FileSystem.create(Path path) 方法获得与路径path相关的FSDataOutputStream对象,并利用该对象将字节数组输出到字节。

FileSystem.get(URI uri, Configuration conf) 根据资源表示符uri和文件系统配置信息conf获得对应的文件系统。




操作

首先:提醒

(1) fsSource.open( sta.getPath() ), 如果不是文件,而是文件夹,则会报错

(2)可以面向对象,面向接口开发

实验代码

更改①,只过滤 .abc 后缀的

class MyPathFilter implements PathFilter {
    String reg = null;
    MyPathFilter(String reg) {
        this.reg = reg;
    }
    public boolean accept(Path path) {
        //①
        if(path.toString().matches(reg)) {
            return true;
        }
        return false;
    }

}

public class Merge {

    Path inputPath = null;
    Path outputPath = null;
    public Merge(String input, String output) {
        this.inputPath = new Path(input);
        this.outputPath = new Path(output);
    }
    public void doMerge() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);

        FileStatus[] sourceStatus = fsSource.listStatus(inputPath, new MyPathFilter(".*\\.abc"));
        FSDataOutputStream fsdos = fsDst.create(outputPath);



        for (FileStatus sta : sourceStatus) {

            System.out.println("path : " + sta.getPath() + "  file size : " + sta.getLen() + 
                        "  auth: " + sta.getPermission() + "  content: ");

            File file = new File(sta.getPath() + "");

            if (!file.isFile()) {
                continue;
            }

            System.out.println("next ");

            FSDataInputStream fsdis = fsSource.open(sta.getPath());
            byte[] data = new byte[1024];
            int read = -1;
            PrintStream ps = new PrintStream(System.out);
            while ((read = fsdis.read(data)) > 0) {
                ps.write(data, 0, read);
                fsdos.write(data, 0 ,read);
            }
            fsdis.close();
            ps.close();
        }
        fsdos.close();
    }
    public static void main(String[] args) throws IOException{
        Merge merge = new Merge(
                "hdfs://localhost:9000/user/hadoop", 
                "hdfs://localhost:9000/user/hadoop/merge.txt"
                );
        merge.doMerge();
    }

}


改进代码

这里写代码片