欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop 之 HDFS (HDFS客户端操作)

程序员文章站 2024-03-23 00:01:52
...

三、HDFS客户端操作(开发重点)

3.1 HDFS 客户端操作

(1)找到资料目录下的Windows依赖目录,打开:拷贝到其他地方

Hadoop 之 HDFS (HDFS客户端操作)

(2)配置 HADOOP_HOME 环境变量

Hadoop 之 HDFS (HDFS客户端操作)

(3)配置 Path 环境变量,然后重启电脑

Hadoop 之 HDFS (HDFS客户端操作)

(4)创建一个 Maven 工程 hadoopHDFS,并导入相应的依赖坐标+日志添加

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>

在项目的src/main/resources目录下,新建一个文件,命名为“log4j2.xml”,在文件中填入

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- 类型名为Console,名称为必须属性 -->
        <Appender type="Console" name="STDOUT">
            <!-- 布局为PatternLayout的方式,
            输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
        </Appender>

    </Appenders>

    <Loggers>
        <!-- 可加性为false -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>

        <!-- root loggerConfig设置 -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>
</Configuration>

(5)创建包名:com.xiaoxq.hdfs

(6)创建HdfsClient类

public class HdfsClient {
    private URI uri;
    private Configuration conf;
    private String user;
    private FileSystem fs;
    
    @Before //在执行test方法之前会先执行一次before方法
    public void getConnect() throws URISyntaxException, IOException, InterruptedException{
        //1、获取文件系统
        uri = new URI("hdfs://hadoop105:9820");
        conf= new Configuration();
        user = "xiaoxq";
        conf.set("dfs.replication","2");//设置副本的个数

        //获取客户端对象
        //参数解读 1. nn的地址     2. 配置文件     3.操作hdfs的用户
        fs = FileSystem.get(uri,conf,user);
        //用客户端对象进行具体操作

//        fs = FileSystem.get(new URI("hdfs://hadoop105:9820"), new Configuration(), "xiaoxq");
    }
    @After//执行完test方法后,会执行一次after方法
    public void closeConnect() throws IOException {
        //3、关闭客户端对象资源
        fs.close();
    }
    @Test
    public void hdfsClient() throws URISyntaxException, IOException, InterruptedException{
        //1、获取文件系统
//        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop105:9820"), new Configuration(), "xiaoxq");
        //2、创建目录
        boolean b = fs.mkdirs(new Path("/java3"));
        if (b) {
            System.out.println("成功");
        }else{
            System.out.println("失败");
        }
        //3、关闭资源
//        fs.close();
    }
}

3.2 HDFS 的 API 操作

/*客户端代码常用套路
 *  1.获取客户端对象
 *  2.用客户端对象做各种操作
 *  3.关闭客户端对象
 *  最经典的案例:hdfs zookeeper
 */
public class HdfsClient {
        private URI uri;
        private Configuration conf;
        private String user;
        private FileSystem fs;

        @Before //在执行test方法之前会先执行一次before方法
        public void getConnect() throws URISyntaxException, IOException, InterruptedException {
            //1、获取文件系统
            uri = new URI("hdfs://hadoop105:9820");
            conf= new Configuration();
            user = "xiaoxq";
            conf.set("dfs.replication","2");//设置副本的个数

            //获取客户端对象
            //参数解读 1. nn的地址     2. 配置文件     3.操作hdfs的用户
            fs = FileSystem.get(uri,conf,user);
            //用于客户端对象进行具体操作
        }
        @After//执行完test方法后,会执行一次after方法
        public void closeConnect() throws IOException {
            //3、关闭客户端对象资源
            fs.close();
        }
        @Test
        public void hdfsClient() throws URISyntaxException, IOException, InterruptedException {
            //2、创建目录
            boolean b = fs.mkdirs(new Path("/java3"));
            if (b) {
                System.out.println("成功");
            }else{
                System.out.println("失败");
            }
        }
    /**
     * HDFS文件上传
     * hadoop参数设置方式:1. xxx-default.xml  2.服务器hadoop安装目录下 conf/etc/hadoopxxx-site.xml
     * 3.客户端代码类路径下 xxx-site.xml     4.在代码里面通过 conf.set 来设置
     * 优先级:1 < 2 < 3 < 4
     */
    @Test
    public void testPut() throws IOException {
        //参数解读 1. 是否删除源文件(本地文件) 2.是否覆盖目标文件(hdfs文件)  3.源文件路径     4.目标文件路径
        fs.copyFromLocalFile(false,false,new Path("E:\\word.txt"),new Path("/java"));
    }
    /**
     * 文件下载
     */
    @Test
    public void testGet() throws IOException {
        //参数解读 1.是否删除源文件(hdfs文件)    2.源文件路径(hdfs上要下载的文件)3.目标路径  4.是否开启crc校验 true不开启 false开启
        fs.copyToLocalFile(false,new Path("/java/word.txt"),new Path("E:\\word3.txt"),true);
    }
    /**
     * 文件和目录的删除
     */
    @Test
    public void testRM() throws IOException {
        //参数解读 1.要删除的路径 2.是否递归
        //删除文件
//        fs.delete(new Path("/java/word.txt"),false);
        //删除空目录
//        fs.delete(new Path("/java2"),false);
        //删除非空目录
        fs.delete(new Path("/java"),true);
    }

    /**
     * 文件和目录的移动和更名
     */
    @Test
    public void testMV() throws IOException {
        //文件的更名
//        fs.rename(new Path("/java/word.txt"),new Path("/java/word2.txt"));
        //文件的移动
//        fs.rename(new Path("/java/word.txt"),new Path("/java2/word2.txt"));
        //目录的更名
        //fs.rename(new Path("/java"),new Path("/java2"));
        //目录的移动
        fs.rename(new Path("/java"), new Path("/java2/java"));
    }
    /**
     * 文档详情查看
     */
    @Test
    public void testList() throws IOException {
        RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(new Path("/"), true);
        while (remoteIterator.hasNext()) {
            LocatedFileStatus fileStatus = remoteIterator.next();
            System.out.println("===========" + fileStatus.getPath() + "===========");
            System.out.println(fileStatus.getPermission());
            System.out.println(fileStatus.getOwner());
            System.out.println(fileStatus.getGroup());
            System.out.println(fileStatus.getLen());
            System.out.println(fileStatus.getModificationTime());
            System.out.println(fileStatus.getReplication());
            System.out.println(fileStatus.getBlockSize());
            System.out.println(fileStatus.getPath().getName());

            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            System.out.println(Arrays.toString(blockLocations));
        }
    }
    /**
     * 文件和目录判断
     */
    @Test
    public void testIsFileOrDir() throws IOException {
        FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
        //查看并判断根目录下的一层目录中的文件和目录
        for (FileStatus fileStatus : fileStatuses) {
            boolean file = fileStatus.isFile();
            if (file) {
                System.out.println("文件:" + fileStatus.getPath());
            }else{
                System.out.println("目录:" + fileStatus.getPath());
            }
        }
    }

    //利用递归自己实现查看一个路径下所有的文件和目录
    public void isAll(String path,FileSystem fileSystem) throws IOException {
        FileStatus[] fileStatuses = fileSystem.listStatus(new Path(path));
        for (FileStatus fileStatus : fileStatuses) {
            boolean file = fileStatus.isFile();
            if (file) {
                //如果当前路径下是文件,直接打印
                System.out.println("文件:" + fileStatus.getPath());
            }else{
                //如果你当前路径下是目录,先打印一下目录,然后进去再次判断,因为不知道子目录下是否还有子目录,所以需要递归调用自己
                System.out.println("目录:" + fileStatus.getPath());
                isAll(fileStatus.getPath().toString(),fileSystem);
            }
        }
    }

    @Test
    public void testAll() throws IOException {
        isAll("/",fs);
    }
    /**
     * 基于IO流的上传
     */
    @Test
    public void testPutByIO() throws IOException {
        //1、获取本地文件输入流
        FileInputStream fis = new FileInputStream(new File("F:\\input\\wc.txt"));
        //2、获取hdfs文件输出流
        FSDataOutputStream hdfsfos = fs.create(new Path("/java2/wc.txt"));
        //3、流的对拷
        IOUtils.copyBytes(fis,hdfsfos,conf);
        //4、流的关闭(距离近的先关闭流)
        IOUtils.closeStream(hdfsfos);
        IOUtils.closeStream(fis);
    }
    /**
     * 基于IO流的下载
     */
    @Test
    public void testGetByIO() throws IOException {
        //1、获取hdfs文件输入流
        FSDataInputStream hdfsfis = fs.open(new Path("/java2/wc.txt"));
        //2、获取本地文件输出流
        FileOutputStream fos = new FileOutputStream(new File("F:\\input\\wc3.txt"));
        //3、流的对拷
        IOUtils.copyBytes(hdfsfis,fos,conf);
        //4、流的关闭
        IOUtils.closeStream(fos);
        IOUtils.closeStream(hdfsfis);
    }
}