欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

使用Hadoop的Java API操作HDFS

程序员文章站 2024-03-23 08:04:58
...

本文介绍Java API访问HDFS,实现文件的读写,文件系统的操作等。开发环境为eclipse,开发时所依赖的jar包,可在Hadoop安装目录下找到。

Demo

package com.test.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TestHdfs {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        //write
        Path path = new Path("/import/tmp/Wtest.txt");
        FSDataOutputStream fout = fs.create(path);
        byte[] bWrite = "hello hadoop distribute file system \n".getBytes();
        fout.write(bWrite); //写入字节数组
        fout.flush(); //flush提供了一种将缓冲区的数据强制刷新到文件系统的方法
        fout.close(); //关闭写出流

        fout = fs.append(path);
        fout.write("append: the append method of java API \n".getBytes());
        fout.close(); //关闭写出流

        //read
        FSDataInputStream fin = fs.open(path);
        byte[] buff = new byte[128];
        int len = 0 ;

        while( (len = fin.read(buff,0,128))  != -1 )
        {
            System.out.print(new String(buff,0,len));
        }

        //创建目录      
        if(fs.mkdirs(new Path("/import/test")))
        {
            System.out.println("mkdir /import/test success ");
        }

        //列出目录
        FileStatus[]  paths = fs.listStatus(new Path("/import"));
        for(int i = 0 ; i < paths.length ;++i)
        {
            System.out.println(paths[i].toString());
            System.out.println(paths[i].getLen());
            System.out.println(paths[i].isDirectory());
            System.out.println(paths[i].getPath().getParent());
            System.out.println(paths[i].getPath());
            System.out.println(paths[i].getPath().getName());
        }

        //删除
        if(fs.delete(new Path("/import"), true))
        {
            System.out.println("delete directory /import ");
        }

        fin.close();
        fs.close();
    }

}

Explain
使用HDFS的JAVA API操作HDFS的文件系统,首先要获取用于操作文件系统的实例,而文件系统的又是与当前的系统的环境变量息息相关。对于操作HDFS来说,环境配置主要是core-site.xml中的相关配置。

1.获取文件系统访问实例

Configuration conf = new Configuration();  //获取当前的默认环境配置
FileSystem fs = FileSystem.get(conf);   //根据当前环境配置,获取文件系统访问实例

Configuration还用提供了用于增加/修改当前环境配置的相关方法,如addResource(Path file)可以增加xml格式的配置,set(String name,String value)以键值对的形式新增/修改配置项。
获取文件系统访问实例方法有:

public static FileSystem get(Configuration conf) throws IOException;
public static FileSystem get(URI uri,Configuration conf) throws IOException;

第一个方法是使用默认的URI地址(core-site.xml中配置)获取当前环境变量来加载文件系统,第二个方法则传入指定的URI路径来获取实例。

2.向HDFS中写入数据

public FSDataOutputStream create(Path f) throws IOException;
public FSDataOutputStream append(Path f) throws IOException;
public void write(byte b[]) throws IOException;
public final void writeBytes(String s) throws IOException
public final void writeUTF(String str) throws IOException

首先要根据当前的环境,获取写出数据了对象,create方法根据路径创建数据流对象,如果path目录的文件已经存在,则会覆盖原文件的内容。append方法则在原路径的文件上追加写入。都返回了FSDataOutputStream对象,其继承至DataOutputStream,提供了标准I/O的操作。FSDataOutputStream提供了很多写出数据流的方法如重载的write,writeBytes,writeUTF等。flush提供了一种将缓冲区的数据强制刷新到文件系统的方法。此外,write()提供了一种带有回调方法的参数,回去在每次写出缓存时,提供进度。

OutputStream out = fs.create(new Path(dst), new Progressable() {
    public void progress() {
        System.out.print(".");
    }
});
IOUtils.copyBytes(in, out, 4096, true);

3.读取HDFS文件系统的数据

public FSDataInputStream open(Path f) throws IOException;
public final int read(byte b[], int off, int len) throws IOException;

public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer ;

open()方法根据传进来的path路径,获取环境变量,并设置读取的缓冲区大小(默认为4096),然后返回FSDataInputStream实例,FSDataInputStream继承至DataInputStream,并实现了Seekable等接口。DataInputStream继承至标准I/O类,Seekable接口实现对数据的重定位,PositionedReadable接口实现从指定偏移量处读取文件。
read()方法从指定off位置读取len长度的字节存入byte数组。如果到达文件尾则返回-1,否则返回读取的实际长度。

4.文件/目录操作

public boolean mkdirs(Path f) throws IOException;
提供递归的创建path目录功能,mkdirs还有带权限的重载版本

public abstract boolean delete(Path paramPath, boolean paramBoolean) throws IOException;
如果paramBoolean为false,则不能递归的删除子目录,如果此时目录非空,将抛出异常Directory is not empty

public abstract FileStatus[] listStatus(Path paramPath) throws FileNotFoundException, IOException;
private void listStatus(ArrayList<FileStatus> results, Path f, PathFilter filter)  throws FileNotFoundException, IOException;

listStatus方法可以列出指定目录下的文件或者文件夹(不能递归列出),具有PathFilter过滤的重载版本
FileStatus对象描述了文件的各种属性,诸如文件是否是文件夹,文件的权限,所有者等,isDirectory(),getLen(),getPath()...

copyFromLocalFile(src, dst);  //从本地拷贝文件到HDFS
copyToLocalFile(src, dst);    //从HDFS直接拷贝文件到本地

5.打包&运行
Eclipse编译所有的jar包从hadoop安装包中都能找到,不一一列出。
导出jar包:Export->java JAR file
选择相关代码目录,依赖的jar包可以不用导出,hadoop默认环境知道如何加载这些jar包
需要在 JAR Manifest Specification中为待运行的application选择运行的main class
使用Hadoop的Java API操作HDFS

hello hadoop distribute file system 
append: the append method of java API 
mkdir /import/test success 
FileStatus{path=hdfs://localhost:9000/import/test; isDirectory=true; modification_time=1501305873649; access_time=0; owner=root; group=supergroup; permission=rwxr-xr-x; isSymlink=false}
0
true
hdfs://localhost:9000/import
hdfs://localhost:9000/import/test
test
相关标签: hadoop hdfs java