欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

HBase splitlog 过程

程序员文章站 2022-05-30 14:05:50
...

上一篇Blog提到了HBase在regionserver挂掉以后,master会处理,其中很重要的一步是就是splitlog,把.logs目录下的该rs的文件夹里的HLog文件,按照region进行分配。splitlog的代码如下所示:

private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
    List<Path> processedLogs = new ArrayList<Path>();//成功处理以后的文件放入这个目录下
    List<Path> corruptedLogs = new ArrayList<Path>();//读取文件出错的放入这个目录下
    List<Path> splits = null;

    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);

    long totalBytesToSplit = countTotalBytes(logfiles);
    splitSize = 0;

    outputSink.startWriterThreads(entryBuffers);//启动三个写线程,将内存中的数据按照region分别写入region下的recover.edits目录下

    try {
      int i = 0;
      for (FileStatus log : logfiles) {
       Path logPath = log.getPath();
        long logLength = log.getLen();
        splitSize += logLength;
        LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
            + ": " + logPath + ", length=" + logLength);
        Reader in;
        try {
          in = getReader(fs, log, conf, skipErrors);
          if (in != null) {
            parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);//读取文件写入内存entryBuffers
            try {
              in.close();
            } catch (IOException e) {
              LOG.warn("Close log reader threw exception -- continuing",
                  e);
            }
          }
          processedLogs.add(logPath);
        } catch (CorruptedLogFileException e) {
          LOG.info("Got while parsing hlog " + logPath +
              ". Marking as corrupted", e);
          corruptedLogs.add(logPath);
          continue;
        }
      }
      if (fs.listStatus(srcDir).length > processedLogs.size()
          + corruptedLogs.size()) {
        throw new OrphanHLogAfterSplitException(
            "Discovered orphan hlog after split. Maybe the "
            + "HRegionServer was not dead when we started");
      }
      archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);//把corrutedLogs里的path放入到.corruptedlogs上,把processedLogs上的path移到oldlog上,并删除HLog
    } finally {
      LOG.info("Finishing writing output logs and closing down.");
      splits = outputSink.finishWritingAndClose();
    }
    return splits;
  }

 ParseHLog过程很简单从文件中读取数据写入到内存中,一次最多128M

  private void parseHLog(final Reader in, Path path,
		EntryBuffers entryBuffers, final FileSystem fs,
    final Configuration conf, boolean skipErrors)
	throws IOException, CorruptedLogFileException {
    int editsCount = 0;
    try {
      Entry entry;
      while ((entry = getNextLogLine(in, path, skipErrors)) != null) {
        entryBuffers.appendEntry(entry);
        editsCount++;
      }
    } catch (InterruptedException ie) {
      IOException t = new InterruptedIOException();
      t.initCause(ie);
      throw t;
    } finally {
      LOG.debug("Pushed=" + editsCount + " entries from " + path);
    }
  }

 写线程也比较简单,每个线程从entryBuffer中获取一个region的一块数据,在一个entrBuffer中,一个region只能由一个线程来handler,不然会有多个写线程同时对一个文件进行操作。

 private void doRun() throws IOException {
      LOG.debug("Writer thread " + this + ": starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait on some more to show up
          synchronized (dataAvailable) {
            if (shouldStop) return;
            try {
              dataAvailable.wait(1000);
            } catch (InterruptedException ie) {
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          entryBuffers.doneWriting(buffer);
        }
      }
    }

 

 

相关标签: Hbase