
EC Write


Ordinary files are written through DFSOutputStream, while EC files use DFSStripedOutputStream:

public class DFSStripedOutputStream extends DFSOutputStream
    implements StreamCapabilities {...}
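
A quick way to confirm which stream a client actually gets is to unwrap the FSDataOutputStream returned by create(). This is only a sketch: the path is hypothetical and it assumes an erasure coding policy has already been set on the parent directory.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSStripedOutputStream;

public class StreamTypeCheck {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Hypothetical path; whether we get a striped stream depends on the
    // erasure coding policy in effect for this directory.
    try (FSDataOutputStream out = fs.create(new Path("/ec/demo"))) {
      boolean striped = out.getWrappedStream() instanceof DFSStripedOutputStream;
      System.out.println("striped stream: " + striped);
    }
  }
}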

Set a breakpoint in the FSOutputSummer constructor and we get the following call stack:

"aaa@qq.com" prio=5 tid=0x1 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at org.apache.hadoop.fs.FSOutputSummer.<init>(FSOutputSummer.java:53)
	  at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:191)
	  at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:247)
	  at org.apache.hadoop.hdfs.DFSStripedOutputStream.<init>(DFSStripedOutputStream.java:291)
	  at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:310)
	  at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1216)
	  at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1195)
	  at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1133)
	  at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:530)
	  at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:527)
	  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	  at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:541)
	  at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:468)
	  at org.apache.hadoop.fs.FilterFileSystem.create(FilterFileSystem.java:193)
	  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1194)
	  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1174)
	  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1063)
	  at org.apache.hadoop.fs.shell.CommandWithDestination$TargetFileSystem.create(CommandWithDestination.java:509)
	  at org.apache.hadoop.fs.shell.CommandWithDestination$TargetFileSystem.writeStreamToFile(CommandWithDestination.java:484)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.copyStreamToTarget(CommandWithDestination.java:407)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.copyFileToTarget(CommandWithDestination.java:342)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:277)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:262)
	  at org.apache.hadoop.fs.shell.Command.processPathInternal(Command.java:367)
	  at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:331)
	  at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:304)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.processPathArgument(CommandWithDestination.java:257)
	  at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:286)
	  at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:270)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.processArguments(CommandWithDestination.java:228)
	  at org.apache.hadoop.fs.shell.CopyCommands$Put.processArguments(CopyCommands.java:295)
	  at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:122)
	  at org.apache.hadoop.fs.shell.Command.run(Command.java:177)
	  at org.apache.hadoop.fs.FsShell.run(FsShell.java:327)
	  at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
	  at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
	  at cn.whbing.hadoop.ReadWriteTest.testWrite(ReadWriteTest.java:132)

As the call stack shows, the output stream is created in DFSClient#create, which delegates to DFSOutputStream#newStreamForCreate; that is where the stream type is decided:

public DFSOutputStream create(String src, FsPermission permission,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, Progressable progress, int buffersize,
      ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
      String ecPolicyName) throws IOException {
    checkOpen();
    final FsPermission masked = applyUMask(permission);
    LOG.debug("{}: masked={}", src, masked);
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        dfsClientConf.createChecksum(checksumOpt),
        getFavoredNodesStr(favoredNodes), ecPolicyName);
    beginFileLease(result.getFileId(), result);
    return result;
  }

Continuing into newStreamForCreate:

      final DFSOutputStream out;
      if(stat.getErasureCodingPolicy() != null) {
        out = new DFSStripedOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes);
      } else {
        out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes, true);
      }
      out.start();
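
The branch is driven entirely by whether the HdfsFileStatus returned by the NameNode carries an erasure coding policy. As a minimal sketch (the directory name is illustrative, RS-6-3-1024k is one of the built-in policy names, and it assumes fs.defaultFS points at an HDFS cluster), a client can set a policy on a directory so that files created under it take the DFSStripedOutputStream branch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class CreateEcFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);

    Path ecDir = new Path("/ec");                   // illustrative directory
    dfs.mkdirs(ecDir);
    // Files created under this directory inherit the policy, so
    // newStreamForCreate() takes the DFSStripedOutputStream branch.
    dfs.setErasureCodingPolicy(ecDir, "RS-6-3-1024k");

    try (FSDataOutputStream out = dfs.create(new Path(ecDir, "demo.bin"))) {
      out.write(new byte[8 * 1024]);
    }
  }
}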

1. org.apache.hadoop.hdfs.DFSStripedOutputStream.<init>(DFSStripedOutputStream.java:291)

For createFile and appendFile, compared with the 3-replica case, the two constructors look like this:

  // Create a new file
  /** Construct a new output stream for creating a file. */
  DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
                         EnumSet<CreateFlag> flag, Progressable progress,
                         DataChecksum checksum, String[] favoredNodes)
                         throws IOException {
    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Creating DFSStripedOutputStream for " + src);
    }
    // Compared with the 3-replica stream, creating an EC file stream initializes several extra fields.
    ecPolicy = stat.getErasureCodingPolicy();
    final int numParityBlocks = ecPolicy.getNumParityUnits();
    cellSize = ecPolicy.getCellSize();
    numDataBlocks = ecPolicy.getNumDataUnits();
    numAllBlocks = numDataBlocks + numParityBlocks;
    this.favoredNodes = favoredNodes;
    failedStreamers = new ArrayList<>();
    corruptBlockCountMap = new LinkedHashMap<>();
    flushAllExecutor = Executors.newFixedThreadPool(numAllBlocks);
    flushAllExecutorCompletionService = new
        ExecutorCompletionService<>(flushAllExecutor);

    ErasureCoderOptions coderOptions = new ErasureCoderOptions(
        numDataBlocks, numParityBlocks);
    encoder = CodecUtil.createRawEncoder(dfsClient.getConfiguration(),
        ecPolicy.getCodecName(), coderOptions);

    coordinator = new Coordinator(numAllBlocks);
    cellBuffers = new CellBuffers(numParityBlocks);

    streamers = new ArrayList<>(numAllBlocks);
    for (short i = 0; i < numAllBlocks; i++) {
      StripedDataStreamer streamer = new StripedDataStreamer(stat,
          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
          favoredNodes, i, coordinator, getAddBlockFlags());
      streamers.add(streamer);
    }
    currentPackets = new DFSPacket[streamers.size()];
    setCurrentStreamer(0);
  }

  // Append write
  /** Construct a new output stream for appending to a file. */
  DFSStripedOutputStream(DFSClient dfsClient, String src,
      EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
      HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
      throws IOException {
    this(dfsClient, src, stat, flags, progress, checksum, favoredNodes);
    initialFileSize = stat.getLen(); // length of file when opened
    prevBlockGroup4Append = lastBlock != null ? lastBlock.getBlock() : null;
  }
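
To make the encoder field concrete, here is a minimal, self-contained sketch of the same CodecUtil / ErasureCoderOptions API used in the constructor. The values are assumptions for illustration only: codec name "rs", an RS(6,3) layout, and tiny 4-byte cells; the real stream encodes full cells buffered in cellBuffers.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

public class EncodeStripeDemo {
  public static void main(String[] args) throws Exception {
    int numData = 6, numParity = 3, cellSize = 4;   // toy cell size for illustration
    ErasureCoderOptions options = new ErasureCoderOptions(numData, numParity);
    RawErasureEncoder encoder =
        CodecUtil.createRawEncoder(new Configuration(), "rs", options);

    byte[][] dataCells = new byte[numData][cellSize];     // one full stripe of data cells
    byte[][] parityCells = new byte[numParity][cellSize]; // filled in by encode()
    for (int i = 0; i < numData; i++) {
      for (int j = 0; j < cellSize; j++) {
        dataCells[i][j] = (byte) (i * cellSize + j);
      }
    }
    encoder.encode(dataCells, parityCells);                // compute parity for the stripe
    System.out.println("parity[0][0] = " + parityCells[0][0]);
  }
}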

Let's look at what these fields are for:
1. ecPolicy: declared as ErasureCodingPolicy ecPolicy = null; the default null means EC is not used and the file is written as a normal 3-replica file.
2. cellSize: the size of each cell; it must be positive and a multiple of 1024:

    Preconditions.checkArgument(cellSize > 0, "cellSize must be positive");
    Preconditions.checkArgument(cellSize % 1024 == 0,
        "cellSize must be 1024 aligned");

3. encoder: the raw erasure encoder used to compute the parity cells (see the encoding sketch above).
4. streamers: in EC mode, streamers is a list of multiple StripedDataStreamer instances, one per internal block, so its size is numAllBlocks (data blocks plus parity blocks), not just the number of data blocks; a sketch of how byte offsets are spread across the data streamers follows this list:

    streamers = new ArrayList<>(numAllBlocks);
    for (short i = 0; i < numAllBlocks; i++) {
      StripedDataStreamer streamer = new StripedDataStreamer(stat,
          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
          favoredNodes, i, coordinator, getAddBlockFlags());
      streamers.add(streamer);
    }

5. currentPackets: currentPackets = new DFSPacket[streamers.size()]; each streamer has its own current packet.
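
cellSize and the streamers together determine where every byte of the logical file goes: data is striped cell by cell, round-robin across the data streamers. A small sketch of that mapping (the cell size and data-block count are just the RS-6-3-1024k values, used here for illustration):

public class StripeLayout {
  public static void main(String[] args) {
    final long cellSize = 1024 * 1024;   // 1 MiB cells (RS-6-3-1024k)
    final long numDataBlocks = 6;

    long[] offsets = {0L, 512 * 1024L, 6L * 1024 * 1024, 7L * 1024 * 1024 + 1};
    for (long offset : offsets) {
      long cellIndex = offset / cellSize;             // which cell in write order
      long streamerIndex = cellIndex % numDataBlocks; // which data streamer receives it
      long stripeIndex = cellIndex / numDataBlocks;   // which stripe inside the block group
      System.out.printf("offset=%d -> cell=%d, streamer=%d, stripe=%d%n",
          offset, cellIndex, streamerIndex, stripeIndex);
    }
  }
}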


The three inner classes play the following roles:

(figure: EC Write)

  • Data blocks are denoted: Data
  • Parity blocks are denoted: Parity

Link: HDFS EC:将纠删码技术融入HDFS (Bringing erasure coding into HDFS)

  • For an ordinary HDFS file, the basic unit it is built from is the block.
  • For a file in EC mode, the basic unit is the block group.
  • Since the basic unit of an EC file is the block group, the first thing to consider is how the NameNode stores each file's block group information. A straightforward approach is to assign every block group a block ID and keep a map from that ID to the block group information, where each block group records the information of all its internal blocks. For small files this increases NameNode memory consumption. Take RS(6,3) as an example: if a file is slightly smaller than 6 blocks, the NameNode still has to maintain 10 IDs for it (1 block group ID, 6 data block IDs and 3 parity block IDs). When small files dominate, NameNode memory usage becomes a real concern.
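
For illustration only (this is the straightforward mapping described above, not what HDFS ultimately implemented), a toy version of that bookkeeping makes the 10-ID count for RS(6,3) concrete:

import java.util.HashMap;
import java.util.Map;

public class NaiveBlockGroupMap {
  static class BlockGroup {
    final long[] dataBlockIds = new long[6];    // RS(6,3): 6 data blocks
    final long[] parityBlockIds = new long[3];  // RS(6,3): 3 parity blocks
  }

  public static void main(String[] args) {
    Map<Long, BlockGroup> groups = new HashMap<>();
    long nextId = 1;

    BlockGroup group = new BlockGroup();
    long groupId = nextId++;                          // 1 block group ID
    for (int i = 0; i < group.dataBlockIds.length; i++) {
      group.dataBlockIds[i] = nextId++;               // 6 data block IDs
    }
    for (int i = 0; i < group.parityBlockIds.length; i++) {
      group.parityBlockIds[i] = nextId++;             // 3 parity block IDs
    }
    groups.put(groupId, group);

    // 1 + 6 + 3 = 10 IDs for this one (possibly small) file.
    System.out.println("IDs tracked: " + (nextId - 1));
  }
}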