HDFS Java API Operations
I. Setting up the Hadoop development environment on Windows
1. Download Hadoop and the Windows winutils
Download the Hadoop archive (this article uses version 2.7.3).
URL: http://hadoop.apache.org/releases.html
Download the Windows winutils build (for Hadoop 2.7.x).
URL: https://github.com/SweetInk/hadoop-common-2.7.1-bin
Extract both archives, then copy all of the extracted winutils files into Hadoop's bin directory, overwriting any existing files.
Copy hadoop.dll from the winutils directory to C:\windows\system32.
2. Configure the Hadoop environment variables
Create a new system variable HADOOP_HOME with the value F:\software\hadoop-2.7.3.
Append %HADOOP_HOME%\bin to PATH.
Finally, restart the computer.
II. Creating a Maven Project
This article uses IntelliJ IDEA, but you can also use Eclipse or any other IDE. Create a Maven project.
1. Configure pom.xml as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bigdata</groupId>
    <artifactId>hadoop-hdfs-javaapi</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2. Configure log4j.properties
Create a file named log4j.properties under src/main/resources with the following contents:
#OFF,systemOut,logFile,logDailyFile,logRollingFile,logMail,logDB,ALL
log4j.rootLogger=ALL,systemOut
# Log to the console
log4j.appender.systemOut= org.apache.log4j.ConsoleAppender
log4j.appender.systemOut.layout= org.apache.log4j.PatternLayout
log4j.appender.systemOut.layout.ConversionPattern= [%-5p][%-22d{yyyy/MM/dd HH:mm:ssS}][%l]%n%m%n
log4j.appender.systemOut.Threshold= INFO
log4j.appender.systemOut.ImmediateFlush= TRUE
log4j.appender.systemOut.Target= System.out
Finally, copy the cluster's five Hadoop configuration files into the src\main\resources directory so that the client picks up the cluster settings (such as fs.defaultFS) at runtime.
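If you prefer not to copy the XML files into the project, a minimal alternative is to set the NameNode address in code. This is only a sketch: the address hdfs://192.168.1.100:9000 is an assumed example and must be replaced with your cluster's fs.defaultFS value.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HdfsConfSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed NameNode address; replace with your own cluster's fs.defaultFS.
        conf.set("fs.defaultFS", "hdfs://192.168.1.100:9000");
        FileSystem fs = FileSystem.get(conf);
        System.out.println("Connected to: " + fs.getUri());
        fs.close();
    }
}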
III. Operating HDFS with the Java API
When a client operates on HDFS it needs a user identity. By default, the HDFS client API reads a JVM parameter to determine that identity, and there are several ways to configure it:
(1) Add a JVM option
In IDEA, open the run configuration (Run → Edit Configurations) and add the following to VM options:
-DHADOOP_USER_NAME=hadoop
(2) Set an environment variable
Create a system environment variable named HADOOP_USER_NAME with the value hadoop.
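A third option, sketched below under the same assumptions, is to pass the user name directly when obtaining the FileSystem via FileSystem.get(URI, Configuration, String); the NameNode URI shown is only an example.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HdfsUserSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed NameNode URI; replace with your cluster's fs.defaultFS.
        URI uri = URI.create("hdfs://192.168.1.100:9000");
        // The third argument is the user the client acts as on HDFS.
        FileSystem fs = FileSystem.get(uri, conf, "hadoop");
        System.out.println("Home directory: " + fs.getHomeDirectory());
        fs.close();
    }
}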
Java API operations
1. File copy, directory creation and deletion
package com.bigdata.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestHDFS {

    public static Logger logger = Logger.getLogger(TestHDFS.class);
    public FileSystem fs = null;
    public Configuration conf = null;

    @Before
    public void init() throws Exception {
        // Create the Hadoop configuration object; it loads the default settings
        // plus the *-site.xml files found on the classpath (src/main/resources).
        conf = new Configuration();
        // Obtain the FileSystem object used to perform HDFS operations.
        fs = FileSystem.get(conf);
    }

    // Upload a local file to HDFS
    @Test
    public void testUploadFile() throws Exception {
        fs.copyFromLocalFile(new Path("C:/hdfstest.txt"), new Path("/hdfstest.txt"));
    }

    // Download a file from HDFS to the local filesystem
    @Test
    public void testDownloadFile() throws Exception {
        fs.copyToLocalFile(new Path("/hdfstest.txt"), new Path("D:/"));
    }

    // Create a directory
    @Test
    public void testMakeDir() throws Exception {
        boolean flag = fs.mkdirs(new Path("/a/b/c"));
        if (flag) {
            logger.info("Directory created successfully!");
        }
    }

    // Delete a directory
    @Test
    public void testRemoveDir() throws Exception {
        boolean flag = fs.delete(new Path("/a"), true); // true = recursive delete
        if (flag) {
            logger.info("Directory deleted successfully!");
        }
    }

    // Release resources
    @After
    public void destroy() throws Exception {
        if (fs != null) {
            fs.close();
        }
    }
}
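On Windows, copyToLocalFile goes through the local checksum filesystem and therefore relies on the winutils/hadoop.dll setup from section I. If that native layer causes trouble, a commonly used workaround (a hedged sketch using the four-argument overload) is to write through the raw local filesystem instead:
// Do not delete the source, download /hdfstest.txt to D:/, and use the raw
// local filesystem so no .crc checksum files (and no native code) are needed.
fs.copyToLocalFile(false, new Path("/hdfstest.txt"), new Path("D:/"), true);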
2. Uploading files in batch
Upload every file under the project's upload directory to the user's home directory on HDFS (/user/hadoop), preserving the local directory structure.
package com.bigdata.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Logger;
import org.junit.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;

public class TestUploadHDFS {

    public static Logger logger = Logger.getLogger(TestUploadHDFS.class);
    public FileSystem fs = null;
    public Configuration conf = null;

    @Test
    public void testUpload() throws Exception {
        conf = new Configuration();
        fs = FileSystem.get(conf);
        Path home = fs.getHomeDirectory(); // the user's home directory on HDFS
        Path hdfs_file = new Path(home, "hdfs_file");
        upload(hdfs_file, new File("E:\\code\\workspace_idea\\hadoopproject\\hadoop-hdfs-javaapi\\upload"));
    }

    public void upload(Path hdfs_file, File file) throws Exception {
        if (!fs.exists(hdfs_file)) {
            fs.mkdirs(hdfs_file);
        }
        File[] files = file.listFiles();
        if (files != null && files.length > 0) {
            for (File f : files) {
                // Recurse into directories, upload regular files
                if (f.isDirectory()) {
                    Path path = new Path(hdfs_file, f.getName());
                    upload(path, f);
                } else {
                    uploadFile(hdfs_file, f);
                }
            }
        }
    }

    public void uploadFile(Path hdfs_file, File f) throws Exception {
        if (!fs.exists(hdfs_file)) {
            fs.mkdirs(hdfs_file);
        }
        InputStream in = new FileInputStream(f);
        FSDataOutputStream out = fs.create(new Path(hdfs_file, f.getName()));
        IOUtils.copyBytes(in, out, 1024, true); // true = close both streams when done
        logger.info("File " + f.getName() + " uploaded");
    }
}
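If you do not need stream-level control, copyFromLocalFile also accepts a local directory and copies its contents recursively. A minimal sketch (the local path is a hypothetical example):
// Recursively copy the local "upload" directory into the HDFS home directory.
fs.copyFromLocalFile(new Path("E:/upload"), fs.getHomeDirectory());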
3. Downloading files in batch
Download all files under /user/hadoop/hdfs_file on HDFS into the project's download directory.
package com.bigdata.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Logger;
import org.junit.Test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;

public class TestDownLoadHDFS {

    public static Logger logger = Logger.getLogger(TestDownLoadHDFS.class);
    public FileSystem fs = null;
    public Configuration conf = null;

    @Test
    public void testDownload() throws Exception {
        conf = new Configuration();
        fs = FileSystem.get(conf);
        Path home = fs.getHomeDirectory();
        download(new Path(home.toString() + "/hdfs_file"), new File("E:\\code\\workspace_idea\\hadoopproject\\hadoop-hdfs-javaapi\\download"));
    }

    private void download(Path home, File dir) throws Exception {
        if (!dir.exists()) {
            dir.mkdirs();
        }
        // List all entries in the target HDFS directory
        FileStatus[] fileStatuses = fs.listStatus(home);
        if (fileStatuses != null && fileStatuses.length > 0) {
            for (FileStatus f : fileStatuses) {
                // Recurse into directories, download regular files
                if (f.isDirectory()) {
                    String name = f.getPath().getName();
                    File toFile = new File(dir, name);
                    download(f.getPath(), toFile);
                } else {
                    downloadFile(f.getPath(), dir);
                }
            }
        }
    }

    private void downloadFile(Path path, File dir) throws Exception {
        if (!dir.exists()) {
            dir.mkdirs();
        }
        File file = new File(dir, path.getName());
        FSDataInputStream in = fs.open(path);
        OutputStream out = new FileOutputStream(file);
        IOUtils.copyBytes(in, out, 1024, true); // true = close both streams when done
    }
}
Result: after running the test, the files under /user/hadoop/hdfs_file appear in the project's download directory with the same structure.
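As an aside, instead of hand-rolling the recursion you can let HDFS enumerate the files for you: FileSystem.listFiles(path, true) returns a RemoteIterator over every file below a path (it uses org.apache.hadoop.fs.LocatedFileStatus and org.apache.hadoop.fs.RemoteIterator). A minimal sketch, reusing the fs object from the class above:
// Iterate over every file (directories are skipped) below hdfs_file, recursively.
RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("/user/hadoop/hdfs_file"), true);
while (it.hasNext()) {
    LocatedFileStatus status = it.next();
    System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
}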