HDFS的JavaAPI操作
程序员文章站
2024-03-23 00:01:52
...
1. HDFS的API操作
HDFS在生产应用中主要是客户端的开发,其核心步骤是从HDFS提供的api中构造一个HDFS的访问客户端对象,然后通过该客户端对象操作(增删改查)HDFS上的文件。
1.1 配置本地windows环境
链接: https://pan.baidu.com/s/1O5iG3LhS_oTdatYScnV5OA
提取码:ssbb
搭建步骤:
第一步:将已经编译好的Windows版本Hadoop解压到到一个没有中文没有空格的路径下面
第二步:在windows上面配置hadoop的环境变量: HADOOP_HOME,并将%HADOOP_HOME%\bin添加到path中
第三步:把hadoop2.7.5文件夹中bin目录下的hadoop.dll文件放到系统盘: C:\Windows\System32 目录
第四步:关闭windows重启
1.2 前奏
/**
* 在java中操作HDFS,主要涉及以下Class:
* Configuration:该类的对象封转了客户端或者服务器的配置;
* FileSystem:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作,通过FileSystem的静态方法get获得该对象。
*/
public class FileSystemDemo1 {
//方式1
// 获取FileSystem对象 只要获取了这个对象 就可以对整个HDFS系统进行增删改查
@Test
public void getFileSystem01() throws IOException {
//1.创建Configuration对象
Configuration cf = new Configuration();
//2.设置Configuration对象,指定获取哪一个HDFS文件系统(本地还是HDFS)
cf.set("fs.defaultFS","hdfs://afu01:8020");
//3.通过FileSystem获取FileSystem对象
FileSystem fs = FileSystem.get(cf);
//4.输出查看FileSystem一下下
System.out.println(fs.toString());
}
//方式2 推荐
//获取FileSystem对象 只要获取了这个对象 就可以对整个HDFS系统进行增删改查
@Test
public void getFileSystem02() throws Exception {
FileSystem fs = FileSystem.get(new URI("hdfs://afu01:8020"), new Configuration());
System.out.println(fs);
}
}
1.3 正式操作
1.流的读写(输入输出)对象要明确 针对HDFS windows linux
2.如果读取就是 inputstream 写入就是 outputstream 读取本地文件 写入 HDFS
从哪里读取(FileInputStream)… 写入到哪里(FileOutputStream)…
public class FileSystemDemo2 {
/**
* 1. 遍历hdfs上的所有文件
*/
@Test
public void getHDFS1() throws Exception {
//1.获取FileSystem对象
FileSystem fs = FileSystem.get(new URI("hdfs://afu01:8020"), new Configuration());
//2. 获取从 / 目录开始的所有文件信息
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path("/"), true);
//3. 遍历集合,打印文件路径
while (iterator.hasNext()) {
//获取集合元素,也就是文件详情
LocatedFileStatus fileStatus = iterator.next();
System.out.println(fileStatus.getPath().toString()); //递归打印文件所有的路径
//获取每一个Block块信息
BlockLocation[] bl = fileStatus.getBlockLocations(); //是一个数组 因为每一个文件可能有多个Block
for (BlockLocation blockLocation : bl) {
System.out.println("一个block");
String[] hosts = blockLocation.getHosts(); //因为有3个主机 所以每一个block有 3 个副本 数组接收
for (String host : hosts) {
System.out.println(host);
}
}
}
//4. 释放资源
fs.close();
}
/**
* 2. 在HDFS上创建文件夹 mkdirs
*/
@Test
public void getHDFS2() throws Exception {
//1.获取FileSystem对象
FileSystem fs = FileSystem.get(new URI("hdfs://afu01:8020"), new Configuration()); //Configuration 客户端或者服务器的配置
//2.创建
fs.mkdirs(new Path("/ddd/fff"));
fs.close();
}
/**
* 3.从HDFS上下载文件夹
*/
@Test
public void getHDFS3() throws Exception {
//1.获取FileSystem 对象 传入HDFS的URI
FileSystem fs = FileSystem.get(new URI("hdfs://afu01:8020"), new Configuration());
//2.获取HDFS输入流 (从HDFS读取文件 写入 到本地windows)
FSDataInputStream fsis = fs.open(new Path("/a.txt")); //HDFS上的文件路径名字
//3.获取windows的输出流(指定文件的写入路径 从哪里读取.... 写入到 哪里........)
FileOutputStream fos = new FileOutputStream("E:\\b.txt"); //下载到本地路径名字 随意
//4.实现文件的复制 用到了一个操作文件的工具类 在apache.commons.io.IOUtils 这个包下面的
IOUtils.copy(fsis, fos);
//5.关闭流
IOUtils.closeQuietly(fos);
IOUtils.closeQuietly(fsis);
fs.close();
}
/**
4. HDFS的小文件的合并 将本地的小文件 合并成 大文件 上传到HDFS
明确读取.... 写入到 ......:
从本地读取(FileInputStream) ... 写入(FileOutputStream)到HDFS...
*/
@Test
public void getHDFS4() throws Exception {
//1.创建FileSystem对象
FileSystem fs = FileSystem.get(new URI("hdfs://afu01:8020"), new Configuration());
//2.获取本地文件系统
LocalFileSystem lfs = FileSystem.getLocal(new Configuration());
//3.获取HDFS大文件的输出流 (我要本地读取....写入HDFS.....)
FSDataOutputStream fsos = fs.create(new Path("/big.txt")); //大文件取得一个名字
//4.获取本地每一个小文件的输入流
FileStatus[] fst = lfs.listStatus(new Path("E:\\input"));
for (FileStatus fileStatus : fst) {
//获取每一个小文件的输入流
FSDataInputStream fsis = lfs.open(fileStatus.getPath());
//将每一个小文件复制到大文件中 工具类
IOUtils.copy(fsis, fsos);
IOUtils.closeQuietly(fsis);
}
//5.释放资源
IOUtils.closeQuietly(fsos); //快速的释放资源
lfs.close();
fs.close();
}
}