The HBase splitlog process
2022-05-30 14:05:50
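The previous post mentioned that when a regionserver dies, the master takes over the recovery. One important step in that recovery is splitlog: the HLog files in that regionserver's folder under the .logs directory are split up and distributed by region. The splitLog code looks like this: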
private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
  // Files that were processed successfully are collected in this list
  List<Path> processedLogs = new ArrayList<Path>();
  // Files that could not be read are collected in this list
  List<Path> corruptedLogs = new ArrayList<Path>();
  List<Path> splits = null;
  boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
  long totalBytesToSplit = countTotalBytes(logfiles);
  splitSize = 0;
  // Start three writer threads that write the buffered edits, grouped by
  // region, into the recovered.edits directory under each region
  outputSink.startWriterThreads(entryBuffers);
  try {
    int i = 0;
    for (FileStatus log : logfiles) {
      Path logPath = log.getPath();
      long logLength = log.getLen();
      splitSize += logLength;
      LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
          + ": " + logPath + ", length=" + logLength);
      Reader in;
      try {
        in = getReader(fs, log, conf, skipErrors);
        if (in != null) {
          // Read the file and push its entries into the in-memory entryBuffers
          parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
          try {
            in.close();
          } catch (IOException e) {
            LOG.warn("Close log reader threw exception -- continuing",
                e);
          }
        }
        processedLogs.add(logPath);
      } catch (CorruptedLogFileException e) {
        LOG.info("Got while parsing hlog " + logPath +
            ". Marking as corrupted", e);
        corruptedLogs.add(logPath);
        continue;
      }
    }
    if (fs.listStatus(srcDir).length > processedLogs.size()
        + corruptedLogs.size()) {
      throw new OrphanHLogAfterSplitException(
          "Discovered orphan hlog after split. Maybe the "
          + "HRegionServer was not dead when we started");
    }
    // Move the paths in corruptedLogs to the corrupt-log directory, move the
    // paths in processedLogs to oldLogDir, then remove the source HLogs
    archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
  } finally {
    LOG.info("Finishing writing output logs and closing down.");
    splits = outputSink.finishWritingAndClose();
  }
  return splits;
}
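The bookkeeping at the end of splitLog can be illustrated in isolation. The following is a minimal sketch, not HBase code: the class SplitBookkeepingSketch, its Result type, and the methods classify and hasOrphans are hypothetical names, and plain strings stand in for Path objects. It shows why a directory listing that is longer than processed plus corrupted means a new HLog appeared while the split was running, which suggests the regionserver was still alive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration; none of these names come from HBase.
class SplitBookkeepingSketch {

    /** Outcome of one pass over a snapshot of log file names. */
    static final class Result {
        final List<String> processed = new ArrayList<>();
        final List<String> corrupted = new ArrayList<>();
    }

    /** Sorts every file of the snapshot into exactly one of the two lists. */
    static Result classify(List<String> snapshot, Set<String> unreadable) {
        Result result = new Result();
        for (String log : snapshot) {
            if (unreadable.contains(log)) {
                result.corrupted.add(log);
            } else {
                result.processed.add(log);
            }
        }
        return result;
    }

    /** True when the directory now holds more files than the split accounted for. */
    static boolean hasOrphans(List<String> currentListing, Result result) {
        return currentListing.size() > result.processed.size() + result.corrupted.size();
    }

    public static void main(String[] args) {
        List<String> snapshot = Arrays.asList("hlog.1", "hlog.2", "hlog.3");
        Set<String> unreadable = new HashSet<>(Arrays.asList("hlog.2"));
        Result result = classify(snapshot, unreadable);

        // A fourth file shows up after the snapshot was taken.
        List<String> later = Arrays.asList("hlog.1", "hlog.2", "hlog.3", "hlog.4");
        System.out.println("orphans in snapshot: " + hasOrphans(snapshot, result));
        System.out.println("orphans later:       " + hasOrphans(later, result));
    }
}
```

Because every file from the snapshot lands in one of the two lists, the two sizes always add up to the snapshot size, so the comparison can only trip when the directory has grown since the split started.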
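The parseHLog step is simple: it reads entries from the file and appends them to the in-memory buffers, which hold at most 128 MB at a time.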
private void parseHLog(final Reader in, Path path,
    EntryBuffers entryBuffers, final FileSystem fs,
    final Configuration conf, boolean skipErrors)
    throws IOException, CorruptedLogFileException {
  int editsCount = 0;
  try {
    Entry entry;
    while ((entry = getNextLogLine(in, path, skipErrors)) != null) {
      entryBuffers.appendEntry(entry);
      editsCount++;
    }
  } catch (InterruptedException ie) {
    IOException t = new InterruptedIOException();
    t.initCause(ie);
    throw t;
  } finally {
    LOG.debug("Pushed=" + editsCount + " entries from " + path);
  }
}
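The memory cap mentioned above implies back-pressure: when the buffers are full, the reading side has to wait until a writer frees space. The sketch below shows one way such a bounded buffer could work. It is an assumption-laden illustration rather than the actual EntryBuffers implementation: the class BoundedEntryBuffersSketch and its methods append, drain and totalBuffered are hypothetical, region names are plain strings, and an edit is just a byte array.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical illustration of a size-capped, per-region buffer.
class BoundedEntryBuffersSketch {
    private final long maxBufferedBytes;
    private long totalBuffered = 0;
    private final Map<String, Queue<byte[]>> byRegion = new HashMap<>();

    BoundedEntryBuffersSketch(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    /** Adds an edit, then blocks the caller while the cap is exceeded. */
    synchronized void append(String region, byte[] edit) {
        byRegion.computeIfAbsent(region, r -> new ArrayDeque<>()).add(edit);
        totalBuffered += edit.length;
        while (totalBuffered > maxBufferedBytes) {
            try {
                wait(); // woken up by drain() once space has been freed
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while buffer was full", ie);
            }
        }
    }

    /** Removes and returns everything buffered for one region. */
    synchronized Queue<byte[]> drain(String region) {
        Queue<byte[]> edits = byRegion.remove(region);
        if (edits == null) {
            return new ArrayDeque<>();
        }
        for (byte[] edit : edits) {
            totalBuffered -= edit.length;
        }
        notifyAll(); // let a blocked append() re-check the cap
        return edits;
    }

    synchronized long totalBuffered() {
        return totalBuffered;
    }
}
```

In this sketch the reader thread would call append for each log entry and a writer thread would call drain; with a cap of 128 * 1024 * 1024 bytes it mirrors the limit described above.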
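The writer threads are also fairly simple. Each thread takes one region's chunk of data from entryBuffers. Within entryBuffers, a given region can be handled by only one thread at a time; otherwise several writer threads would be operating on the same file simultaneously.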
private void doRun() throws IOException {
  LOG.debug("Writer thread " + this + ": starting");
  while (true) {
    RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
    if (buffer == null) {
      // No data currently available, wait on some more to show up
      synchronized (dataAvailable) {
        if (shouldStop) return;
        try {
          dataAvailable.wait(1000);
        } catch (InterruptedException ie) {
          if (!shouldStop) {
            throw new RuntimeException(ie);
          }
        }
      }
      continue;
    }
    assert buffer != null;
    try {
      writeBuffer(buffer);
    } finally {
      entryBuffers.doneWriting(buffer);
    }
  }
}
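The one-writer-per-region rule can be sketched with a set of regions that are currently being written. This is a simplified illustration under stated assumptions, not the HBase source: the class ChunkSelectionSketch and its Chunk type are hypothetical, edits are plain strings, and the policy of picking the largest idle region is an assumption made for the example. The method names getChunkToWrite and doneWriting are borrowed from the calls in doRun above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of handing out at most one chunk per region.
class ChunkSelectionSketch {

    /** The edits of a single region, handed to exactly one writer. */
    static final class Chunk {
        final String region;
        final List<String> edits;

        Chunk(String region, List<String> edits) {
            this.region = region;
            this.edits = edits;
        }
    }

    private final Map<String, List<String>> buffers = new HashMap<>();
    private final Set<String> currentlyWriting = new HashSet<>();

    synchronized void append(String region, String edit) {
        buffers.computeIfAbsent(region, r -> new ArrayList<>()).add(edit);
    }

    /** Returns the largest chunk whose region is idle, or null if there is none. */
    synchronized Chunk getChunkToWrite() {
        String best = null;
        int bestSize = 0;
        for (Map.Entry<String, List<String>> e : buffers.entrySet()) {
            boolean busy = currentlyWriting.contains(e.getKey());
            if (!busy && e.getValue().size() > bestSize) {
                best = e.getKey();
                bestSize = e.getValue().size();
            }
        }
        if (best == null) {
            return null;
        }
        currentlyWriting.add(best); // the region stays locked until doneWriting()
        return new Chunk(best, buffers.remove(best));
    }

    /** Releases the region so that its next chunk can be handed out. */
    synchronized void doneWriting(Chunk chunk) {
        currentlyWriting.remove(chunk.region);
    }
}
```

Edits that arrive for a busy region simply accumulate in a fresh buffer and become eligible again after doneWriting, which is why the call sits in a finally block in doRun.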