欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

HDFS常用API操作 和 HDFS的I/O流操作

程序员文章站 2022-04-21 16:50:48
前置操作 创建maven工程,修改pom.xml文件: 在resources添加一个file:log4j.properties: API操作 HDFS的命令和linux极其相似,可以类比记忆,在这里列出一些java api操作: I/O流操作 上面的API操作 HDFS系统都是框架封装好的,如果我们 ......

前置操作

创建maven工程,修改pom.xml文件:

<!--
  Maven build for the HDFS client examples.
  Element names, namespace URIs and the SNAPSHOT/RELEASE keywords are case-sensitive in Maven;
  they are written here in their canonical casing.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mcq</groupId>
  <artifactId>hdfs-001</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
		<!-- Unit-test runner used by the @Test example methods. -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>2.8.2</version>
		</dependency>
		<!-- Hadoop client libraries; all three are kept on the same version. -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>2.7.2</version>
		</dependency>
		<!-- tools.jar is taken from the local JDK installation pointed to by JAVA_HOME. -->
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.8</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
</dependencies>

</project>

在 resources 目录下添加一个文件 log4j.properties:

# Root logger: INFO level, attached to the console appender only.
log4j.rootLogger=INFO, stdout
# Console appender.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
# File appender. It is defined here but not referenced by rootLogger above,
# so nothing is written to it unless "logfile" is added to the rootLogger list.
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

 

API操作

HDFS的命令和Linux极其相似,可以类比记忆,在这里列出一些Java API操作:

package com.mcq;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Test;

/**
 * Examples of common HDFS operations performed through the Hadoop {@link FileSystem} API.
 *
 * <p>{@link #main(String[])} creates a directory; each {@code @Test} method demonstrates one
 * operation (upload, download, delete, rename, recursive file listing, directory listing).
 * Every example opens its own connection and closes it with try-with-resources, so the
 * connection is released even when the operation throws.
 *
 * <p>The declared class and method names are kept exactly as published so that existing
 * references to them stay valid.
 */
public class hdfsclient {

	/** Address of the NameNode every example connects to. */
	private static final String NAME_NODE_URI = "hdfs://hadoop103:9000";

	/** User name the examples act as on the cluster. */
	private static final String HDFS_USER = "mcq";

	/**
	 * Opens a connection to the example cluster with a default {@link Configuration}.
	 *
	 * <p>An alternative is to set {@code fs.defaultFS} on the configuration and call
	 * {@code FileSystem.get(conf)}; the three-argument form is used here because it also
	 * lets the examples name the user explicitly.
	 *
	 * @return a file system handle the caller must close
	 */
	private static FileSystem connect() throws IOException, InterruptedException, URISyntaxException {
		return FileSystem.get(new URI(NAME_NODE_URI), new Configuration(), HDFS_USER);
	}

	/**
	 * Creates the directory {@code /ppqq} on HDFS and prints {@code over}.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
		try (FileSystem fs = connect()) {
			fs.mkdirs(new Path("/ppqq"));
		}
		System.out.println("over");
	}

	/** File upload: copies the local file {@code d:/banzhang.txt} to {@code /banzhang.txt} on HDFS. */
	@Test
	public void testcopyfromlocalfile()
			throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
		try (FileSystem fs = connect()) {
			fs.copyFromLocalFile(new Path("d:/banzhang.txt"), new Path("/banzhang.txt"));
		}
		System.out.println("over");
	}

	/** File download: copies {@code /banzhang.txt} from HDFS to a local file. */
	@Test
	public void testcopytolocalfile() throws IOException, InterruptedException, URISyntaxException {
		try (FileSystem fs = connect()) {
			// First argument false: keep the source (copy, not move).
			// Last argument true: write through the raw local file system, so no .crc file is produced.
			fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("d:/hadoop test/banhua.txt"), true);
		}
		System.out.println("over");
	}

	/** File deletion: removes {@code /0811} from HDFS. */
	@Test
	public void testdelete() throws IOException, InterruptedException, URISyntaxException {
		try (FileSystem fs = connect()) {
			// Second argument true: delete recursively.
			fs.delete(new Path("/0811"), true);
		}
		System.out.println("over");
	}

	/** File rename: renames {@code /banzhang.txt} to {@code /lala.txt}. */
	@Test
	public void testrename() throws IOException, InterruptedException, URISyntaxException {
		try (FileSystem fs = connect()) {
			fs.rename(new Path("/banzhang.txt"), new Path("/lala.txt"));
		}
		System.out.println("over");
	}

	/**
	 * Recursively lists every file under {@code /} and prints, per file, its name, length,
	 * permission, group and the hosts of each of its blocks, followed by a separator line.
	 */
	@Test
	public void testlistfiles() throws IOException, InterruptedException, URISyntaxException {
		try (FileSystem fs = connect()) {
			// Second argument true: descend into sub-directories.
			RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/"), true);

			while (files.hasNext()) {
				LocatedFileStatus status = files.next();

				System.out.println(status.getPath().getName()); // file name
				System.out.println(status.getLen()); // length
				System.out.println(status.getPermission()); // permission
				System.out.println(status.getGroup()); // group

				// Hosts holding each block of the file.
				for (BlockLocation block : status.getBlockLocations()) {
					for (String host : block.getHosts()) {
						System.out.println(host);
					}
				}

				System.out.println("-----------分割线----------");
			}
		}
	}

	/**
	 * Lists the direct children of {@code /} and prints each one prefixed with {@code f:} for
	 * a file or {@code d:} otherwise.
	 */
	@Test
	public void testliststatus() throws IOException, InterruptedException, URISyntaxException {
		try (FileSystem fs = connect()) {
			FileStatus[] entries = fs.listStatus(new Path("/"));

			for (FileStatus entry : entries) {
				if (entry.isFile()) {
					System.out.println("f:" + entry.getPath().getName());
				} else {
					System.out.println("d:" + entry.getPath().getName());
				}
			}
		}
	}
}

I/O流操作

上面的API操作 HDFS系统都是框架封装好的,如果我们想自己实现上述API操作,可以采用IO流的方式实现数据的上传和下载。

 

package com.mcq;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.URL;
import org.junit.Test;

/**
 * Examples of transferring data to and from HDFS with plain I/O streams instead of the
 * ready-made {@code copyFromLocalFile}/{@code copyToLocalFile} calls.
 *
 * <p>Every example closes its streams and its {@link FileSystem} with try-with-resources, so
 * they are released even when the transfer throws.
 *
 * <p>The declared class and method names are kept exactly as published so that existing
 * references to them stay valid.
 */
public class hdfsio {

	/** Address of the NameNode every example connects to. */
	private static final String NAME_NODE_URI = "hdfs://hadoop103:9000";

	/** User name the examples act as on the cluster. */
	private static final String HDFS_USER = "mcq";

	/**
	 * Number of bytes the "first block" / "second block" examples split the file at: 128 MiB.
	 * NOTE(review): this presumably mirrors the cluster's HDFS block size - confirm against
	 * the cluster configuration.
	 */
	private static final long BLOCK_SIZE = 128L * 1024 * 1024;

	/**
	 * Opens a connection to the example cluster.
	 *
	 * @param configuration configuration to connect with
	 * @return a file system handle the caller must close
	 */
	private static FileSystem openFileSystem(Configuration configuration)
			throws IOException, InterruptedException, URISyntaxException {
		return FileSystem.get(new URI(NAME_NODE_URI), configuration, HDFS_USER);
	}

	/** File upload: streams the local file {@code d:/banzhang.txt} to {@code /xiaocao.txt} on HDFS. */
	@Test
	public void putfiletohdfs() throws IOException, InterruptedException, URISyntaxException {
		Configuration configuration = new Configuration();
		try (FileSystem fs = openFileSystem(configuration);
				FileInputStream in = new FileInputStream(new File("d:/banzhang.txt"));
				FSDataOutputStream out = fs.create(new Path("/xiaocao.txt"))) {
			// close=false: the try-with-resources block owns the streams.
			IOUtils.copyBytes(in, out, configuration, false);
		}
	}

	/** File download: streams {@code /banhua.txt} from HDFS to the local file {@code d:/banhua.txt}. */
	@Test
	public void getfilefromhdfs() throws IOException, InterruptedException, URISyntaxException {
		Configuration configuration = new Configuration();
		try (FileSystem fs = openFileSystem(configuration);
				FSDataInputStream in = fs.open(new Path("/banhua.txt"));
				FileOutputStream out = new FileOutputStream(new File("d:/banhua.txt"))) {
			// close=false: the try-with-resources block owns the streams.
			IOUtils.copyBytes(in, out, configuration, false);
		}
	}

	/**
	 * Positioned read, part 1: downloads the first {@link #BLOCK_SIZE} bytes of
	 * {@code /hadoop-2.7.2.tar.gz} to {@code e:/hadoop-2.7.2.tar.gz.part1}.
	 *
	 * <p>{@code read} may return fewer bytes than the buffer holds, or -1 at end of file, so
	 * only the bytes actually read are written and the loop stops at end of file. Writing
	 * the whole buffer on every iteration would pad the output with stale bytes.
	 */
	@Test
	public void readfileseek1() throws IOException, InterruptedException, URISyntaxException {
		Configuration configuration = new Configuration();
		try (FileSystem fs = openFileSystem(configuration);
				FSDataInputStream in = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
				FileOutputStream out = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"))) {
			byte[] buf = new byte[1024];
			long remaining = BLOCK_SIZE;

			while (remaining > 0) {
				// Never ask for more than is left of the first block.
				int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
				if (n < 0) {
					break; // file is shorter than one block
				}
				out.write(buf, 0, n);
				remaining -= n;
			}
		}
	}

	/**
	 * Positioned read, part 2: downloads everything after the first {@link #BLOCK_SIZE} bytes
	 * of {@code /hadoop-2.7.2.tar.gz} to {@code e:/hadoop-2.7.2.tar.gz.part2}.
	 *
	 * <p>Concatenating part1 and part2 reproduces the original file.
	 */
	@Test
	public void readfileseek2() throws IOException, InterruptedException, URISyntaxException {
		Configuration configuration = new Configuration();
		try (FileSystem fs = openFileSystem(configuration);
				FSDataInputStream in = fs.open(new Path("/hadoop-2.7.2.tar.gz"))) {
			// Skip the part already downloaded by readfileseek1.
			in.seek(BLOCK_SIZE);

			try (FileOutputStream out = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"))) {
				// close=false: the try-with-resources blocks own the streams.
				IOUtils.copyBytes(in, out, configuration, false);
			}
		}
	}
}