欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

《深入理解大数据:大数据处理与编程实践》一一3.5 HDFS基本编程接口与示例...

程序员文章站 2024-03-22 13:34:46
...

本节书摘来自华章计算机《深入理解大数据:大数据处理与编程实践》一书中的第3章,第3.5节,作者 主 编:黄宜华(南京大学)副主编:苗凯翔(英特尔公司),更多章节内容可以访问云栖社区“华章计算机”公众号查看。

3.5 HDFS基本编程接口与示例

除了上一节提到的命令之外,Hadoop提供了可用于读写、操作文件的API,这样可以让程序员通过编程实现自己的HDFS文件操作。
Hadoop提供的大部分文件操作API都位于org.apache.hadoop.fs这个包中。基本的文件操作包括打开、读取、写入、关闭等。为了保证能跨文件系统交换数据,Hadoop的API也可以对部分非HDFS的文件系统提供支持;也就是说,用这些API来操作本地文件系统的文件也是可行的。
3.5.1 HDFS编程基础知识
在Hadoop中,基本上所有的文件API都来自FileSystem类。FileSystem是一个用来与文件系统交互的抽象类,可以通过实现FileSystem的子类来处理具体的文件系统,比如HDFS或者其他文件系统。通过factory方法FileSystem.get(Configuration conf),可以获得所需的文件系统实例(factory方法是软件开发的一种设计模式,指:基类定义接口,但是由子类实例化之;在这里FileSystem定义get接口,但是由FileSytem的子类(比如FilterFileSystem)实现)。Configuration类比较特殊,这个类通过键值对的方式保存了一些配置参数。这些配置默认情况下来自对应文件系统的资源配置。我们可以通过如下方式获得具体的FileSystem实例:
Conf?iguration conf = new Conf?iguration();
FileSystem hdfs = FileSystem.get(conf);
如果要获得本地文件系统对应的FileSystem实例,则可以通过factory方法FileSystem.getLocal(Configuration conf)实现:
FileSystem local = FileSystem.getLocal(conf);
Hadoop中,使用Path类的对象来编码目录或者文件的路径,使用后面会提到的FileStatus类来存放目录和文件的信息。在Java的文件API中,文件名都是String类型的字符串,在这里则是Path类型的对象。
3.5.2 HDFS基本文件操作API
接下来看一下具体的文件操作。我们按照“创建、打开、获取文件信息、获取目录信息、读取、写入、关闭、删除”的顺序讲解Hadoop提供的文件操作的API。
以下接口的实际内容可以在Hadoop API和Hadoop源代码中进一步了解。
1.?创建文件
FileSystem.create方法有很多种定义形式,参数最多的一个是:

public abstract FSDataOutputStream create(Path?f,
              FsPermission?permission,
              boolean overwrite,
              int bufferSize,
              short?replication,
              long?blockSize,
              Progressable?progress)
              Throws IOException

那些参数较少的create只不过是将其中一部分参数用默认值代替,最终还是要调用这个函数。其中各项的含义如下:
f:文件名
overwrite:如果已存在同名文件,overwrite=true覆盖之,否则抛出错误;默认为true。
buffersize:文件缓存大小。默认值:Configuration中io.file.buffer.size的值,如果Configuration中未显式设置该值,则是4096。
replication:创建的副本个数,默认值为1。
blockSize:文件的block大小,默认值:Configuration中fs.local.block.size的值,如果Configuration中未显式设置该值,则是32M。
permission和progress的值与具体文件系统实现有关。
但是大部分情况下,只需要用到最简单的几个版本:
publicFSDataOutputStream create(Path?f);
publicFSDataOutputStream create(Path?f,boolean?overwrite);
publicFSDataOutputStream create(Path?f,boolean?overwrite,int?bufferSize);
2.?打开文件
FileSystem.open方法有2个,参数最多的一个定义如下:
public abstract FSDataInputStream open(Path f, intbufferSize) throws IOException
其中各项的含义如下:
f:文件名
buffersize:文件缓存大小。默认值:Configuration中io.file.buffer.size的值,如果Configur
ation中未显式设置该值,则是4096。
3.?获取文件信息
FileSystem.getFileStatus方法格式如下:
public abstract FileStatus getFileStatus(Path f) throws IOException;
这一函数会返回一个FileStatus对象。通过阅读源代码可知,FileStatus保存了文件的很多信息,包括:
path:文件路径
length:文件长度
isDir:是否为目录
block_replication:数据块副本因子
blockSize:文件长度(数据块数)
modification_time:最近一次修改时间
access_time:最近一次访问时间
owner:文件所属用户
group:文件所属组
如果想了解文件的这些信息,可以在获得文件的FileStatus实例之后,调用相应的getXXX方法(比如,FileStatus.getModificationTime()获得最近修改时间)。
4.?获取目录信息
获取目录信息,不仅是目录本身,还有目录之下的文件和子目录信息,如下所述。FileStatus.listStatus方法格式如下:
public FileStatus[] listStatus(Path f) throws IOException;
如果f是目录,那么将目录之下的每个目录或文件信息保存在FileStatus数组中返回。如果f是文件,和getFileStatus功能一致。
另外,listStatus还有参数为Path[]的版本的接口定义以及参数带路径过滤器PathFilter的接口定义,参数为Path[]的listStatus就是对这个数组中的每个path都调用上面的参数为Path的listStatus。参数中的PathFilter则是一个接口,实现接口的accept方法可以自定义文件过滤规则。
另外,HDFS还可以通过正则表达式匹配文件名来提取需要的文件,这个方法是:
public FileStatus[] globStatus(Path pathPattern) throws IOException;
参数pathPattern中,可以像正则表达式一样,使用通配符来表示匹配规则:
?:表示任意的单个字符。
:表示任意长度的任意字符,可以用来表示前缀后缀,比如.java表示所有java文件。
[abc]:表示匹配a,b,c中的单个字符。
[a-b]:表示匹配a-b范围之间的单个字符。
1:表示匹配除a之外的单个字符。
c:表示取消特殊字符的转义,比如*的结果是*而不是随意匹配。
{ab,cd}:表示匹配ab或者cd中的一个串。
{ab,c{de,fh}}:表示匹配ab或者cde或者cfh中的一个串
5.?读取
3.3.2节提到,调用open打开文件之后,使用了一个FSDataInputStream对象来负责数据的读取。通过FSDataInputStream进行文件读取时,提供的API就是FSDataInputStream.read方法:
public int read(long?position, byte[] buffer, int?offset, int?length) throws IOException
函数的意义是:从文件的指定位置position开始,读取最多length字节的数据,保存到buffer中从offset个元素开始的空间中;返回值为实际读取的字节数。此函数不改变文件当前offset值。不过,使用更多的还有一种简化版本:
public f?inal int read(byte[]?b)throws IOException
从文件当前位置读取最多长度为b.len的数据保存到b中,返回值为实际读取的字节数。
6.?写入
从接口定义可以看出,调用create创建文件以后,使用了一个FSDataOutputStream对象来负责数据的写入。通过FSDataOutputStream进行文件写入时,最常用的API就是write方法:
public void write(byte[]?b,int?off,int?len) throws IOException
函数的意义是:将b中从off开始的最多len个字节的数据写入文件当前位置。返回值为实际写入的字节数。
7.?关闭
关闭为打开的逆过程,FileSystem.close定义如下:

public void close() throws IOException

不需要其他操作而关闭文件。释放所有持有的锁。
8.?删除
删除过程FileSystem.delete定义如下:

public abstract boolean delete(Path f,boolean?recursive) throws IOException

其中各项含义如下:
f:待删除文件名。
recursive:如果recursive为true,并且f是目录,那么会递归删除f下所有文件;如果f是文件,recursive为true还是false无影响。
另外,类似Java中File的接口DeleteOnExit,如果某些文件需要删除,但是当前不能被删;或者说当时删除代价太大,想留到退出时再删除的话,FileSystem中也提供了一个deleteOnExit接口:
Public Boolean deleteOnExit(Path?f) throws IOException
标记文件f,当文件系统关闭时才真正删除此文件。但是这个文件f在文件系统关闭前必须存在。
3.5.3 HDFS基本编程实例
本节介绍使用HDFS的API编程的简单示例。
下面的程序可以实现如下功能:在输入文件目录下的所有文件中,检索某一特定字符串所出现的行,将这些行的内容输出到本地文件系统的输出文件夹中。这一功能在分析MapReduce作业的Reduce输出时很有用。
这个程序假定只有第一层目录下的文件才有效,而且,假定文件都是文本文件。当然,如果输入文件夹是Reduce结果的输出,那么一般情况下,上述条件都能满足。为了防止单个的输出文件过大,这里还加了一个文件最大行数限制,当文件行数达到最大值时,便关闭此文件,创建另外的文件继续保存。保存的结果文件名为1,2,3,4,…,以此类推。
如上所述,这个程序可以用来分析MapReduce的结果,所以称为ResultFilter。
程序:Result Filter
输入参数:此程序接收4个命令行输入参数,参数含义如下:

<dfs path>:HDFS上的路径
<local path>:本地路径
<match str>:待查找的字符串
<single file lines>:结果每个文件的行数
程序:ResultFilter
import java.util.Scanner;
import java.io.IOException;
import java.io.File;

import org.apache.hadoop.conf.Conf?iguration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class resultFilter
{
    public static void main(String[] args) throws IOException {
        Conf?iguration conf = new Conf?iguration();
        // 以下两句中,hdfs和local分别对应HDFS实例和本地文件系统实例
        FileSystem hdfs = FileSystem.get(conf);
        FileSystem local = FileSystem.getLocal(conf);

        Path inputDir, localFile;

        FileStatus[] inputFiles;
        FSDataOutputStream out = null;
        FSDataInputStream in = null;
        Scanner scan;
        String str;
        byte[] buf;
        int singleFileLines;
        int numLines, numFiles, i;

        if(args.length!=4)
        {
            // 输入参数数量不够,提示参数格式后终止程序执行
            System.err.println("usage resultFilter <dfs path><local path>" + 
            " <match str><single f?ile lines>");
            return;
        }
        inputDir = new Path(args[0]);
        singleFileLines = Integer.parseInt(args[3]);

        try {
            inputFiles = hdfs.listStatus(inputDir);    // 获得目录信息
            numLines = 0;
            numFiles = 1;                // 输出文件从1开始编号
            localFile = new Path(args[1]);
            if(local.exists(localFile))        // 若目标路径存在,则删除之
                local.delete(localFile, true);
            for (i = 0; i<inputFiles.length; i++) {
                if(inputFiles[i].isDir() == true)    // 忽略子目录
                        continue;
            System.out.println(inputFiles[i].getPath().getName());
            in = hdfs.open(inputFiles[i].getPath());
            scan = new Scanner(in);
            while (scan.hasNext()) {
                str = scan.nextLine();
                if(str.indexOf(args[2])==-1)
                    continue;            // 如果该行没有match字符串,则忽略之
                numLines++;
                if(numLines == 1)            // 如果是1,说明需要新建文件了
                {
                    localFile = new Path(args[1] + File.separator + numFiles);
                    out = local.create(localFile);    // 创建文件
                    numFiles++;
                }
                buf = (str+"\n").getBytes();
                out.write(buf, 0, buf.length);        // 将字符串写入输出流
                if(numLines == singleFileLines)    // 如果已满足相应行数,关闭文件
                {
                    out.close();
                    numLines = 0;            // 行数变为0,重新统计
                }
            }// end of while
                scan.close();
                in.close();
            }// end of for
            if(out != null)
                out.close();
            } // end of try
            catch (IOException e) {
                e.printStackTrace();
            }
        }// end of main
    }// end of resultFilter

程序的编译命令:
javac *.java
运行命令:

hadoop jar resultFilter.jar resultFilter <dfs path>\
          <local path><match str><single f?ile lines>

参数和含义如下:
:HDFS上的路径
:本地路径
:待查找的字符串
:结果的每个文件的行数
上述程序的逻辑很简单,获取该目录下所有文件的信息,对每一个文件,打开文件、循环读取数据、写入目标位置,然后关闭文件,最后关闭输出文件。这里粗体打印的几个函数上面都有介绍,不再赘述。
我们在自己机器上预装的hadoop-1.0.4上简单试验了这个程序,在hadoop源码中拷贝了几个文件,然后上传到HDFS中,文件如下(见图3-17):
《深入理解大数据:大数据处理与编程实践》一一3.5 HDFS基本编程接口与示例...

然后,编译运行一下该示例程序,显示一下目标文件内容,结果如图3-18所示,其中,将出现“java”字符串的每一行都输出到文件中。

《深入理解大数据:大数据处理与编程实践》一一3.5 HDFS基本编程接口与示例...


  1. a