EC Write
Ordinary files are written through DFSOutputStream, while the stream used for EC files is DFSStripedOutputStream:
public class DFSStripedOutputStream extends DFSOutputStream
implements StreamCapabilities {...}
Set a breakpoint in the FSOutputSummer constructor and capture the call stack:
"aaa@qq.com" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at org.apache.hadoop.fs.FSOutputSummer.<init>(FSOutputSummer.java:53)
at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:191)
at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:247)
at org.apache.hadoop.hdfs.DFSStripedOutputStream.<init>(DFSStripedOutputStream.java:291)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:310)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1216)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1195)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1133)
at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:530)
at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:527)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:541)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:468)
at org.apache.hadoop.fs.FilterFileSystem.create(FilterFileSystem.java:193)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1194)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1174)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1063)
at org.apache.hadoop.fs.shell.CommandWithDestination$TargetFileSystem.create(CommandWithDestination.java:509)
at org.apache.hadoop.fs.shell.CommandWithDestination$TargetFileSystem.writeStreamToFile(CommandWithDestination.java:484)
at org.apache.hadoop.fs.shell.CommandWithDestination.copyStreamToTarget(CommandWithDestination.java:407)
at org.apache.hadoop.fs.shell.CommandWithDestination.copyFileToTarget(CommandWithDestination.java:342)
at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:277)
at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:262)
at org.apache.hadoop.fs.shell.Command.processPathInternal(Command.java:367)
at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:331)
at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:304)
at org.apache.hadoop.fs.shell.CommandWithDestination.processPathArgument(CommandWithDestination.java:257)
at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:286)
at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:270)
at org.apache.hadoop.fs.shell.CommandWithDestination.processArguments(CommandWithDestination.java:228)
at org.apache.hadoop.fs.shell.CopyCommands$Put.processArguments(CopyCommands.java:295)
at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:122)
at org.apache.hadoop.fs.shell.Command.run(Command.java:177)
at org.apache.hadoop.fs.FsShell.run(FsShell.java:327)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
at cn.whbing.hadoop.ReadWriteTest.testWrite(ReadWriteTest.java:132)
As the call stack shows, the output stream is built in DFSClient#create, which delegates to DFSOutputStream.newStreamForCreate; that is where the stream type is decided.
public DFSOutputStream create(String src, FsPermission permission,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, Progressable progress, int buffersize,
ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
String ecPolicyName) throws IOException {
checkOpen();
final FsPermission masked = applyUMask(permission);
LOG.debug("{}: masked={}", src, masked);
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
dfsClientConf.createChecksum(checksumOpt),
getFavoredNodesStr(favoredNodes), ecPolicyName);
beginFileLease(result.getFileId(), result);
return result;
}
Continuing into newStreamForCreate:
final DFSOutputStream out;
if(stat.getErasureCodingPolicy() != null) {
out = new DFSStripedOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
} else {
out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes, true);
}
out.start();
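For the striped branch to be taken at all, the file has to be created under a directory that carries an erasure coding policy. Below is a minimal client-side sketch, assuming a reachable HDFS 3.x cluster behind the default Configuration and the built-in RS-6-3-1024k policy being enabled; the directory and file names are made up:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class EcCreateSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // assumes fs.defaultFS points at HDFS
    try (DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf)) {
      Path ecDir = new Path("/ec-demo");      // hypothetical directory
      dfs.mkdirs(ecDir);
      // Files created below this directory inherit the policy, so
      // stat.getErasureCodingPolicy() is non-null and the striped branch is taken.
      dfs.setErasureCodingPolicy(ecDir, "RS-6-3-1024k");
      try (FSDataOutputStream out = dfs.create(new Path(ecDir, "file1"))) {
        out.writeBytes("hello ec");
      }
    }
  }
}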
1. org.apache.hadoop.hdfs.DFSStripedOutputStream.<init>(DFSStripedOutputStream.java:291)
For createFile and appendFile, compared with the three-replica path, the constructors look like this:
// Create a new file
/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes)
throws IOException {
super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating DFSStripedOutputStream for " + src);
}
// Compared with the three-replica stream, a few extra fields are initialized here.
ecPolicy = stat.getErasureCodingPolicy();
final int numParityBlocks = ecPolicy.getNumParityUnits();
cellSize = ecPolicy.getCellSize();
numDataBlocks = ecPolicy.getNumDataUnits();
numAllBlocks = numDataBlocks + numParityBlocks;
this.favoredNodes = favoredNodes;
failedStreamers = new ArrayList<>();
corruptBlockCountMap = new LinkedHashMap<>();
flushAllExecutor = Executors.newFixedThreadPool(numAllBlocks);
flushAllExecutorCompletionService = new
ExecutorCompletionService<>(flushAllExecutor);
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
numDataBlocks, numParityBlocks);
encoder = CodecUtil.createRawEncoder(dfsClient.getConfiguration(),
ecPolicy.getCodecName(), coderOptions);
coordinator = new Coordinator(numAllBlocks);
cellBuffers = new CellBuffers(numParityBlocks);
streamers = new ArrayList<>(numAllBlocks);
for (short i = 0; i < numAllBlocks; i++) {
StripedDataStreamer streamer = new StripedDataStreamer(stat,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
favoredNodes, i, coordinator, getAddBlockFlags());
streamers.add(streamer);
}
currentPackets = new DFSPacket[streamers.size()];
setCurrentStreamer(0);
}
// Append to an existing file
/** Construct a new output stream for appending to a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src,
EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
throws IOException {
this(dfsClient, src, stat, flags, progress, checksum, favoredNodes);
initialFileSize = stat.getLen(); // length of file when opened
prevBlockGroup4Append = lastBlock != null ? lastBlock.getBlock() : null;
}
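Both constructors end up driving one StripedDataStreamer per internal block, and the client's bytes are spread across those streamers cell by cell. Here is a minimal sketch of that index math, assuming the round-robin cell placement of the striped layout; the class and method names are hypothetical, not HDFS APIs:
public class StripeLayoutSketch {
  static final int NUM_DATA_BLOCKS = 6;          // RS-6-3: six data blocks per group
  static final int CELL_SIZE = 1024 * 1024;      // 1 MB cells, as in RS-6-3-1024k

  /** Which data streamer receives the byte at this logical file offset. */
  static int streamerIndex(long offset) {
    long cellIndex = offset / CELL_SIZE;         // which cell of the logical file
    return (int) (cellIndex % NUM_DATA_BLOCKS);  // cells rotate across data streamers
  }

  public static void main(String[] args) {
    System.out.println(streamerIndex(0));              // 0: first cell -> streamer 0
    System.out.println(streamerIndex(CELL_SIZE));      // 1: second cell -> streamer 1
    System.out.println(streamerIndex(6L * CELL_SIZE)); // 0: wraps back to streamer 0
  }
}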
Let's look at what these fields do:
1. ecPolicy: declared as ErasureCodingPolicy ecPolicy = null; it defaults to null, and a null policy means no EC, i.e. the ordinary three-replica path.
2. cellSize: the size of each cell; it must be positive and a multiple of 1024:
Preconditions.checkArgument(cellSize > 0, "cellSize must be positive");
Preconditions.checkArgument(cellSize % 1024 == 0,
"cellSize must be 1024 aligned");
3. encoder: the raw erasure encoder used to compute the parity cells (see the sketch after this list).
4. streamers: under EC, streamers is a list, i.e. there are multiple streamers, one per internal block of the block group (numAllBlocks, data blocks plus parity blocks):
streamers = new ArrayList<>(numAllBlocks);
for (short i = 0; i < numAllBlocks; i++) {
StripedDataStreamer streamer = new StripedDataStreamer(stat,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
favoredNodes, i, coordinator, getAddBlockFlags());
streamers.add(streamer);
}
5. currentPackets: currentPackets = new DFSPacket[streamers.size()]; each streamer has its own current packet.
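As promised for the encoder field, here is a hedged standalone sketch of creating and driving a raw encoder through CodecUtil, the same helper the constructor uses. Assumptions: a small RS(3,2) layout to keep the arrays short, the Reed-Solomon codec registered under the name "rs", and tiny dummy cells:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

import java.util.Arrays;

public class RawEncoderSketch {
  public static void main(String[] args) throws Exception {
    int numDataUnits = 3, numParityUnits = 2;    // small RS(3,2) layout for the demo
    ErasureCoderOptions options = new ErasureCoderOptions(numDataUnits, numParityUnits);
    RawErasureEncoder encoder =
        CodecUtil.createRawEncoder(new Configuration(), "rs", options);

    int cellSize = 4;                            // tiny "cells" just for illustration
    byte[][] dataCells = new byte[numDataUnits][cellSize];
    byte[][] parityCells = new byte[numParityUnits][cellSize];
    for (int i = 0; i < numDataUnits; i++) {
      Arrays.fill(dataCells[i], (byte) (i + 1));
    }

    // One encode call turns a full stripe of data cells into its parity cells;
    // DFSStripedOutputStream does the same once its CellBuffers fill a stripe.
    encoder.encode(dataCells, parityCells);
    System.out.println(Arrays.toString(parityCells[0]));
  }
}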
The roles of the three inner classes are illustrated in a figure in the linked article (data blocks are labeled Data, parity blocks are labeled Parity).
Link: HDFS EC:将纠删码技术融入HDFS
- For an ordinary HDFS file, the basic building block is the block.
- For a file stored in EC mode, the basic building block is the block group.
- Since the block group is the basic unit of an EC file, the first question is how the NameNode should record each file's block group information. A straightforward approach is to give every block group its own ID and keep a map from that ID to the block group's metadata, where each block group records the information of all its internal blocks. For small files this inflates NameNode memory usage. Take RS(6,3) as an example: if a file is slightly smaller than 6 blocks, the NameNode still has to maintain 10 IDs for it (1 block group ID, 6 data block IDs and 3 parity block IDs). When small files dominate, NameNode memory comes under pressure.
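To make that memory argument concrete, here is a toy sketch of the naive bookkeeping scheme described above; it is purely illustrative, not how the NameNode actually stores striped blocks, and all names are made up:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NaiveBlockGroupSketch {
  public static void main(String[] args) {
    int numDataBlocks = 6, numParityBlocks = 3;  // RS(6,3)
    // Naive scheme: one ID per block group plus one ID per internal block,
    // all of which the NameNode would have to keep in memory.
    Map<Long, List<Long>> groupToInternalBlocks = new HashMap<>();
    long nextId = 1L;

    long groupId = nextId++;
    List<Long> internalIds = new ArrayList<>();
    for (int i = 0; i < numDataBlocks + numParityBlocks; i++) {
      internalIds.add(nextId++);
    }
    groupToInternalBlocks.put(groupId, internalIds);

    // Even for a file slightly smaller than 6 blocks, all of these IDs are needed.
    System.out.println("IDs tracked for one small EC file: " + (1 + internalIds.size())); // 10
  }
}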