HBase put source code analysis
Below is the source code HRegionServer runs when it handles a put:
HRegionServer
public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
  ....
  try {
    region = getRegion(regionAction.getRegion()); // get the Region this action targets
  } catch (IOException e) {
    regionActionResultBuilder.setException(ResponseConverter.buildException(e));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    continue; // For this region it's a failure.
  }
  ....
  if (regionAction.hasAtomic() && regionAction.getAtomic()) { // must the actions be applied atomically?
    // How does this call happen? It may need some work to play well w/ the surroundings.
    // Need to return an item per Action along w/ Action index. TODO.
    try {
      mutateRows(region, regionAction.getActionList(), cellScanner); // explained in detail below
    } catch (IOException e) {
      // As it's atomic, we may expect it's a global failure.
      regionActionResultBuilder.setException(ResponseConverter.buildException(e));
    }
  } else {
    // doNonAtomicRegionMutation manages the exception internally
    cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
      regionActionResultBuilder, cellsToReturn, nonceGroup);
  }
  ....
}
HRegionServer's mutateRows method:
protected void mutateRows(final HRegion region, final List<ClientProtos.Action> actions,
    final CellScanner cellScanner)
    throws IOException {
  if (!region.getRegionInfo().isMetaTable()) {
    cacheFlusher.reclaimMemStoreMemory();
  }
  RowMutations rm = null;
  for (ClientProtos.Action action: actions) {
    if (action.hasGet()) {
      throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
        action.getGet());
    }
    MutationType type = action.getMutation().getMutateType();
    if (rm == null) {
      rm = new RowMutations(action.getMutation().getRow().toByteArray());
    }
    switch (type) {
      case PUT:
        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
        break;
      case DELETE:
        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
        break;
      default:
        throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
    }
  }
  region.mutateRow(rm); // HRegion applies the mutations
}
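mutateRows() folds every Put and Delete in the action list into one RowMutations and rejects a Get, or any other mutation type, up front. The sketch below replays that loop on hypothetical simplified types: ToyAction and ToyRowBatch stand in for ClientProtos.Action and RowMutations, and IllegalArgumentException stands in for DoNotRetryIOException. Since a RowMutations covers a single row, the sketch also assumes every action must target the same row as the first one. Requires Java 16+ (records, arrow-form switch).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MutateRowsSketch {
  // Hypothetical stand-in for ClientProtos MutationType plus the Get case.
  enum Kind { GET, PUT, DELETE, APPEND }

  // Hypothetical stand-in for ClientProtos.Action.
  record ToyAction(Kind kind, byte[] row) {}

  // Hypothetical stand-in for RowMutations: one row, ordered puts/deletes.
  static final class ToyRowBatch {
    final byte[] row;
    final List<ToyAction> mutations = new ArrayList<>();

    ToyRowBatch(byte[] row) { this.row = row; }

    void add(ToyAction a) {
      if (!Arrays.equals(row, a.row())) {
        throw new IllegalArgumentException("mutation for a different row");
      }
      mutations.add(a);
    }
  }

  // Same shape as mutateRows(): reject Gets, create the batch from the
  // first action's row, accept only PUT and DELETE.
  static ToyRowBatch fold(List<ToyAction> actions) {
    ToyRowBatch batch = null;
    for (ToyAction a : actions) {
      if (a.kind() == Kind.GET) {
        throw new IllegalArgumentException("Atomic put and/or delete only, not a Get");
      }
      if (batch == null) {
        batch = new ToyRowBatch(a.row());
      }
      switch (a.kind()) {
        case PUT, DELETE -> batch.add(a);
        default -> throw new IllegalArgumentException(
            "Atomic put and/or delete only, not " + a.kind());
      }
    }
    return batch;
  }
}
```

Checking the Get before anything else mirrors the original: a Get anywhere in an atomic RegionAction fails the whole batch, which is what the caller expects for an atomic request.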
This calls HRegion.mutateRow:
public void mutateRow(RowMutations rm) throws IOException {
  // Don't need nonces here - RowMutations only supports puts and deletes
  mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
}
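The detailed flow of HRegion.mutateRow:
1. Run the coprocessorHost pre-operation hooks
(MultiRowMutationProcessor class)
2. Acquire the row lock (rowid)
3. Try to acquire the HRegion updatesLock read lock
4. Process the row mutations (for example, fill in timestamps) and build the WALEdit object the WAL needs
(MultiRowMutationProcessor class)
5. Create a WriteEntry carrying a write number and append it to the writeQueue of MultiVersionConsistencyControl
6. Add the cells to the memstore of the corresponding HStore (this takes the store's read lock)
7. Append to the WAL: generate the HLogKey (used to decide which WAL entries can be deleted after a memstore flush; unrelated to MVCC) and put the log edit on the pending-write queue (asynchronous)
(FSHLog class)
8. Release the HRegion updatesLock read lock
9. Release the row lock (rowid) by counting down its latch
10. Sync this region server's HLog up to the current edit, using as the txid the value of FSHLog's unflushedEntries, an AtomicLong that is incremented every time a log edit is appended
(FSHLog class)
11. In MultiVersionConsistencyControl, use the entries in writeQueue to advance the read point to the highest write number for which that write and every earlier one have completed, then wait until the read point is at least the current write number. (Because the WAL is written asynchronously, the read point may already be past the current write number, or an earlier write number may not have completed yet; in that case the thread waits until the read point reaches its own write number.)
12. MultiRowMutationProcessor runs postProcess
(runs the coprocessor hooks)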
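The important point in steps 2 to 11 is the ordering: the WAL sync and the MVCC wait happen only after both locks are released, which lets other writers to the same row proceed while this one waits on the filesystem. The single-threaded sketch below just records that order. Everything in it is a hypothetical toy (one ReentrantLock for the row, a ReentrantReadWriteLock for updatesLock, lists for the memstore and WAL); the coprocessor hooks (steps 1 and 12) and the rollback path are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class MutateRowFlowSketch {
  final ReentrantLock rowLock = new ReentrantLock();                       // toy lock for one row
  final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock(); // toy region updatesLock
  final List<String> memstore = new ArrayList<>();
  final List<String> walQueue = new ArrayList<>(); // one entry per append, like one txid per append
  final List<String> trace = new ArrayList<>();    // order in which the steps ran
  long writeNumber = 0; // toy MVCC write point
  long readPoint = 0;   // toy MVCC read point
  long syncedTxid = 0;  // toy "synced up to" marker

  void mutateRow(List<String> cells) {
    rowLock.lock();                                 // step 2
    trace.add("rowLock");
    updatesLock.readLock().lock();                  // step 3
    trace.add("updatesLock.read");
    long txid;
    long myWrite;
    try {
      List<String> walEdit = new ArrayList<>(cells); // step 4: build the WAL edit
      trace.add("process");
      myWrite = ++writeNumber;                       // step 5: take a write number
      trace.add("mvcc.begin");
      memstore.addAll(cells);                        // step 6: memstore first
      trace.add("memstore");
      walQueue.add(String.join(",", walEdit));       // step 7: append, no sync yet
      txid = walQueue.size();
      trace.add("wal.append");
    } finally {
      updatesLock.readLock().unlock();               // step 8
      trace.add("updatesLock.release");
      rowLock.unlock();                              // step 9
      trace.add("rowLock.release");
    }
    syncedTxid = Math.max(syncedTxid, txid);         // step 10: sync with no locks held
    trace.add("wal.sync");
    readPoint = Math.max(readPoint, myWrite);        // step 11: single writer, so visible at once
    trace.add("mvcc.complete");
  }
}
```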
Below is the same walkthrough with the source code:
1. Run the coprocessorHost pre-operation hooks
(MultiRowMutationProcessor class)
2. Acquire the row lock (rowid)
/**
 * Tries to acquire a lock on the given row.
 * @param waitForLock if true, will block until the lock is available.
 *        Otherwise, just tries to obtain the lock and returns
 *        false if unavailable.
 * @return the row lock if acquired,
 *   null if waitForLock was false and the lock was not acquired
 * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
 */
public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
  checkRow(row, "row lock");
  startRegionOperation();
  try {
    HashedBytes rowKey = new HashedBytes(row);
    RowLockContext rowLockContext = new RowLockContext(rowKey);
    // loop until we acquire the row lock (unless !waitForLock)
    while (true) { // keep trying until this row's lock is obtained
      // try to claim the row
      RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
      if (existingContext == null) { // no other thread holds this row
        // Row is not already locked by any thread, use newly created context.
        break;
      } else if (existingContext.ownedByCurrentThread()) { // the current thread already holds this row
        // Row is already locked by current thread, reuse existing context instead.
        rowLockContext = existingContext;
        break;
      } else { // another thread holds this row
        // Row is already locked by some other thread, give up or wait for it
        if (!waitForLock) {
          return null;
        }
        try {
          // wait on the holder's CountDownLatch until that thread releases the row
          if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
            throw new IOException("Timed out waiting for lock for row: " + rowKey);
          }
        } catch (InterruptedException ie) {
          LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
          InterruptedIOException iie = new InterruptedIOException();
          iie.initCause(ie);
          throw iie;
        }
      }
    }
    // allocate new lock for this thread
    return rowLockContext.newLock();
  } finally {
    closeRegionOperation();
  }
}
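The core of getRowLock is a ConcurrentHashMap keyed by row plus one CountDownLatch per holder: putIfAbsent either claims the row or returns the current holder's context, and a waiter blocks on that holder's latch with a timeout, then retries. Below is a stripped-down sketch of the same technique, using hypothetical names (ToyRowLocks, Context), String row keys instead of HashedBytes, and unchecked exceptions instead of IOException. The hold count mimics reentrant locking by the owning thread; the last release removes the map entry before counting down the latch, so woken waiters can win putIfAbsent on retry.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ToyRowLocks {
  // Hypothetical stand-in for RowLockContext: one per locked row.
  static final class Context {
    final String row;
    final Thread owner = Thread.currentThread();
    final CountDownLatch latch = new CountDownLatch(1);
    int holds = 0; // only read/written by the owner thread

    Context(String row) { this.row = row; }
  }

  private final ConcurrentHashMap<String, Context> lockedRows = new ConcurrentHashMap<>();
  private final long waitMillis;

  ToyRowLocks(long waitMillis) { this.waitMillis = waitMillis; }

  /** Returns the row's context, or null if waitForLock is false and another thread holds it. */
  Context lock(String row, boolean waitForLock) {
    Context mine = new Context(row);
    while (true) {
      Context existing = lockedRows.putIfAbsent(row, mine);
      if (existing == null) {
        break;                                   // row was free: we now own it
      } else if (existing.owner == Thread.currentThread()) {
        mine = existing;                         // re-entry by the owning thread
        break;
      }
      if (!waitForLock) {
        return null;                             // held by another thread, don't wait
      }
      try {
        if (!existing.latch.await(waitMillis, TimeUnit.MILLISECONDS)) {
          throw new IllegalStateException("Timed out waiting for lock for row: " + row);
        }
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted waiting for lock on row: " + row, ie);
      }
      // the holder released the row: loop and retry putIfAbsent
    }
    mine.holds++;
    return mine;
  }

  /** Drops one hold; the last one removes the row and wakes every waiter. */
  void unlock(Context ctx) {
    if (--ctx.holds == 0) {
      lockedRows.remove(ctx.row, ctx);           // remove first so retries can succeed
      ctx.latch.countDown();
    }
  }
}
```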
3. Try to acquire the HRegion updatesLock read lock
4. Process the row mutations (for example, fill in timestamps) and build the WALEdit object the WAL needs
(MultiRowMutationProcessor class)
@Override
public void process(long now,
                    HRegion region,
                    List<KeyValue> mutationKvs,
                    WALEdit walEdit) throws IOException {
  byte[] byteNow = Bytes.toBytes(now);
  // Check mutations and apply edits to a single WALEdit
  for (Mutation m : mutations) { // check each row mutation and update its timestamps
    if (m instanceof Put) {
      Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
      region.checkFamilies(familyMap.keySet());
      region.checkTimestamps(familyMap, now);
      region.updateKVTimestamps(familyMap.values(), byteNow);
    } else if (m instanceof Delete) {
      Delete d = (Delete) m;
      region.prepareDelete(d);
      region.prepareDeleteTimestamps(d.getFamilyCellMap(), byteNow);
    } else {
      throw new DoNotRetryIOException(
          "Action must be Put or Delete. But was: "
          + m.getClass().getName());
    }
    for (List<Cell> cells: m.getFamilyCellMap().values()) {
      boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
      for (Cell cell : cells) { // collect each column family's cells, and add them to the walEdit
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        mutationKvs.add(kv);
        if (writeToWAL) {
          walEdit.add(kv);
        }
      }
    }
  }
}
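For a Put, process() does two things per cell: it replaces a "latest timestamp" placeholder with the server time (updateKVTimestamps), and it copies the cell into mutationKvs, adding it to the WALEdit only when durability is not SKIP_WAL. Here is a toy version under simplified assumptions: ToyCell is a hypothetical mutable cell, and Long.MAX_VALUE is used as the placeholder because that is the value of HConstants.LATEST_TIMESTAMP. Family checks and the Delete path are omitted.

```java
import java.util.List;

class ProcessSketch {
  // Same value as HConstants.LATEST_TIMESTAMP: "client did not set a timestamp".
  static final long LATEST_TIMESTAMP = Long.MAX_VALUE;

  // Hypothetical simplified cell with a mutable timestamp (like KeyValue.updateLatestStamp).
  static final class ToyCell {
    final String qualifier;
    long timestamp;

    ToyCell(String qualifier, long timestamp) {
      this.qualifier = qualifier;
      this.timestamp = timestamp;
    }
  }

  static void process(long now, List<ToyCell> cells, boolean skipWal,
                      List<ToyCell> mutationKvs, List<ToyCell> walEdit) {
    for (ToyCell c : cells) {
      if (c.timestamp == LATEST_TIMESTAMP) {
        c.timestamp = now;          // stamp unset timestamps with server time
      }
      mutationKvs.add(c);           // every cell goes to the memstore list
      if (!skipWal) {
        walEdit.add(c);             // only durable writes go into the WAL edit
      }
    }
  }
}
```

Stamping happens before the cells are copied, so the memstore and the WAL see the same timestamp for every cell.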
5. Create a WriteEntry carrying a write number and append it to the writeQueue of MultiVersionConsistencyControl
writeEntry = mvcc.beginMemstoreInsert();
/**
 * Generate and return a {@link WriteEntry} with a new write number.
 * To complete the WriteEntry and wait for it to be visible,
 * call {@link #completeMemstoreInsert(WriteEntry)}.
 */
public WriteEntry beginMemstoreInsert() {
  synchronized (writeQueue) {
    long nextWriteNumber = ++memstoreWrite; // this region's MVCC write point, incremented, becomes the edit's write number
    WriteEntry e = new WriteEntry(nextWriteNumber);
    writeQueue.add(e);
    return e;
  }
}
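Because the increment and the enqueue happen under the same writeQueue monitor, the queue is always ordered by consecutive write numbers even with many concurrent writers; advanceMemstore later relies on exactly that ("invariant in completeMemstoreInsert violated"). A small sketch with hypothetical names that checks the invariant after several threads allocate entries concurrently:

```java
import java.util.LinkedList;

class WriteNumberSketch {
  static final class WriteEntry {
    final long writeNumber;

    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
  }

  private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();
  private long memstoreWrite = 0; // guarded by writeQueue

  WriteEntry begin() {
    synchronized (writeQueue) {   // increment and enqueue as one atomic step
      WriteEntry e = new WriteEntry(++memstoreWrite);
      writeQueue.add(e);
      return e;
    }
  }

  /** True if the queue holds consecutive write numbers in order. */
  boolean queueIsOrdered() {
    synchronized (writeQueue) {
      long prev = -1;
      for (WriteEntry e : writeQueue) {
        if (prev != -1 && e.writeNumber != prev + 1) {
          return false;
        }
        prev = e.writeNumber;
      }
      return true;
    }
  }

  int size() {
    synchronized (writeQueue) {
      return writeQueue.size();
    }
  }
}
```

If the increment were done outside the lock (for example with a separate AtomicLong), two threads could enqueue in the opposite order of their numbers and the invariant would break.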
6. Add the cells to the memstore of the corresponding HStore (this takes the store's read lock)
@Override
public long add(final KeyValue kv) {
  lock.readLock().lock();
  try {
    return this.memstore.add(kv);
  } finally {
    lock.readLock().unlock();
  }
}
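Taking only the read lock for a write looks odd, but the lock guards which memstore is active, not the cells themselves: many writers can add at once because the memstore's sorted structure is concurrent, while the flush path needs exclusive access to swap the active memstore out as a snapshot. A sketch of that pattern, assuming a ConcurrentSkipListMap as the toy memstore and a hypothetical snapshot() method that takes the write lock (as HStore's snapshot step does in HBase of this era):

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class StoreLockSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Toy memstore: a concurrent sorted map, so many adders can share the read lock.
  private ConcurrentSkipListMap<String, String> active = new ConcurrentSkipListMap<>();

  /** Writers share the read lock; they only need the active map not to be swapped mid-add. */
  void add(String key, String value) {
    lock.readLock().lock();
    try {
      active.put(key, value);
    } finally {
      lock.readLock().unlock();
    }
  }

  /** Flush side: exclusive lock while the active map is replaced by an empty one. */
  ConcurrentSkipListMap<String, String> snapshot() {
    lock.writeLock().lock();
    try {
      ConcurrentSkipListMap<String, String> snap = active;
      active = new ConcurrentSkipListMap<>();
      return snap;
    } finally {
      lock.writeLock().unlock();
    }
  }

  int activeSize() {
    lock.readLock().lock();
    try {
      return active.size();
    } finally {
      lock.readLock().unlock();
    }
  }
}
```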
7. Append to the WAL: generate the HLogKey (used to decide which WAL entries can be deleted after a memstore flush; unrelated to MVCC) and put the log edit on the pending-write queue (asynchronous)
(FSHLog class)
/**
 * Append a set of edits to the log. Log edits are keyed by (encoded)
 * regionName, rowname, and log-sequence-id.
 *
 * Later, if we sort by these keys, we obtain all the relevant edits for a
 * given key-range of the HRegion (TODO). Any edits that do not have a
 * matching COMPLETE_CACHEFLUSH message can be discarded.
 *
 * <p>
 * Logs cannot be restarted once closed, or once the HLog process dies. Each
 * time the HLog starts, it must create a new log. This means that other
 * systems should process the log appropriately upon each startup (and prior
 * to initializing HLog).
 *
 * synchronized prevents appends during the completion of a cache flush or for
 * the duration of a log roll.
 *
 * @param info
 * @param tableName
 * @param edits
 * @param clusterIds that have consumed the change (for replication)
 * @param now
 * @param doSync shall we sync?
 * @param sequenceId of the region.
 * @return txid of this transaction
 * @throws IOException
 */
@SuppressWarnings("deprecation")
private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
    final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore,
    AtomicLong sequenceId, long nonceGroup, long nonce) throws IOException {
  if (edits.isEmpty()) return this.unflushedEntries.get();
  if (this.closed) {
    throw new IOException("Cannot append; log is closed");
  }
  TraceScope traceScope = Trace.startSpan("FSHlog.append");
  try {
    long txid = 0;
    synchronized (this.updateLock) { // serialize on the HLog's update lock
      // get the sequence number from the passed Long. In normal flow, it is coming from the
      // region.
      long seqNum = sequenceId.incrementAndGet();
      // Next region seqnum. Once the memstore has been flushed, HLog entries whose seqnum
      // is <= the flushed seqnum are no longer needed and can be deleted.
      // The 'lastSeqWritten' map holds the sequence number of the oldest
      // write for each region (i.e. the first edit added to the particular
      // memstore). . When the cache is flushed, the entry for the
      // region being flushed is removed if the sequence number of the flush
      // is greater than or equal to the value in lastSeqWritten.
      // Use encoded name. Its shorter, guaranteed unique and a subset of
      // actual name.
      byte [] encodedRegionName = info.getEncodedNameAsBytes(); // encoded region name
      // only added if the region is absent, so this keeps the region's oldest unflushed seqnum
      if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
      HLogKey logKey = makeKey(
        encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce);
      synchronized (pendingWritesLock) {
        doWrite(info, logKey, edits, htd); // put the log edit on the pending-write queue
        txid = this.unflushedEntries.incrementAndGet();
      }
      this.numEntries.incrementAndGet();
      this.asyncWriter.setPendingTxid(txid);
      if (htd.isDeferredLogFlush()) {
        lastUnSyncedTxid = txid;
      }
      this.latestSequenceNums.put(encodedRegionName, seqNum);
    }
    // TODO: note that only tests currently call append w/sync.
    //       Therefore, this code here is not actually used by anything.
    // Sync if catalog region, and if not then check if that table supports
    // deferred log flushing
    if (doSync &&
        (info.isMetaRegion() ||
        !htd.isDeferredLogFlush())) {
      // sync txn to file system
      this.sync(txid);
    }
    return txid;
  } finally {
    traceScope.close();
  }
}
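append() maintains two independent counters: a per-region sequence number (sequenceId, carried in the HLogKey) and a log-wide transaction id (unflushedEntries) that the later sync waits on. oldestUnflushedSeqNums uses putIfAbsent, so it keeps only the first, i.e. oldest, unflushed sequence number per region. A toy model with hypothetical names (ToyWal, Entry) that keeps edits in an in-memory list instead of calling doWrite; tracing, deferred flush and replication fields are left out. Requires Java 16+ for the record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class ToyWal {
  record Entry(String region, long seqNum, long txid, List<String> edits) {}

  final AtomicLong unflushedEntries = new AtomicLong();              // log-wide txid
  final Map<String, Long> oldestUnflushedSeqNums = new ConcurrentHashMap<>();
  final List<Entry> pending = new ArrayList<>();                     // guarded by updateLock
  private final Object updateLock = new Object();

  long append(String region, AtomicLong regionSequenceId, List<String> edits) {
    if (edits.isEmpty()) {
      return unflushedEntries.get();                                 // nothing to log
    }
    synchronized (updateLock) {
      long seqNum = regionSequenceId.incrementAndGet();              // per-region sequence
      oldestUnflushedSeqNums.putIfAbsent(region, seqNum);            // keep the oldest only
      long txid = unflushedEntries.incrementAndGet();                // position in this log
      pending.add(new Entry(region, seqNum, txid, List.copyOf(edits)));
      return txid;
    }
  }
}
```

The two counters answer different questions: seqNum says how far a region's edits are durable in HFiles after a flush, while txid says how far this log file has been synced.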
8. Release the HRegion updatesLock read lock
9. Release the row lock (rowid) by counting down its latch
10. Sync this region server's HLog up to the current edit, using as the txid the value of FSHLog's unflushedEntries, an AtomicLong that is incremented every time a log edit is appended
(FSHLog class)
// sync all transactions upto the specified txid
private void syncer(long txid) throws IOException {
  synchronized (this.syncedTillHere) {
    while (this.syncedTillHere.get() < txid) {
      try {
        this.syncedTillHere.wait();
        if (txid <= this.failedTxid.get()) {
          assert asyncIOE != null :
            "current txid is among(under) failed txids, but asyncIOE is null!";
          throw asyncIOE;
        }
      } catch (InterruptedException e) {
        LOG.debug("interrupted while waiting for notification from AsyncNotifier");
      }
    }
  }
}
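syncer() is a classic monitor wait: the caller blocks on syncedTillHere until a background thread (the AsyncNotifier named in the debug message) has advanced it to at least the caller's txid, and it fails if its txid falls in a range marked as failed. A sketch with hypothetical names: markSynced and markFailed stand in for what the notifier and syncer threads do, and the failure is reported as an IllegalStateException instead of the stored asyncIOE. Unlike the original, this version checks the failure range once after waiting and restores the interrupt flag instead of only logging it.

```java
import java.util.concurrent.atomic.AtomicLong;

class SyncWaitSketch {
  final AtomicLong syncedTillHere = new AtomicLong(0);
  final AtomicLong failedTxid = new AtomicLong(0);

  /** Hypothetical notifier side: a filesystem sync up to txid succeeded. */
  void markSynced(long txid) {
    synchronized (syncedTillHere) {
      if (txid > syncedTillHere.get()) {
        syncedTillHere.set(txid);
      }
      syncedTillHere.notifyAll();
    }
  }

  /** Hypothetical notifier side: a sync up to txid failed; release the waiters. */
  void markFailed(long txid) {
    synchronized (syncedTillHere) {
      failedTxid.set(Math.max(failedTxid.get(), txid));
      syncedTillHere.set(Math.max(syncedTillHere.get(), txid));
      syncedTillHere.notifyAll();
    }
  }

  /** Blocks until everything up to txid is synced; throws if that range failed. */
  void syncer(long txid) {
    boolean interrupted = false;
    synchronized (syncedTillHere) {
      while (syncedTillHere.get() < txid) {
        try {
          syncedTillHere.wait();
        } catch (InterruptedException e) {
          interrupted = true;        // keep waiting, restore the flag afterwards
        }
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
    if (txid <= failedTxid.get()) {
      throw new IllegalStateException("sync failed up to txid " + failedTxid.get());
    }
  }
}
```

Because one successful sync covers every earlier txid, a writer whose edit was appended by another thread's batch returns as soon as the counter passes it, without issuing its own filesystem sync.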
11. In MultiVersionConsistencyControl, use the entries in writeQueue to advance the read point to the highest write number for which that write and every earlier one have completed, then wait until the read point is at least the current write number. (Because the WAL is written asynchronously, the read point may already be past the current write number, or an earlier write number may not have completed yet; in that case the thread waits until the read point reaches its own write number.)
/**
 * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
 *
 * At the end of this call, the global read point is at least as large as the write point
 * of the passed in WriteEntry. Thus, the write is visible to MVCC readers.
 */
public void completeMemstoreInsert(WriteEntry e) {
  advanceMemstore(e);
  waitForRead(e);
}
/**
 * Mark the {@link WriteEntry} as complete and advance the read point as
 * much as possible.
 *
 * How much is the read point advanced?
 * Let S be the set of all write numbers that are completed and where all previous write numbers
 * are also completed. Then, the read point is advanced to the supremum of S.
 *
 * @param e
 * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
 */
boolean advanceMemstore(WriteEntry e) {
  synchronized (writeQueue) {
    e.markCompleted();
    long nextReadValue = -1;
    boolean ranOnce=false;
    // advance the read point over the completed prefix of the queue,
    // i.e. to the highest write number whose write and all earlier ones are completed
    while (!writeQueue.isEmpty()) {
      ranOnce=true;
      WriteEntry queueFirst = writeQueue.getFirst();
      if (nextReadValue > 0) {
        if (nextReadValue+1 != queueFirst.getWriteNumber()) {
          throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
              + nextReadValue + " next: " + queueFirst.getWriteNumber());
        }
      }
      if (queueFirst.isCompleted()) {
        nextReadValue = queueFirst.getWriteNumber();
        writeQueue.removeFirst();
      } else {
        break;
      }
    }
    if (!ranOnce) {
      throw new RuntimeException("never was a first");
    }
    if (nextReadValue > 0) {
      synchronized (readWaiters) {
        memstoreRead = nextReadValue;
        readWaiters.notifyAll();
      }
    }
    if (memstoreRead >= e.getWriteNumber()) {
      return true;
    }
    return false;
  }
}
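The javadoc's rule (advance the read point to the supremum of the completed prefix) is easiest to see on a small trace. A single-threaded sketch of just that rule, with hypothetical names: completions arrive out of order, and the read point only moves over the contiguous completed prefix of the queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ReadPointSketch {
  static final class WriteEntry {
    final long writeNumber;
    boolean completed;

    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
  }

  private final Deque<WriteEntry> writeQueue = new ArrayDeque<>();
  private long memstoreWrite = 0;
  long memstoreRead = 0;

  WriteEntry begin() {
    WriteEntry e = new WriteEntry(++memstoreWrite);
    writeQueue.addLast(e);
    return e;
  }

  /** Marks e complete, pops the completed prefix, returns true if e is now visible. */
  boolean advance(WriteEntry e) {
    e.completed = true;
    while (!writeQueue.isEmpty() && writeQueue.peekFirst().completed) {
      memstoreRead = writeQueue.removeFirst().writeNumber;
    }
    return memstoreRead >= e.writeNumber;
  }
}
```

With entries 1, 2 and 3 outstanding, completing 2 first returns false and leaves the read point at 0, because 1 is still in flight; completing 1 then moves the read point straight to 2, making both writes visible at once. This is why a writer may need to wait after its own advance call.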
----------------------------
/**
 * Wait for the global readPoint to advance upto
 * the specified transaction number.
 */
public void waitForRead(WriteEntry e) {
  boolean interrupted = false;
  synchronized (readWaiters) {
    while (memstoreRead < e.getWriteNumber()) { // wait until the read point reaches this write number
      try {
        // (writes complete asynchronously: an earlier write number may not be completed yet,
        // so wait until the read point is at least this entry's write number)
        readWaiters.wait(0);
      } catch (InterruptedException ie) {
        // We were interrupted... finish the loop -- i.e. cleanup --and then
        // on our way out, reset the interrupt flag.
        interrupted = true;
      }
    }
  }
  if (interrupted) Thread.currentThread().interrupt();
}
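waitForRead is the blocking half of completeMemstoreInsert: a writer whose entry is completed but not yet visible (because an earlier write is still in flight) waits on readWaiters until another writer's advance moves memstoreRead past it. A two-thread sketch with hypothetical names, where complete() mirrors completeMemstoreInsert as advance followed by waitForRead, using the same lock order (writeQueue, then readWaiters):

```java
import java.util.ArrayDeque;
import java.util.Deque;

class MvccWaitSketch {
  static final class WriteEntry {
    final long writeNumber;
    boolean completed;                 // guarded by writeQueue

    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
  }

  private final Deque<WriteEntry> writeQueue = new ArrayDeque<>();
  private final Object readWaiters = new Object();
  private long memstoreWrite = 0;      // guarded by writeQueue
  private long memstoreRead = 0;       // guarded by readWaiters

  WriteEntry begin() {
    synchronized (writeQueue) {
      WriteEntry e = new WriteEntry(++memstoreWrite);
      writeQueue.addLast(e);
      return e;
    }
  }

  void complete(WriteEntry e) {
    advance(e);
    waitForRead(e);
  }

  private void advance(WriteEntry e) {
    synchronized (writeQueue) {
      e.completed = true;
      long next = -1;
      while (!writeQueue.isEmpty() && writeQueue.peekFirst().completed) {
        next = writeQueue.removeFirst().writeNumber;
      }
      if (next > 0) {
        synchronized (readWaiters) {
          memstoreRead = next;
          readWaiters.notifyAll();     // wake writers waiting for visibility
        }
      }
    }
  }

  private void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    synchronized (readWaiters) {
      while (memstoreRead < e.writeNumber) {
        try {
          readWaiters.wait();
        } catch (InterruptedException ie) {
          interrupted = true;          // finish waiting, restore the flag afterwards
        }
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  long readPoint() {
    synchronized (readWaiters) {
      return memstoreRead;
    }
  }
}
```

A writer that finishes early cannot expose its cells ahead of an unfinished earlier write, so readers never observe a later edit while an earlier one is still missing.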
12. MultiRowMutationProcessor runs postProcess
(runs the coprocessor hooks)
The end.