Hadoop 之 HDFS (HDFS客户端操作)
程序员文章站
2024-03-23 00:01:52
...
三、HDFS客户端操作(开发重点)
3.1 HDFS 客户端操作
(1)找到资料目录下的Windows依赖目录,打开:拷贝到其他地方
(2)配置 HADOOP_HOME 环境变量
(3)配置 Path 环境变量,然后重启电脑
(4)创建一个 Maven 工程 hadoopHDFS,并导入相应的依赖坐标+日志添加
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
在项目的src/main/resources目录下,新建一个文件,命名为“log4j2.xml”,在文件中填入
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
<Appenders>
<!-- 类型名为Console,名称为必须属性 -->
<Appender type="Console" name="STDOUT">
<!-- 布局为PatternLayout的方式,
输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
<Layout type="PatternLayout"
pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
</Appender>
</Appenders>
<Loggers>
<!-- 可加性为false -->
<Logger name="test" level="info" additivity="false">
<AppenderRef ref="STDOUT" />
</Logger>
<!-- root loggerConfig设置 -->
<Root level="info">
<AppenderRef ref="STDOUT" />
</Root>
</Loggers>
</Configuration>
(5)创建包名:com.xiaoxq.hdfs
(6)创建HdfsClient类
public class HdfsClient {
private URI uri;
private Configuration conf;
private String user;
private FileSystem fs;
@Before //在执行test方法之前会先执行一次before方法
public void getConnect() throws URISyntaxException, IOException, InterruptedException{
//1、获取文件系统
uri = new URI("hdfs://hadoop105:9820");
conf= new Configuration();
user = "xiaoxq";
conf.set("dfs.replication","2");//设置副本的个数
//获取客户端对象
//参数解读 1. nn的地址 2. 配置文件 3.操作hdfs的用户
fs = FileSystem.get(uri,conf,user);
//用客户端对象进行具体操作
// fs = FileSystem.get(new URI("hdfs://hadoop105:9820"), new Configuration(), "xiaoxq");
}
@After//执行完test方法后,会执行一次after方法
public void closeConnect() throws IOException {
//3、关闭客户端对象资源
fs.close();
}
@Test
public void hdfsClient() throws URISyntaxException, IOException, InterruptedException{
//1、获取文件系统
// FileSystem fs = FileSystem.get(new URI("hdfs://hadoop105:9820"), new Configuration(), "xiaoxq");
//2、创建目录
boolean b = fs.mkdirs(new Path("/java3"));
if (b) {
System.out.println("成功");
}else{
System.out.println("失败");
}
//3、关闭资源
// fs.close();
}
}
3.2 HDFS 的 API 操作
/*客户端代码常用套路
* 1.获取客户端对象
* 2.用客户端对象做各种操作
* 3.关闭客户端对象
* 最经典的案例:hdfs zookeeper
*/
public class HdfsClient {
private URI uri;
private Configuration conf;
private String user;
private FileSystem fs;
@Before //在执行test方法之前会先执行一次before方法
public void getConnect() throws URISyntaxException, IOException, InterruptedException {
//1、获取文件系统
uri = new URI("hdfs://hadoop105:9820");
conf= new Configuration();
user = "xiaoxq";
conf.set("dfs.replication","2");//设置副本的个数
//获取客户端对象
//参数解读 1. nn的地址 2. 配置文件 3.操作hdfs的用户
fs = FileSystem.get(uri,conf,user);
//用于客户端对象进行具体操作
}
@After//执行完test方法后,会执行一次after方法
public void closeConnect() throws IOException {
//3、关闭客户端对象资源
fs.close();
}
@Test
public void hdfsClient() throws URISyntaxException, IOException, InterruptedException {
//2、创建目录
boolean b = fs.mkdirs(new Path("/java3"));
if (b) {
System.out.println("成功");
}else{
System.out.println("失败");
}
}
/**
* HDFS文件上传
* hadoop参数设置方式:1. xxx-default.xml 2.服务器hadoop安装目录下 conf/etc/hadoopxxx-site.xml
* 3.客户端代码类路径下 xxx-site.xml 4.在代码里面通过 conf.set 来设置
* 优先级:1 < 2 < 3 < 4
*/
@Test
public void testPut() throws IOException {
//参数解读 1. 是否删除源文件(本地文件) 2.是否覆盖目标文件(hdfs文件) 3.源文件路径 4.目标文件路径
fs.copyFromLocalFile(false,false,new Path("E:\\word.txt"),new Path("/java"));
}
/**
* 文件下载
*/
@Test
public void testGet() throws IOException {
//参数解读 1.是否删除源文件(hdfs文件) 2.源文件路径(hdfs上要下载的文件)3.目标路径 4.是否开启crc校验 true不开启 false开启
fs.copyToLocalFile(false,new Path("/java/word.txt"),new Path("E:\\word3.txt"),true);
}
/**
* 文件和目录的删除
*/
@Test
public void testRM() throws IOException {
//参数解读 1.要删除的路径 2.是否递归
//删除文件
// fs.delete(new Path("/java/word.txt"),false);
//删除空目录
// fs.delete(new Path("/java2"),false);
//删除非空目录
fs.delete(new Path("/java"),true);
}
/**
* 文件和目录的移动和更名
*/
@Test
public void testMV() throws IOException {
//文件的更名
// fs.rename(new Path("/java/word.txt"),new Path("/java/word2.txt"));
//文件的移动
// fs.rename(new Path("/java/word.txt"),new Path("/java2/word2.txt"));
//目录的更名
//fs.rename(new Path("/java"),new Path("/java2"));
//目录的移动
fs.rename(new Path("/java"), new Path("/java2/java"));
}
/**
* 文档详情查看
*/
@Test
public void testList() throws IOException {
RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(new Path("/"), true);
while (remoteIterator.hasNext()) {
LocatedFileStatus fileStatus = remoteIterator.next();
System.out.println("===========" + fileStatus.getPath() + "===========");
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getOwner());
System.out.println(fileStatus.getGroup());
System.out.println(fileStatus.getLen());
System.out.println(fileStatus.getModificationTime());
System.out.println(fileStatus.getReplication());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPath().getName());
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
System.out.println(Arrays.toString(blockLocations));
}
}
/**
* 文件和目录判断
*/
@Test
public void testIsFileOrDir() throws IOException {
FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
//查看并判断根目录下的一层目录中的文件和目录
for (FileStatus fileStatus : fileStatuses) {
boolean file = fileStatus.isFile();
if (file) {
System.out.println("文件:" + fileStatus.getPath());
}else{
System.out.println("目录:" + fileStatus.getPath());
}
}
}
//利用递归自己实现查看一个路径下所有的文件和目录
public void isAll(String path,FileSystem fileSystem) throws IOException {
FileStatus[] fileStatuses = fileSystem.listStatus(new Path(path));
for (FileStatus fileStatus : fileStatuses) {
boolean file = fileStatus.isFile();
if (file) {
//如果当前路径下是文件,直接打印
System.out.println("文件:" + fileStatus.getPath());
}else{
//如果你当前路径下是目录,先打印一下目录,然后进去再次判断,因为不知道子目录下是否还有子目录,所以需要递归调用自己
System.out.println("目录:" + fileStatus.getPath());
isAll(fileStatus.getPath().toString(),fileSystem);
}
}
}
@Test
public void testAll() throws IOException {
isAll("/",fs);
}
/**
* 基于IO流的上传
*/
@Test
public void testPutByIO() throws IOException {
//1、获取本地文件输入流
FileInputStream fis = new FileInputStream(new File("F:\\input\\wc.txt"));
//2、获取hdfs文件输出流
FSDataOutputStream hdfsfos = fs.create(new Path("/java2/wc.txt"));
//3、流的对拷
IOUtils.copyBytes(fis,hdfsfos,conf);
//4、流的关闭(距离近的先关闭流)
IOUtils.closeStream(hdfsfos);
IOUtils.closeStream(fis);
}
/**
* 基于IO流的下载
*/
@Test
public void testGetByIO() throws IOException {
//1、获取hdfs文件输入流
FSDataInputStream hdfsfis = fs.open(new Path("/java2/wc.txt"));
//2、获取本地文件输出流
FileOutputStream fos = new FileOutputStream(new File("F:\\input\\wc3.txt"));
//3、流的对拷
IOUtils.copyBytes(hdfsfis,fos,conf);
//4、流的关闭
IOUtils.closeStream(fos);
IOUtils.closeStream(hdfsfis);
}
}