HDFS编程实践
Hadoop分布式文件系统(Hadoop Distributed File System,HDFS)是Hadoop核心组件之一,如果已经安装了Hadoop(我已经安装了,安装过程->传送门),其中就已经包含了HDFS组件,不需要另外安装,可以做HDFS的编程实践了。这里主要参考林子雨老师的博客。
这里涉及的知识点比较多:分布式文件系统、HDFS简介、HDFS的相关概念、HDFS体系结构、HDFS的存储原理、HDFS的数据读写过程。
接下来介绍Linux操作系统中关于HDFS文件操作的常用Shell命令,利用Web界面查看和管理Hadoop文件系统,以及利用Hadoop提供的Java API进行基本的文件操作。
在开始学习HDFS编程实践前,需要启动Hadoop(版本是Hadoop3.1.3)。执行如下命令
start-dfs.sh #启动hadoop
一、利用Shell命令与HDFS进行交互
Hadoop支持很多Shell命令,其中fs是HDFS最常用的命令,利用fs可以查看HDFS文件系统的目录结构、上传和下载数据、创建文件等。
在终端输入如下命令,查看fs总共支持了哪些命令:
hadoop fs
1. 目录操作
注意
Hadoop系统安装好以后,第一次使用HDFS时,需要首先在HDFS中创建用户目录。这里采用hadoop用户登录Linux系统,因此,需要在HDFS中为hadoop用户创建一个用户目录,命令如下:
hdfs dfs -mkdir -p /user/hadoop
参考上一节的伪分布式实例,如果之前已经创建好用户目录,这里就不需要再创建。
“-ls”列出HDFS某个目录下的所有内容
hdfs dfs -ls . # “.”表示HDFS中的当前用户目录,现在就是“/user/hadoop”目录
hdfs dfs -ls /user/hadoop # 现在这两条命令是等效的,选择一个即可
如果要列出HDFS上的所有目录,可以使用如下命令:
hdfs dfs -ls
2. 文件操作
在实际应用中,经常需要从本地文件系统向HDFS中上传文件,或者把HDFS中的文件下载到本地文件系统中。
首先,使用vim编辑器,在Linux文件系统的“/home/hadoop/”目录下创建一个文件myLocalFile.txt,里面可以随意输入一些单词,比如,输入如下三行:
Hadoop
Spark
XMU DBLAB
然后,可以使用如下命令把本地文件系统的“/home/hadoop/myLocalFile.txt”上传到HDFS中的当前用户目录的input目录下,也就是上传到HDFS的“/user/hadoop/input/”目录下:
hdfs dfs -put /home/hadoop/myLocalFile.txt input
可以使用ls命令查看一下文件是否成功上传到HDFS中
hdfs dfs -ls input
显示类似如下的信息:
Found 10 items
......
-rw-r--r-- 1 hadoop supergroup 23 2020-05-25 14:46 input/myLocalFile.txt
......
查看HDFS中的myLocalFile.txt这个文件的内容:
hdfs dfs -cat input/myLocalFile.txt
把HDFS中的myLocalFile.txt文件下载到本地文件系统中的“/home/hadoop/download/”这个目录下,命令如下:
mkdir /home/hadoop/download
hdfs dfs -get input/myLocalFile.txt /home/hadoop/download
到本地文件系统查看下载下来的文件myLocalFile.txt:
cd /home/hadoop/download
ls
cat myLocalFile.txt
了解
把文件从HDFS中的一个目录拷贝到HDFS中的另外一个目录。
比如,如果要把HDFS的“/user/hadoop/input/myLocalFile.txt”文件,拷贝到HDFS的另外一个目录“/input”中(注意,这个input目录位于HDFS根目录下),可以使用如下命令:
hdfs dfs -cp input/myLocalFile.txt /input
可以使用rm命令删除一个目录,比如,可以使用如下命令删除刚才在HDFS中创建的“/input”目录(不是“/user/hadoop/input”目录):
hdfs dfs -rm -r /input
二、利用Web界面管理HDFS
可以访问 Web 界面 http://NameNodeIp
:9870 即可看到HDFS的web管理界面。比如我这里http://dxystudy.cn:9870 或者 http://47.115.49.250:9870。
三、利用Java API与HDFS进行交互
Hadoop不同的文件系统之间通过调用Java API进行交互,上面介绍的Shell命令,本质上就是Java API的应用。
Hadoop官方的Hadoop API文档:Hadoop API文档
在林子雨老师的教程中是使用虚拟机里的ecplise,显然我们用云服务器的话是不可行的,可以本地编程然后上传运行。
1. IDEA创建项目
可以导入hadoop目录下的依赖包,但是这样依赖管理很不方便,这里使用IDEA的创建的MAVEN项目,关于创建IDEA的Maven工程项目,比较简单,工程名可以自定义,这里取hadoopdemo,具体步骤这里不一步一步演示了。
2. 添加依赖
这里需要用到基础依赖hadoop-core
和hadoop-common
,因为还需要读写HDFS,所以还需要依赖hadoop-hdfs
和hadoop-client
。
在项目的pom.xml文件中加入以下依赖项:
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
3. 编写、运行Java应用程序
(1)编写应用程序
1.1 词频统计
① 编写应用程序
在src
->main
->java
下新建一个WordCount
类,添加内容
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
private Text word = new Text();
@Override
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, ONE);
}
}
}
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
这个是官方的WordCount示例程序,不需要安装任何模式的Hadoop。
② 配置输入文件
WordCount对输入文件字符进行计数,输出计数的结果。首先需要配置输入路径,这里在WordCount
下(src
同级目录)新建一个文件夹input
,并添加一个或多个文本文件到input
中,作为示例。
这里还有一件事情,点击File
->Project Structure
,在弹出来的对话框中选择Modules
项,点击Sources
选项卡,将Language level
调整为8。可以在这里将input
文件夹标记为Excluded
。
③ 配置运行参数
这里需要配置此程序运行时的Main class,以及WordCount需要的输入输出路径。
在Intellij菜单栏中选择Run
->Edit Configurations
,在弹出来的对话框中点击+
,新建一个Application
配置。配置Name
和Main class
为WordCount
(可以点击右边的...
选择),Program arguments
为input/ output/
,即输入路径为刚才创建的input
文件夹,输出为output
。确认后点击ok没成配置。
④ 运行和调试
上述配置完成后,点击菜单栏Run
->Run 'WordCount'
即开始运行此MapReduce程序,Intellij下方会显示Hadoop的运行输出。待程序运行完毕后,Intellij左上方会出现新的文件夹output
,其中的part-r-00000
就是运行的结果了!
报错信息①:
IDEA Error:java: Compilation failed: internal java compiler error
解决办法很简单:File–>Setting…–>Build,Execution,Deployment–>Compiler–>Java Compiler 设置相应Module的target bytecode version的合适版本(跟你jkd版本一致),这里我改成1.8版本的。
报错信息②:
Failed to set permissions of path
这是因为当前用户没有权限来设置路径权限(Linux无此问题),一个解决方法是给hadoop打补丁,参考Failed to set permissions of path: tmp,因为这里使用的Maven,此方法不太适合。另一个方法是将当前用户设置为超级管理员(“计算机管理”,“本地用户和组”中设置),或以超级管理员登录运行此程序。
最好的解决方法是在Linux或macOS上跑hadoop。
事实上我也因为在windows运行出现太多错误了,所以还是打包到ubuntu上去的hadoop来运行。下面会讲。
1.2 文件合并
① 编写应用程序
在src
->main
->java
下新建一个MergeFile
类,添加内容
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
/**
* 过滤掉文件名满足特定条件的文件
*/
class MyPathFilter implements PathFilter {
String reg;
MyPathFilter(String reg) {
this.reg = reg;
}
public boolean accept(Path path) {
return !(path.toString().matches(reg));
}
}
/***
* 利用FSDataOutputStream和FSDataInputStream合并HDFS中的文件
*/
public class MergeFile {
/**
* 待合并的文件所在的目录的路径
*/
Path inputPath;
/**
* 输出文件的路径
*/
Path outputPath;
public MergeFile(String input, String output) {
this.inputPath = new Path(input);
this.outputPath = new Path(output);
}
public void doMerge() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://localhost:9000");
conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);
//下面过滤掉输入目录中后缀为.abc的文件
FileStatus[] sourceStatus = fsSource.listStatus(inputPath,
new MyPathFilter(".*\\.abc"));
FSDataOutputStream fsdos = fsDst.create(outputPath);
PrintStream ps = new PrintStream(System.out);
//下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中
for (FileStatus sta : sourceStatus) {
//下面打印后缀不为.abc的文件的路径、文件大小
System.out.print("路径:" + sta.getPath() + " 文件大小:" + sta.getLen()
+ " 权限:" + sta.getPermission() + " 内容:");
FSDataInputStream fsdis = fsSource.open(sta.getPath());
byte[] data = new byte[1024];
int read;
while ((read = fsdis.read(data)) > 0) {
ps.write(data, 0, read);
fsdos.write(data, 0, read);
}
fsdis.close();
}
ps.close();
fsdos.close();
}
public static void main(String[] args) throws IOException {
// MergeFile merge = new MergeFile(
// "hdfs://localhost:9000/user/hadoop/",
// "hdfs://localhost:9000/user/hadoop/merge.txt");
MergeFile merge = new MergeFile(args[0], args[1]);
merge.doMerge();
}
}
(2) 部署运行
在部署运行前确定Hadoop已经运行了。如果没有,则需要启动,命令如下
start-dfs.sh
下面介绍如何把Java应用程序生成JAR包,部署到Hadoop平台上运行。
首先,在Hadoop安装目录下新建一个名称为myapp的目录,用来存放编写的Hadoop应用程序,可以在Linux的终端中执行如下命令:
cd $HADOOP_HOME
mkdir myapp
将Maven工程编译打包好的jar包文件上传到该目录上。查看一下
cd /usr/local/hadoop/myapp
ls
-
运行词频统计程序
如果做过上一节的实例并且没有清除掉input里的文件,可以运行下面代码查看结果:
hadoop jar hadoopdemo-1.0-SNAPSHOT.jar WordCount input output hdfs dfs -ls output hdfs dfs -cat output/part-r-00000
-
运行文件合并程序
运行文件合并程序
在一个目录下,这里用/home/hadoop/MergeFileTest目录
cd /home/hadoop mkdir MergeFileTest
使用vim新建文件file1.txt、file2.txt、file3.txt、file4.abc和file5.abc,每个文件里面有内容。这里,假设文件内容如下:
file1.txt的内容是: this is file1.txt
file2.txt的内容是: this is file2.txt
file3.txt的内容是: this is file3.txt
file4.abc的内容是: this is file4.abc
file5.abc的内容是: this is file5.abc将这些文件提交到HDFS上
hdfs dfs -mkdir input-MegeFile hdfs dfs -put /home/hadoop/MergeFileTest/* input-MegeFile
运行程序
hadoop jar hadoopdemo-1.0-SNAPSHOT.jar MergeFile input-MegeFile merge.txt
上面程序执行结束以后,可以到HDFS中查看生成的merge.txt文件,比如,可以在Linux终端中执行如下命令:
hdfs dfs -cat /user/hadoop/merge.txt
可以看到如下结果:
this is file1.txt this is file2.txt this is file3.txt
在HDFS的“/user/hadoop”目录下新建
由于Hadoop的设定,下次运行时务必删除
output
文件夹!清除之前运行后的结果记录
hdfs dfs -rm -r output
四、其他
-
分布式文件系统设计的需求
-
分布式文件系统如何实现较高水平扩展的
-
HDFS中的块和普通文件系统中的块的区别
-
HDFS中的名称节点和数据节点的具体功能
-
在分布式文件系统中,中心节点的设计至关重要,HDFS是如何减轻中心节点的负担的。
-
HDFS只设置唯一一个名称结点,在简化系统设计的同时也带来了一些明显的局限性,局限性具体表现在哪些方面?
-
HDFS冗余数据保存策略
-
数据复制主要是在数据写入和数据恢复的时候发生,HDFS数据复制是使用流水线复制的策略。该策略的细节?
-
HDFS是如何探测错误发生以及如何进行恢复的。
-
HDFS在不发生故障的情况下读、写文件的过程。
参考资料
[1] 林子雨. HDFS编程实践(Hadoop3.1.3)
[2] Hadoop: Intellij结合Maven本地运行和调试MapReduce程序 (无需搭载Hadoop和HDFS环境)
上一篇: python写入csv文件的中文乱码问题
下一篇: HDFS块丢失