欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

HDFS的javaAPI操作

程序员文章站 2024-03-23 08:09:22
...

创建maven工程并导入jar包

由于cdh版本的所有的软件涉及版权的问题,所以并没有将所有的jar包托管到maven仓库当中去,而是托管在了CDH自己的服务器上面,所以我们默认去maven的仓库下载不到,需要自己手动的添加repository去CDH仓库进行下载,以下两个地址是官方文档说明,请仔细查阅
https://www.cloudera.com/documentation/enterprise/releasenotes/topics/cdh_vd_cdh5_maven_repo.html
https://www.cloudera.com/documentation/enterprise/releasenotes/topics/cdh_vd_cdh5_maven_repo_514x.html

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.apache.Hadoop</groupId>
        <artifactId>Hadoop-client</artifactId>
        <version>2.6.0-mr1-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.Hadoop</groupId>
        <artifactId>Hadoop-common</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.Hadoop</groupId>
        <artifactId>Hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.Hadoop</groupId>
        <artifactId>Hadoop-mapreduce-client-core</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>RELEASE</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <!--    <verbal>true</verbal>-->
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <minimizeJar>true</minimizeJar>
                    </configuration>
                </execution>
            </executions>
        </plugin>
      <!--  <plugin>
            <artifactId>maven-assembly-plugin </artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>cn.itcast.Hadoop.db.DBToHdfs2</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>-->
    </plugins>
</build>

使用文件系统方式访问数据

在 java 中操作 HDFS,主要涉及以下 Class:
Configuration:该类的对象封转了客户端或者服务器的配置;
FileSystem:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作,通过 FileSystem 的静态方法 get 获得该对象。
FileSystem fs = FileSystem.get(conf)
get 方法从 conf 中的一个参数 fs.defaultFS 的配置值判断具体是什么类型的文件系统。如果我们的代码中没有指定 fs.defaultFS,并且工程 classpath下也没有给定相应的配置,conf中的默认值就来自于Hadoop的jar包中的core-default.xml , 默 认 值 为 : file:/// , 则 获 取 的 将 不 是 一 个DistributedFileSystem 的实例,而是一个本地文件系统的客户端对象

HDFS的javaAPI操作

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * @author huang
 * @version 1.0
 * @date 2019/11/7 10:41
 */
public class Hdfs01 {
    //该类的对象封转了客户端或者服务器的配置
    static Configuration conf   = new Configuration();



    //获取指定路径所有文件
    public  static void  listStatus()  throws Exception  {
        //该类的对象是一个文件系统对象
        FileSystem hdfs= FileSystem.get(new URI("hdfs://192.168.100.100:8020"),conf);
        //获取某一目录下的所有文件
        FileStatus stats[]=hdfs.listStatus(new Path("/abc/aaa/"));

        //遍历输出
        for(int i = 0; i < stats.length; ++i)
            System.out.println(stats[i].getPath().toString());
        hdfs.close();
    }

    //重命名
    public  static void  rename()  throws Exception  {
        FileSystem hdfs=FileSystem.get(new URI("hdfs://192.168.100.100:8020"),conf);
        Path frpaht=new Path("/abc");
        Path topath=new Path("/abcd");
        boolean isRename=hdfs.rename(frpaht, topath);
        String result=isRename?"修改成功!":"修改失败!";
        System.out.println(result);
    }

    //获取文件日期
    public  static void  GetTime()  throws Exception  {
        FileSystem hdfs=FileSystem.get(new URI("hdfs://192.168.100.100:8020"),conf);
        FileStatus fileStatus=hdfs.getFileStatus(new Path("/abcd/aaa/1.txt"));
        long modiTime=fileStatus.getModificationTime();

        System.out.println(modiTime);
    }

    //删除文件
    public  static void  deletefile()  throws Exception  {
        FileSystem hdfs=FileSystem.get(new URI("hdfs://192.168.100.100:8020"),conf);
        boolean isDeleted=hdfs.delete(new Path("/user/new"),true);
        System.out.println("Delete?"+isDeleted);
    }
    //创建文件夹

    public  static void  mkdir ()  throws Exception  {
        FileSystem hdfs=FileSystem.get(new URI("hdfs://192.168.100.100:8020"),conf);
        boolean bool2=hdfs.mkdirs(new Path("/bbb/ccc"));
        if (bool2)
        {
            System.out.println("创建成功!!");
        }
        else
        {
            System.out.println("创建失败!!");
        }
    }

    //创建数据
    public  static void  AddFile()  throws Exception  {
        FileSystem hdfs=FileSystem.get(new URI("hdfs://192.168.100.201:8020"),conf);
        byte[] buff="hello hadoop world!\r\n hadoop ".getBytes();
        FSDataOutputStream outputStream=hdfs.create(new Path("/tmp/file.txt"));
        outputStream.write(buff,0,buff.length);
        outputStream.close();
    }

    //上传数据
    public  static void  put()  throws Exception  {
        FileSystem hdfs=FileSystem.get(new URI("hdfs://192.168.100.201:8020"),conf);
        Path src =new Path("C:/123.py");
        Path dst =new Path("/");
        hdfs.copyFromLocalFile(src, dst);
    }
    //检查目录是否存在
    public  static void  check()  throws Exception  {
        FileSystem hdfs=FileSystem.get(new URI("hdfs://192.168.100.100:8020"),conf);
        Path findf=new Path("/abc");
        boolean isExists=hdfs.exists(findf);
        System.out.println("Exist?"+isExists);
    }

    public static void main(String[] args) throws Exception {
        //获取指定路径所有文件
        //listStatus();

        //重命名
        //rename();
        //获取文件日期
       // GetTime();
        //创建文件夹
        mkdir ();
        //删除文件
        //deletefile();
        //创建数据
       // AddFile();
        //上传数据
       // put();
        //检查目录是否存在
       // check();
    }

}

如果执行出现以下错误,可以参见资料如何解决,也可以不用理会,不会影响程序的执行。记得配置完成环境变量之后重启开发工具

HDFS的javaAPI操作

HDFS权限问题以及伪造用户

首先停止hdfs集群,在node01机器上执行以下命令

cd /export/servers/hadoop-2.6.0-cdh5.14.0
sbin/stop-dfs.sh

修改node01机器上的hdfs-site.xml当中的配置文件

cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
vim hdfs-site.xml

<property>
                <name>dfs.permissions</name>
                <value>true</value>
       </property>

修改完成之后配置文件发送到其他机器上面去

scp hdfs-site.xml node02:PWDscphdfssite.xmlnode03:PWD scp hdfs-site.xml node03:PWD

重启hdfs集群

cd /export/servers/hadoop-2.6.0-cdh5.14.0
sbin/start-dfs.sh

随意上传一些文件到我们Hadoop集群当中准备测试使用

cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
hdfs dfs -mkdir /config
hdfs dfs -put *.xml /config
hdfs dfs -chmod 600 /config/core-site.xml

使用代码准备下载文件

@Test
public void getConfig()throws  Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.100.100:8020"), new Configuration(),"root");
    fileSystem.copyToLocalFile(new Path("/config/core-site.xml"),new Path("file:///c:/core-site.xml"));
    fileSystem.close();
}

HDFS的小文件合并

由于Hadoop擅长存储大文件,因为大文件的元数据信息比较少,如果Hadoop集群当中有大量的小文件,那么每个小文件都需要维护一份元数据信息,会大大的增加集群管理元数据的内存压力,所以在实际工作当中,如果有必要一定要将小文件合并成大文件进行一起处理。
在我们的hdfs 的shell命令模式下,可以通过命令行将很多的hdfs文件合并成一个大文件下载到本地,命令如下

cd /export/servers
hdfs dfs -getmerge /config/*.xml ./hello.xml

既然可以在下载的时候将这些小文件合并成一个大文件一起下载,那么肯定就可以在上传的时候将小文件合并到一个大文件里面去
代码如下:

HDFS的javaAPI操作

/**
 * 将多个本地系统文件,上传到hdfs,并合并成一个大的文件
 * @throws Exception
 */
@Test
public void mergeFile() throws  Exception{
    //获取分布式文件系统
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.100.100:8020"), new Configuration(),"root");
    FSDataOutputStream outputStream = fileSystem.create(new Path("/bigfile.xml"));
    //获取本地文件系统
    LocalFileSystem local = FileSystem.getLocal(new Configuration());
    //通过本地文件系统获取文件列表,为一个集合
    FileStatus[] fileStatuses = local.listStatus(new Path("file:///F:\\传智播客大数据离线阶段课程资料\\3、大数据离线第三天\\上传小文件合并"));
    for (FileStatus fileStatus : fileStatuses) {
        FSDataInputStream inputStream = local.open(fileStatus.getPath());
       IOUtils.copy(inputStream,outputStream);
        IOUtils.closeQuietly(inputStream);
    }
    IOUtils.closeQuietly(outputStream);
    local.close();
    fileSystem.close();
}