欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop: HDFS数据流分析

程序员文章站 2022-03-23 08:33:47
...

简介

本文主要介绍客户端及与之交互的HDFS、NameNode和DataNode之间的数据流的工作机制。

NameNode和DataNode介绍

在了解数据流工作机制之前,我们先来了解一下NameNode和DataNode。

Hadoop: HDFS数据流分析

HDFS集群有两类节点以管理者-工作者模式运行,即一个NameNode和多个DataNode。NameNode管理文件系统的命名空间,维护文件系统树及整棵树内所有的文件和目录。这些信息以两个文件形式永久保存在本地磁盘上:命名空间镜像文件和编辑日志文件。NameNode也记录着每个文件中各个块所在的数据节点信息,但它并不永久保存块的位置信息,因为这些信息会在系统启动时由数据节点重建。
DataNode是文件系统的工作节点。它们根据需要存储并检索数据块(受客户端或NameNode调度),并且定期向NameNode发送它们所存储的块的列表。
客户端(client)代表用户通过与NnameNode和DataNode交互来访问整个文件系统。客户端提供一个类似于POSIX的文件系统接口,因此用户在编程时无需知道NameNode和DataNode也可以实现其功能。

剖析文件读取

下面的例子是将HDFS文件系统中的文本输出到标准输出上。

// cc FileSystemCat Displays files from a Hadoop filesystem on standard output by using the FileSystem directly
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// vv FileSystemCat
public class FileSystemCat {

  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    InputStream in = null;
    try {
      in = fs.open(new Path(uri));
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}
// ^^ FileSystemCat
执行hadoop FileSystemCat hdfs://localhost:9000/user/hadoop/sample.txt命令可以将sample.txt输出到标准输出上。接下来我们分析文件读取的数据流程。

Hadoop: HDFS数据流分析

客户端通过调用FileSystem对象的open()方法来打开希望读取的文件,对于HDFS来说,这个对象是分布式文件系统(图1的步骤1)的一个实例。DistributedFileSystem通过使用RPC来调用NameNode,以确定文件的实际位置(步骤2)。对于每一个块,NameNode返回存有该块副本的DataNode地址。此外,这些DataNode根据它们与客户端的距离来排序。如果该客户端本身就是一个DataNode(如在一个MapReduce任务中),并保存有相应的数据库的一个副本时,该节点就会从本地DataNode读取数据。
DistributedFileSystem类返回一个FSDataInputStream对象(一个支持文件定位的输入流)给客户端并读取数据。FSDataInputStream类转而封装DFSInputStream对象,该对象管理着DataNode和NameNode的I/O.
接着,客户端对这个输入流调用read()方法(步骤3)。存储着文件起始几个块的DataNode地址的DFSInputStream随即连接距离最近的DataNode。通过对数据流反复调用read()方法,可以将数据从DataNode传输到客户端(步骤4)。到达块的末端时,DFSInputStream关闭与该DataNode的连接,然后选择下一个块的最佳DataNode(步骤5)。客户端只需要读取连续的流,并且对客户端都是透明的。
客户端从流中读取数据时,块是按照打开DFSInputStream与DataNode新建连接的顺序读取的。它也会根据需要询问NameNode来检索下一批数据库的DataNode的位置。一旦客户端完成读取,就对FSDataInputStream调用close()方法(步骤6)。
在读取数据流的时候,如果DFSInputStream在与DataNode通信时遇到错误,会尝试从这个块的另外一个最邻近的DataNode读取数据。它也记住那个故障的DataNode,以保证以后不会反复读取该节点上后续的块。DFSInputStream也会校验和确定从DataNode发来数据的完整性。如果发现有损坏的块,就在DFSInputStream试图从其它DataNode读取其副本之前通知NameNode。
这个设计的一个重点是,NameNode告知客户端每个块中最佳的DataNode,并让客户端直接连接到该DataNode检索数据。由于数据流分散在集群中所有的DataNode,所以这种设计能使HDFS可扩展到大量的并发客户端。同时,NameNode只需要响应块位置的请求(这些信息保存在内存中,非常高效),无需响应数据请求,否则,随着客户端数量的增长,NameNode很快成为瓶颈。

剖析文件的写入

下面的例子将输出字符串"content“到HDFS指定文件中。

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class URLWrite {
	public static void main(String[] args) {
		String uri = args[0];
		Configuration conf = new Configuration();
	    try {
			FileSystem fs = FileSystem.get(URI.create(uri), conf);
			Path p = new Path(uri);
			OutputStream out = fs.create(p);
			out.write("content".getBytes("UTF-8"));
			out.flush();
			out.close();			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}
执行hadoop URLWrite hdfs://localhost:9000/user/hadoop/sample.txt命令可以将字符串写入sample.txt。接下来我们分析文件写入的数据流程。

Hadoop: HDFS数据流分析

客户端通过对DistributedFileSystem对象调用create()函数来新建文件(图2 步骤1)。DistributedFileSysted对NameNode创建一个RPC调用,在文件系统的命名空间中新建一个文件,此时该文件中还没有相应的数据块(步骤2)。NameNode执行各种不同的检查以确保这个文件不存在以及客户端有新建该文件的权限。如果这些检查通过,NameNode就会为创建新文件记录一条记录;否则,文件创建失败并向客户端抛出一个IOException异常。DistributedFileSystem向客户端返回一个FSDataOutputStream对象,由此客户端可以开始写入数据。就像读取操作一样,FSDataOutputStream封装一个DFSOutputStream对象,该对象负责处理DataNode和NameNode之间的通信。
在客户端写入数据时(步骤3),DFSOutputStream将它分成一个个的数据包,并写入内部队列,并称为"数据队列"(data queue)。 DataStreamer处理数据队列,它的责任是根据DataNode列表来要求NameNode分配适合的新块来存储数据副本。这一组DataNode构成一个管线---假设复本数为3,所以管线中有3个节点。DataStreamer将数据包流式传输到管线中第1个DataNode,该DataNode存储数据包并将它发送到管线中的第2个DataNode。同样,第2个DataNode存储该数据包并发送给管线中的第3个(也是最后一个)DataNode(步骤4)。
DFSOutputStream也维护一个内部数据包队列来等待DataNode的收到确认回执,称为"确认队列"(ack queque)。收到管道中所有DataNode确认信息后,该数据包才会从确认队列中删除(步骤5)。
如果在数据写入期间DataNode发生故障,则执行以下操作(对写入数据的客户端是透明的)。首先关闭管线,确认把队列中的所有数据包都添加回数据队列的前端,以确保故障节点下游的DataNode不会漏掉任何一个数据包。为存储在另外一个正常DataNode的当前数据块指定一个新的标识,并将该标识传送给NameNode,以便故障DataNode在恢复后可以删除存储的部分数据块。从管线中删除数据节点并将剩下的数据块写入管线中另外两个正常的DataNode中。NameNode注意到块副本数量不足时,会在另外一个节点上创建一个新的副本。后续的数据块继续正常接受处理。
在一个块被写入期间可能会有多个DataNode同时发生故障,但非常少见。只要写入dfs.replication.min的副本数(默认为1),写操作就会成功,并且这个块可以在集群中异步复制,直到达到其目标副本数(dfs.replication的默认值为3)。
客户端完成数据的写入后,对数据流调用close()方法(步骤6)。该操作将剩余的所有数据包写入DataNode管线,并在联系到NameNode且发送文件写入完成信号之前,等待确认(步骤7)。NameNode已经知道文件由哪些块组成(通过DataStreamer请求分配数据块),所以它在返回成功前只需要等待数据块进行最小量的复制。

参考资料

1. Hadoop权威指南 第3版