Hadoop--HDFS
hadoop--hdfs
edits和fsimage机制详解
概述
fsimage镜像文件包含了整个hdfs文件系统的所有目录和文件的indoe(节点)信息,比如:/node01/node,会记录每个节点nodeid,以及节点之间父子路径。
以及文件名,文件大小,文件被切成几块,每个数据块描述信息、修改时间、访问时间等;此外还有对目录的修改时间、访问权限控制信息(目录所属用户,所在组等)等。
另外,edits文件主要是在namenode已经启动情况下对hdfs进行的各种更新操作进行记录,比如 :hadoop fs -mkdir hadoop fs -delete hadoop fs -put等。
对于每次事务操作,都会用一个txid来标识,op_mkdir op_delete等。
edits文件存储的操作,而fsimage文件存储的是执行操作后,变化的状态。(元数据)
hdfs客户端执行所有的写操作都会被记录到edits文件中。
关键点
1.当执行格式化指令时候,会在指定元数据目录生成 dfs/name/current/
最开始只有fsimage,没有edits文件(因为没有启动hdfs)
2.当初次启动hfds,会生成edits_inprogress_0000000000000000001,此文件用于记录事务(写操作)
3.hdfs对于每次写操作,都会用一个事务id(txid)来记录,txid是递增的。
4.edits_0000000000000000003-0000000000000000007,数字表示的合并后起始的事务id和终止事务id
5.seen_txid 存储的当前的事务id,和edits_inprogress最后的数字一致
6.datanode存储块的目录路径:/tmp/dfs/data/current/bp-859711469-192.168.150.137-1535216211704/current/finalized/subdir0/subdir0
7.finalized此目录存储的已经存储完毕的数据块,rbw目录存的是正在写但还未写完的数据块
查看edits文件和fsimage文件
hdfs oev -i edits_0000000000000000001-0000000000000000003 -o edits.xml
hdfs oiv -i fsimage_0000000000000000012 -o fsimage.xml -p xml
hdfs api操作
1.创建java工程
2.导入hadoop依赖jar包
连接namenode以及读取hdfs中指定文件
@test
public void testconnectnamenode() throws exception{
configuration conf=new configuration();
filesystem fs=filesystem.get(new uri("hdfs://192.168.234.211:9000"), conf);
inputstream in=fs.open(new path("/park/1.txt"));
outputstream out=new fileoutputstream("1.txt");
ioutils.copybytes(in, out, conf);
}
上传文件到hdfs上
xxxxxxxxxx
@test
public void testput() throws exception{
configuration conf=new configuration();
conf.set("dfs.replication","1");
filesystem fs=filesystem.get(new uri("hdfs://192.168.234.21:9000"),conf,"root");
bytearrayinputstream in=new bytearrayinputstream("hello hdfs".getbytes());
outputstream out=fs.create(new path("/park/2.txt"));
ioutils.copybytes(in, out, conf);
}
从hdfs上删除文件
xxxxxxxxxx
@test
public void testdelete()throws exception{
configuration conf=new configuration();
filesystem fs=filesystem.get(new uri("hdfs://192.168.234.21:9000"),conf,"root");
//true表示无论目录是否为空,都删除掉。可以删除指定的文件
fs.delete(new path("/park01"),true);
//false表示只能删除不为空的目录。
fs.delete(new path("/park01"),false);
fs.close();
}
在hdfs上创建文件夹
xxxxxxxxxx
@test
public void testmkdir()throws exception{
configuration conf=new configuration();
filesystem fs=filesystem.get(new uri("hdfs://192.168.234.211:9000"),conf,"root");
fs.mkdirs(new path("/park02"));
}
查询hdfs指定目录下的文件
xxxxxxxxxx
@test
public void testls()throws exception{
configuration conf=new configuration();
filesystem fs=filesystem.get(new uri("hdfs://192.168.234.214:9000"),conf,"root");
remoteiterator<locatedfilestatus> rt=fs.listfiles(new path("/"), true);
while(rt.hasnext()){
system.out.println(rt.next());
}
}
递归查看指定目录下的文件
xxxxxxxxxx
@test
public void testls()throws exception{
configuration conf=new configuration();
filesystem fs=filesystem.get(new uri("hdfs://192.168.234.214:9000"),conf,"root");
remoteiterator<locatedfilestatus> rt=fs.listfiles(new path("/"), true);
while(rt.hasnext()){
system.out.println(rt.next());
}
}
重命名
xxxxxxxxxx
@test
public void testcreatenewfile() throws exception{
configuration conf=new configuration();
filesystem fs=filesystem.get(new uri("hdfs://192.168.234.176:9000"),conf,"root");
fs.rename(new path("/park"), new path("/park01"));
}
获取文件的块信息
xxxxxxxxxx
@test
public void testcopyfromloaclfilesystem() throws exception{
configuration conf=new configuration();
filesystem fs=filesystem.get(new uri("hdfs://192.168.234.176:9000"),conf,"root");
blocklocation[] data=fs.getfileblocklocations(new path("/park01/1.txt"),0,integer.maxvalue);
for(blocklocation bl:data){
system.out.println(bl);
}
}
从hdfs下载文件过程
1.client向namenode发起 open file 请求。目的是获取指定文件的输入流。
namenode收到请求之后,会检查路径的合法性,此外,还是检查客户端的操作权限。如果检测未通过,则直接报错返回。后续过程不会发生。
2.client也会向namenode发起:getblockloaction请求,获取指定文件的元数据信息。如果第一步的检测通过,namenode会将元数据信息封装到输入流里,返回给客户端。
3.4 客户端根据元数据信息,直接去对应的datanode读取文件块,然后下载到本地(创建本地的输出流,然后做流的对接)
5.读完后,关流。
上传文件到hdfs
1.client向namenode发现 create file请求,目的是获取hdfs文件的输出流。namenode收到请求后,会检测路径的合法性和权限。如果检测未通过,直接报错返回。
如果通过检测,namenode会将文件的切块信息(比如文件被切成几块,每个文件块的副本存在哪台datanode上),然后把这些信息封装到输出流里,返回给客户端。
所以注意:文件块的输出(上传)是客户端直接和对应dn交互的,namenode的作用是告诉client文件块要发送给哪个datanode上。
2.client通过输出流,发送文件块(底层会将一个文件块打散成一个一个的packet,每个packet的大小=64kb)。这个过程的机制,叫pipeline(数据流管道机制)
这种机制的目的:
为了提高网络效率,我们采取了把数据流和控制流分开的措施。在控制流从客户机到主chunk、然后冉 到所有二级副本的同时,数据以管道的方式,顺序的沿着一个精心选择的chunk服务器链推送。我们的目标 是充分利用每台机器的带宽,避免网络瓶颈和高延时的连接,最小化推送所有数据的延时。 为了充分利用每台机器的带宽,数据沿着一个chunk服务器链顺序的推送,而不是以其它拓扑形式分散 推送(例如,树型拓扑结构)。线性推送模式下,每台机器所有的出凵带宽都用于以最快的速度传输数据,而 不是在多个接受者之间分配带宽。
3.4.5 。通过数据流管道机制,实现数据的发送和副本的复制。每台datanode服务器收到数据之后,会向上游反馈ack确认机制。直到第五步的ack发送给client之后,再发送下一个packet。依次循环,直到所有的数据都复制完毕。此外,在底层传输的过程中,会用到全双工通信。
补充:建议看《google file system》的3.2节
6.数据上传完之后,关流。
从hdfs删除文件的流程
1、客户端向namenode发现 删除文件指令,比如:
xxxxxxxxxx
hadoop fs -rm /park01/1.txt
2、namenode收到请求后,会检查路径的合法性以及权限
3、如果检测通过,会将对应的文件从元数据中删除。(注意,此时这个文件并没有真正从集群上被删除)
4、每台datanode会定期向namenode发送心跳,会领取删除的指令,找到对应的文件块,进行文件块的删除。
hdfs的租约机制
hdfs的有个内部机制:不允许客户端的并行写。指的是同一时刻内,不允许多个客户端向一个hdfs上写数据。
所以要实现以上的机制,实现思路就是用互斥锁,但是如果底层要是用简单的互斥锁,可能有与网络问题,造成客户端不释放锁,而造成死锁。所以hadoop为了避免这种情况产生,引入租约机制。
租约锁本质上就是一个带有租期的互斥锁。
hadoop的思想来自于google的论文,3.1
hadoop 租约锁对应的类:org.apache.hadoop.hdfs.server.namenode.leasemanager.lease
还有一个租约锁管理者:
org.apache.hadoop.hdfs.server.namenode.leasemanager
hdfs特点
1、分布式存储架构,支持海量数据存储。(gb、tb、pb级别数据)
2、高容错性,数据块拥有多个副本(副本冗余机制)。副本丢失后,自动恢复。
3、低成本部署,hadoop可构建在廉价的服务器上。
4、能够检测和快速应对硬件故障,通过rpc心跳机制来实现。
5、简化的一致性模型,这里指的是用户在使用hdfs时,所有关于文件相关的操作,比如文件切块、块的复制、块的存储等细节并不需要去关注,所有的工作都已被框架封装完毕。用户所需要的做的仅仅是将数据上传到hdfs。这大大简化了分布式文件存储操作的难度和管理的复杂度。
6、hdfs不能做到低延迟的数据访问(毫秒级内给出响应)。但是hadoop的优势在于它的高吞吐率(吞吐率指的是:单位时间内产生的数据流)。可以说hdfs的设计是牺牲了低延迟的数据访问,而获取的是数据的高吞吐率。如果要想获取低延迟的数据访问,可以通过hbase框架来实现。
7、hdfs不许修改数据,所以适用场景是:一次写入,多次读取(once-write-many-read)。注意:hdfs允许追加数据,但不允许修改数据。追加和修改的意义是不同的。
8、hdfs不支持并发写入,一个文件同一个时间只能有一个写入者。
9、hdfs不适合存储海量小文件,因为会浪费namenode服务节点的内存空间
上一篇: python 读取目录下csv文件并绘制曲线v111的方法
下一篇: 凭本事减肥
推荐阅读