使用Hadoop的Java API操作HDFS
本文介绍Java API访问HDFS,实现文件的读写,文件系统的操作等。开发环境为eclipse,开发时所依赖的jar包,可在Hadoop安装目录下找到。
Demo
package com.test.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class TestHdfs {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
//write
Path path = new Path("/import/tmp/Wtest.txt");
FSDataOutputStream fout = fs.create(path);
byte[] bWrite = "hello hadoop distribute file system \n".getBytes();
fout.write(bWrite); //写入字节数组
fout.flush(); //flush提供了一种将缓冲区的数据强制刷新到文件系统的方法
fout.close(); //关闭写出流
fout = fs.append(path);
fout.write("append: the append method of java API \n".getBytes());
fout.close(); //关闭写出流
//read
FSDataInputStream fin = fs.open(path);
byte[] buff = new byte[128];
int len = 0 ;
while( (len = fin.read(buff,0,128)) != -1 )
{
System.out.print(new String(buff,0,len));
}
//创建目录
if(fs.mkdirs(new Path("/import/test")))
{
System.out.println("mkdir /import/test success ");
}
//列出目录
FileStatus[] paths = fs.listStatus(new Path("/import"));
for(int i = 0 ; i < paths.length ;++i)
{
System.out.println(paths[i].toString());
System.out.println(paths[i].getLen());
System.out.println(paths[i].isDirectory());
System.out.println(paths[i].getPath().getParent());
System.out.println(paths[i].getPath());
System.out.println(paths[i].getPath().getName());
}
//删除
if(fs.delete(new Path("/import"), true))
{
System.out.println("delete directory /import ");
}
fin.close();
fs.close();
}
}
Explain
使用HDFS的JAVA API操作HDFS的文件系统,首先要获取用于操作文件系统的实例,而文件系统的又是与当前的系统的环境变量息息相关。对于操作HDFS来说,环境配置主要是core-site.xml中的相关配置。
1.获取文件系统访问实例
Configuration conf = new Configuration(); //获取当前的默认环境配置
FileSystem fs = FileSystem.get(conf); //根据当前环境配置,获取文件系统访问实例
Configuration还用提供了用于增加/修改当前环境配置的相关方法,如addResource(Path file)可以增加xml格式的配置,set(String name,String value)以键值对的形式新增/修改配置项。
获取文件系统访问实例方法有:
public static FileSystem get(Configuration conf) throws IOException;
public static FileSystem get(URI uri,Configuration conf) throws IOException;
第一个方法是使用默认的URI地址(core-site.xml中配置)获取当前环境变量来加载文件系统,第二个方法则传入指定的URI路径来获取实例。
2.向HDFS中写入数据
public FSDataOutputStream create(Path f) throws IOException;
public FSDataOutputStream append(Path f) throws IOException;
public void write(byte b[]) throws IOException;
public final void writeBytes(String s) throws IOException
public final void writeUTF(String str) throws IOException
首先要根据当前的环境,获取写出数据了对象,create方法根据路径创建数据流对象,如果path目录的文件已经存在,则会覆盖原文件的内容。append方法则在原路径的文件上追加写入。都返回了FSDataOutputStream对象,其继承至DataOutputStream,提供了标准I/O的操作。FSDataOutputStream提供了很多写出数据流的方法如重载的write,writeBytes,writeUTF等。flush提供了一种将缓冲区的数据强制刷新到文件系统的方法。此外,write()提供了一种带有回调方法的参数,回去在每次写出缓存时,提供进度。
OutputStream out = fs.create(new Path(dst), new Progressable() {
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);
3.读取HDFS文件系统的数据
public FSDataInputStream open(Path f) throws IOException;
public final int read(byte b[], int off, int len) throws IOException;
public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer ;
open()方法根据传进来的path路径,获取环境变量,并设置读取的缓冲区大小(默认为4096),然后返回FSDataInputStream实例,FSDataInputStream继承至DataInputStream,并实现了Seekable等接口。DataInputStream继承至标准I/O类,Seekable接口实现对数据的重定位,PositionedReadable接口实现从指定偏移量处读取文件。
read()方法从指定off位置读取len长度的字节存入byte数组。如果到达文件尾则返回-1,否则返回读取的实际长度。
4.文件/目录操作
public boolean mkdirs(Path f) throws IOException;
提供递归的创建path目录功能,mkdirs还有带权限的重载版本
public abstract boolean delete(Path paramPath, boolean paramBoolean) throws IOException;
如果paramBoolean为false,则不能递归的删除子目录,如果此时目录非空,将抛出异常Directory is not empty
public abstract FileStatus[] listStatus(Path paramPath) throws FileNotFoundException, IOException;
private void listStatus(ArrayList<FileStatus> results, Path f, PathFilter filter) throws FileNotFoundException, IOException;
listStatus方法可以列出指定目录下的文件或者文件夹(不能递归列出),具有PathFilter过滤的重载版本
FileStatus对象描述了文件的各种属性,诸如文件是否是文件夹,文件的权限,所有者等,isDirectory(),getLen(),getPath()...
copyFromLocalFile(src, dst); //从本地拷贝文件到HDFS
copyToLocalFile(src, dst); //从HDFS直接拷贝文件到本地
5.打包&运行
Eclipse编译所有的jar包从hadoop安装包中都能找到,不一一列出。
导出jar包:Export->java JAR file
选择相关代码目录,依赖的jar包可以不用导出,hadoop默认环境知道如何加载这些jar包
需要在 JAR Manifest Specification中为待运行的application选择运行的main class
hello hadoop distribute file system
append: the append method of java API
mkdir /import/test success
FileStatus{path=hdfs://localhost:9000/import/test; isDirectory=true; modification_time=1501305873649; access_time=0; owner=root; group=supergroup; permission=rwxr-xr-x; isSymlink=false}
0
true
hdfs://localhost:9000/import
hdfs://localhost:9000/import/test
test