
HBase put source code analysis

程序员文章站 2022-06-14 18:57:11

 

This post walks through the source code that HRegionServer runs when it handles a put:

 

HRegionServer 

 

  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
  throws ServiceException {
  ....
  try {
        region = getRegion(regionAction.getRegion()); // look up the Region this action targets
      } catch (IOException e) {
        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
        continue;  // For this region it's a failure.
      }
  ....
   if (regionAction.hasAtomic() && regionAction.getAtomic()) { // is an atomic operation requested?
        // How does this call happen?  It may need some work to play well w/ the surroundings.
        // Need to return an item per Action along w/ Action index.  TODO.
        try {
          mutateRows(region, regionAction.getActionList(), cellScanner); // described in detail below
        } catch (IOException e) {
          // As it's atomic, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } else {
        // doNonAtomicRegionMutation manages the exception internally
        cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup);
      }
  ....
  }

 The mutateRows method of HRegionServer:

 

 

  protected void mutateRows(final HRegion region, final List<ClientProtos.Action> actions,
      final CellScanner cellScanner)
  throws IOException {
    if (!region.getRegionInfo().isMetaTable()) {
      cacheFlusher.reclaimMemStoreMemory();
    }
    RowMutations rm = null;
    for (ClientProtos.Action action: actions) {
      if (action.hasGet()) {
        throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
          action.getGet());
      }
      MutationType type = action.getMutation().getMutateType();
      if (rm == null) {
        rm = new RowMutations(action.getMutation().getRow().toByteArray());
      }
      switch (type) {
      case PUT:
        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
        break;
      case DELETE:
        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
        break;
      default:
          throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
      }
    }
    region.mutateRow(rm); // HRegion carries out the mutation
  }
  

 This calls into HRegion:

 

 

    public void mutateRow(RowMutations rm) throws IOException {
    // Don't need nonces here - RowMutations only supports puts and deletes
    mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
  }

 The detailed steps of HRegion's mutateRow:

 

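  1. Run the coprocessorHost hooks that precede the operation

   MultiRowMutationProcessor class

  2. Acquire the row lock (rowid)

  3. Try to acquire the HRegion (updatesLock) read lock

  4. Process the row changes (such as timestamps) and build the WALEdit object that the WAL needs

  MultiRowMutationProcessor class

  5. Create a WriteEntry (carrying a write number) and add it to the writeQueue of the MultiVersionConsistencyControl class

  6. Add the cells to the memstore of the corresponding HStore (requires the store's read lock)

  7. Append to the WAL: generate the log key (used when deleting WAL entries after a memstore flush; unrelated to MVCC) and place the log edit on the queue (asynchronous)

  FSHLog

  8. Release the HRegion (updatesLock) read lock

  9. Release the row lock (rowid); this counts down the latch

  10. Sync this HRegionServer's HLog up to the current log edit,

  using unflushedEntries in FSHLog, an AtomicLong incremented each time a log edit is added, as the position marker txid

  FSHLog class

  11. The MultiVersionConsistencyControl class walks the writeQueue, advances the read point to the largest write number whose entry and all earlier entries are completed, and then waits until the read point is at least the current write number. (Because the WAL runs asynchronously, writes complete out of order: the read point may already be past the current write number, or an earlier write may not be completed yet, so the caller waits until the read point is at least its own write number.)

  12. MultiRowMutationProcessor runs postProcess

  Runs the coprocessor hooks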
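
The ordering of the twelve steps can be sketched with plain JDK locks. This is a hypothetical illustration and not HBase code: `PutOrderSketch`, its `trace` list and the step labels are invented here, and the real method also handles failures (for example rolling back the memstore), which the sketch leaves out. What it tries to show is that both locks are released (steps 8 and 9) before the WAL sync in step 10, and that the write only becomes visible in step 11.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the lock ordering described in the list; not HBase code.
final class PutOrderSketch {
  private final ReentrantLock rowLock = new ReentrantLock();
  private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
  final List<String> trace = new ArrayList<>();

  void mutateRow() {
    trace.add("1 preProcess");
    rowLock.lock();                          // step 2: row lock
    try {
      updatesLock.readLock().lock();         // step 3: region updatesLock, shared mode
      try {
        trace.add("4 process");
        trace.add("5 mvcc begin");
        trace.add("6 memstore add");
        trace.add("7 wal append (no sync)");
      } finally {
        updatesLock.readLock().unlock();     // step 8
      }
    } finally {
      rowLock.unlock();                      // step 9
    }
    // Step 10 runs with no lock held, so a slow sync does not block other writers.
    boolean locksHeld =
        rowLock.isHeldByCurrentThread() || updatesLock.getReadHoldCount() > 0;
    trace.add("10 wal sync, locksHeld=" + locksHeld);
    trace.add("11 mvcc complete");
    trace.add("12 postProcess");
  }
}
```

Calling `new PutOrderSketch().mutateRow()` and printing `trace` lists the steps in the order they ran.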

 

The same steps, this time with the source code:

1. Run the coprocessorHost hooks that precede the operation

   MultiRowMutationProcessor class

2. Acquire the row lock (rowid)

 

  /**
   * Tries to acquire a lock on the given row.
   * @param waitForLock if true, will block until the lock is available.
   *        Otherwise, just tries to obtain the lock and returns
   *        false if unavailable.
   * @return the row lock if acquired,
   *   null if waitForLock was false and the lock was not acquired
   * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
   */
  public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
    checkRow(row, "row lock");
    startRegionOperation();
    try {
      HashedBytes rowKey = new HashedBytes(row);
      RowLockContext rowLockContext = new RowLockContext(rowKey);

      // loop until we acquire the row lock (unless !waitForLock)
      while (true) { // loop until this row is acquired
        RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); // try to claim the row
        if (existingContext == null) { // no other thread holds this row
          // Row is not already locked by any thread, use newly created context.
          break;
        } else if (existingContext.ownedByCurrentThread()) { // the current thread already holds this row
          // Row is already locked by current thread, reuse existing context instead.
          rowLockContext = existingContext;
          break;
        } else { // another thread holds this row
          // Row is already locked by some other thread, give up or wait for it
          if (!waitForLock) {
            return null;
          }
          try {
            if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { // wait on a CountDownLatch for the other thread to release the row
              throw new IOException("Timed out waiting for lock for row: " + rowKey);
            }
          } catch (InterruptedException ie) {
            LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
            InterruptedIOException iie = new InterruptedIOException();
            iie.initCause(ie);
            throw iie;
          }
        }
      }

      // allocate new lock for this thread
      return rowLockContext.newLock();
    } finally {
      closeRegionOperation();
    }
  }
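
The same idea can be tried outside HBase with a minimal sketch built only on the JDK. `RowLockSketch`, `Context`, `lock` and `unlock` are hypothetical names chosen for this illustration. Unlike `getRowLock`, the sketch returns `false` on timeout or interrupt instead of throwing, and it counts re-entrant holds in a plain `int`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for HRegion's row-lock table; not HBase code.
final class RowLockSketch {
  private static final class Context {
    final Thread owner = Thread.currentThread();
    final CountDownLatch latch = new CountDownLatch(1);
    int holds; // only ever touched by the owning thread
  }

  private final ConcurrentHashMap<String, Context> lockedRows = new ConcurrentHashMap<>();

  /** Returns true once the calling thread holds the row, false on timeout or interrupt. */
  boolean lock(String row, long timeoutMs) {
    Context mine = new Context();
    while (true) {
      Context existing = lockedRows.putIfAbsent(row, mine);
      if (existing == null) {                          // row was free: our context wins
        mine.holds = 1;
        return true;
      }
      if (existing.owner == Thread.currentThread()) {  // re-entrant acquisition
        existing.holds++;
        return true;
      }
      try {                                            // held by another thread: wait for release
        if (!existing.latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
          return false;
        }
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        return false;
      }
      // The holder released the row; loop and compete for it again.
    }
  }

  void unlock(String row) {
    Context ctx = lockedRows.get(row);
    if (ctx == null || ctx.owner != Thread.currentThread()) {
      throw new IllegalStateException("row not locked by this thread: " + row);
    }
    if (--ctx.holds == 0) {
      lockedRows.remove(row);   // free the slot first,
      ctx.latch.countDown();    // then wake every waiter
    }
  }
}
```

Removing the map entry before counting down the latch means a woken waiter finds the slot empty on its next `putIfAbsent`, which is why the loop retries rather than assuming it now owns the row.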

 

3. Try to acquire the HRegion (updatesLock) read lock
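
No source is shown for this step, so here is a small sketch of why a write path would take the read side of a read-write lock. It assumes, as my understanding of HRegion rather than something shown in this post, that the write side of updatesLock is taken when the memstore is snapshotted for a flush. `UpdatesLockSketch`, `put` and `snapshot` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: many writers share the read lock, a snapshot takes the write lock.
final class UpdatesLockSketch {
  private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
  private volatile ConcurrentLinkedQueue<String> active = new ConcurrentLinkedQueue<>();

  void put(String kv) {
    updatesLock.readLock().lock();      // shared: concurrent puts do not block each other
    try {
      active.add(kv);
    } finally {
      updatesLock.readLock().unlock();
    }
  }

  /** Swaps out the active buffer; no put can be in flight while this runs. */
  List<String> snapshot() {
    updatesLock.writeLock().lock();     // exclusive: waits for in-flight puts to finish
    try {
      List<String> snap = new ArrayList<>(active);
      active = new ConcurrentLinkedQueue<>();
      return snap;
    } finally {
      updatesLock.writeLock().unlock();
    }
  }
}
```

The naming is counter-intuitive at first: "read lock" here means "shared", not "reading data".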

  

  

  4. Process the row changes (such as timestamps) and build the WALEdit object that the WAL needs

  MultiRowMutationProcessor class

 

   @Override
  public void process(long now,
                      HRegion region,
                      List<KeyValue> mutationKvs,
                      WALEdit walEdit) throws IOException {
    byte[] byteNow = Bytes.toBytes(now);
    // Check mutations and apply edits to a single WALEdit
    for (Mutation m : mutations) { // validate each mutation and update its timestamps
      if (m instanceof Put) {
        Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
        region.checkFamilies(familyMap.keySet());
        region.checkTimestamps(familyMap, now);
        region.updateKVTimestamps(familyMap.values(), byteNow);
      } else if (m instanceof Delete) {
        Delete d = (Delete) m;
        region.prepareDelete(d);
        region.prepareDeleteTimestamps(d.getFamilyCellMap(), byteNow);
      } else {
        throw new DoNotRetryIOException(
            "Action must be Put or Delete. But was: "
            + m.getClass().getName());
      }
      for (List<Cell> cells: m.getFamilyCellMap().values()) {
        boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
        for (Cell cell : cells) { // collect every cell of each column family; add it to walEdit unless the WAL is skipped
          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
          mutationKvs.add(kv);
          if (writeToWAL) {
            walEdit.add(kv);
          }
        }
      }
    }
  }
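
A stripped-down sketch of what `process` does to each cell is given below. It assumes that a cell left at the "latest timestamp" placeholder (`Long.MAX_VALUE`, mirroring `HConstants.LATEST_TIMESTAMP`) is stamped with the server time, while an explicit client timestamp is kept. `ProcessSketch`, `Cell` and `Mutation` are hypothetical types invented for the example, and the family and timestamp checks of the real method are omitted.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of timestamp stamping and WAL filtering; not HBase code.
final class ProcessSketch {
  static final long LATEST_TIMESTAMP = Long.MAX_VALUE; // placeholder meaning "use server time"

  static final class Cell {
    final String name;
    long timestamp;
    Cell(String name, long timestamp) {
      this.name = name;
      this.timestamp = timestamp;
    }
  }

  static final class Mutation {
    final boolean skipWal;
    final List<Cell> cells;
    Mutation(boolean skipWal, Cell... cells) {
      this.skipWal = skipWal;
      this.cells = Arrays.asList(cells);
    }
  }

  static void process(long now, List<Mutation> mutations,
                      List<Cell> memstoreCells, List<Cell> walCells) {
    for (Mutation m : mutations) {
      for (Cell c : m.cells) {
        if (c.timestamp == LATEST_TIMESTAMP) {
          c.timestamp = now;        // the client did not supply a timestamp
        }
        memstoreCells.add(c);       // every cell is applied to the memstore
        if (!m.skipWal) {
          walCells.add(c);          // only durable mutations reach the WAL edit
        }
      }
    }
  }
}
```

A mutation marked `skipWal` still lands in the memstore list, so it is readable but would be lost if the server died before a flush.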

 

 

5. Create a WriteEntry (carrying a write number) and add it to the writeQueue of the MultiVersionConsistencyControl class

  writeEntry = mvcc.beginMemstoreInsert();

 

  /**
   * Generate and return a {@link WriteEntry} with a new write number.
   * To complete the WriteEntry and wait for it to be visible,
   * call {@link #completeMemstoreInsert(WriteEntry)}.
   */
  public WriteEntry beginMemstoreInsert() {
    synchronized (writeQueue) {
      long nextWriteNumber = ++memstoreWrite; // increment this HRegion's MVCC write point and use it as the entry's write number
      WriteEntry e = new WriteEntry(nextWriteNumber);
      writeQueue.add(e);
      return e;
    }
  }

   6. Add the cells to the memstore of the corresponding HStore (requires the store's read lock)

 

   @Override
  public long add(final KeyValue kv) {
    lock.readLock().lock();
    try {
      return this.memstore.add(kv);
    } finally {
      lock.readLock().unlock();
    }
  }

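   7. Append to the WAL: generate the log key (used when deleting WAL entries after a memstore flush; unrelated to MVCC) and place the log edit on the queue (asynchronous)

  FSHLog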

  /**
   * Append a set of edits to the log. Log edits are keyed by (encoded)
   * regionName, rowname, and log-sequence-id.
   *
   * Later, if we sort by these keys, we obtain all the relevant edits for a
   * given key-range of the HRegion (TODO). Any edits that do not have a
   * matching COMPLETE_CACHEFLUSH message can be discarded.
   *
   * <p>
   * Logs cannot be restarted once closed, or once the HLog process dies. Each
   * time the HLog starts, it must create a new log. This means that other
   * systems should process the log appropriately upon each startup (and prior
   * to initializing HLog).
   *
   * synchronized prevents appends during the completion of a cache flush or for
   * the duration of a log roll.
   *
   * @param info
   * @param tableName
   * @param edits
   * @param clusterIds that have consumed the change (for replication)
   * @param now
   * @param doSync shall we sync?
   * @param sequenceId of the region.
   * @return txid of this transaction
   * @throws IOException
   */
  @SuppressWarnings("deprecation")
  private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore, 
      AtomicLong sequenceId, long nonceGroup, long nonce) throws IOException {
      if (edits.isEmpty()) return this.unflushedEntries.get();
      if (this.closed) {
        throw new IOException("Cannot append; log is closed");
      }
      TraceScope traceScope = Trace.startSpan("FSHlog.append");
      try {
        long txid = 0;
        synchronized (this.updateLock) { // hold the HLog update lock
          // get the sequence number from the passed Long. In normal flow, it is coming from the
          // region.
          long seqNum = sequenceId.incrementAndGet();
          // Generate the region's seqNum. After a memstore flush, HLog entries whose seqNum
          // is covered by the flushed seqNum are no longer needed and can be deleted.
          // The 'lastSeqWritten' map holds the sequence number of the oldest
          // write for each region (i.e. the first edit added to the particular
          // memstore). . When the cache is flushed, the entry for the
          // region being flushed is removed if the sequence number of the flush
          // is greater than or equal to the value in lastSeqWritten.
          // Use encoded name.  Its shorter, guaranteed unique and a subset of
          // actual  name.
          byte [] encodedRegionName = info.getEncodedNameAsBytes(); // encoded region name
          if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum); // add an entry if the region has none (the map holds each region's oldest unflushed seqNum)
          HLogKey logKey = makeKey(
            encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce);

          synchronized (pendingWritesLock) {
            doWrite(info, logKey, edits, htd); // put the log edit on the pending-writes queue
            txid = this.unflushedEntries.incrementAndGet();
          }
          this.numEntries.incrementAndGet();
          this.asyncWriter.setPendingTxid(txid);

          if (htd.isDeferredLogFlush()) {
            lastUnSyncedTxid = txid;
          }
          this.latestSequenceNums.put(encodedRegionName, seqNum);
        }
        // TODO: note that only tests currently call append w/sync.
        //       Therefore, this code here is not actually used by anything.
        // Sync if catalog region, and if not then check if that table supports
        // deferred log flushing
        if (doSync &&
            (info.isMetaRegion() ||
            !htd.isDeferredLogFlush())) {
          // sync txn to file system
          this.sync(txid);
        }
        return txid;
      } finally {
        traceScope.close();
      }
    }
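
The bookkeeping around `oldestUnflushedSeqNums` is easier to follow in isolation. The sketch below is a simplification under stated assumptions: `WalBookkeepingSketch`, `flushed` and `canArchive` are hypothetical names, a log file is reduced to a map from region to the highest seqNum it contains, and races between flushing and appending are ignored.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of WAL sequence bookkeeping; not the FSHLog implementation.
final class WalBookkeepingSketch {
  // region name -> seqNum of the oldest edit that is still only in the memstore
  private final ConcurrentHashMap<String, Long> oldestUnflushedSeqNums = new ConcurrentHashMap<>();
  private final AtomicLong unflushedEntries = new AtomicLong(0);

  /** Records one edit for the region and returns the txid used later for syncing. */
  long append(String region, AtomicLong regionSequenceId) {
    long seqNum = regionSequenceId.incrementAndGet();
    oldestUnflushedSeqNums.putIfAbsent(region, seqNum); // only the first edit since a flush sticks
    return unflushedEntries.incrementAndGet();
  }

  /** Called after the region's memstore has been written to disk. */
  void flushed(String region) {
    oldestUnflushedSeqNums.remove(region);
  }

  /** A log file is disposable once no region still depends on an edit inside it. */
  boolean canArchive(Map<String, Long> highestSeqNumPerRegionInFile) {
    for (Map.Entry<String, Long> e : highestSeqNumPerRegionInFile.entrySet()) {
      Long oldest = oldestUnflushedSeqNums.get(e.getKey());
      if (oldest != null && oldest <= e.getValue()) {
        return false; // the file still holds an edit that was never flushed
      }
    }
    return true;
  }
}
```

Note that the txid returned by `append` and the per-region seqNum are two separate counters: the first orders syncs across the whole log, the second ties edits to memstore flushes.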

 8. Release the HRegion (updatesLock) read lock

  

  9. Release the row lock (rowid); this counts down the latch

  

  10. Sync this HRegionServer's HLog up to the current log edit,

  using unflushedEntries in FSHLog, an AtomicLong incremented each time a log edit is added, as the position marker txid

  FSHLog class

 // sync all transactions upto the specified txid
  private void syncer(long txid) throws IOException {
    synchronized (this.syncedTillHere) {
      while (this.syncedTillHere.get() < txid) {
        try {
          this.syncedTillHere.wait();

          if (txid <= this.failedTxid.get()) {
            assert asyncIOE != null :
              "current txid is among(under) failed txids, but asyncIOE is null!";
            throw asyncIOE;
          }
        } catch (InterruptedException e) {
          LOG.debug("interrupted while waiting for notification from AsyncNotifier");
        }
      }
    }
  }
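
The wait-until-synced pattern in `syncer` can be reproduced with a plain monitor. In this sketch `SyncSketch`, `append`, `syncAll` and `waitForSync` are hypothetical names, and `syncAll` stands in for the background threads that actually write and sync the log. One deliberate difference from the listing: on interrupt the sketch restores the interrupt flag and returns `false` instead of logging and waiting again.

```java
// Hypothetical sketch of "block until my txid is durable"; not the FSHLog implementation.
final class SyncSketch {
  private final Object lock = new Object();
  private long pendingTxid = 0;     // highest txid handed out by append()
  private long syncedTillHere = 0;  // highest txid known to be durable

  long append() {
    synchronized (lock) {
      return ++pendingTxid;
    }
  }

  /** Background syncer: one sync makes every edit appended so far durable. */
  void syncAll() {
    synchronized (lock) {
      syncedTillHere = pendingTxid;
      lock.notifyAll();             // wake all writers; each re-checks its own txid
    }
  }

  /** Blocks until txid is durable; returns false if interrupted first. */
  boolean waitForSync(long txid) {
    synchronized (lock) {
      while (syncedTillHere < txid) {   // loop guards against spurious wakeups
        try {
          lock.wait();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
      return true;
    }
  }
}
```

Because a single `syncAll` covers every txid handed out so far, many writers can be released by one sync, which is the point of keying the wait on a monotonically increasing txid.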

 

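  11. The MultiVersionConsistencyControl class walks the writeQueue, advances the read point to the largest write number whose entry and all earlier entries are completed, and then waits until the read point is at least the current write number. (Because the WAL runs asynchronously, writes complete out of order: the read point may already be past the current write number, or an earlier write may not be completed yet, so the caller waits until the read point is at least its own write number.)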

 /**
   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
   *
   * At the end of this call, the global read point is at least as large as the write point
   * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
   */
  public void completeMemstoreInsert(WriteEntry e) {
    advanceMemstore(e);
    waitForRead(e);
  }
  
  /**
   * Mark the {@link WriteEntry} as complete and advance the read point as
   * much as possible.
   *
   * How much is the read point advanced?
   * Let S be the set of all write numbers that are completed and where all previous write numbers
   * are also completed.  Then, the read point is advanced to the supremum of S.
   *
   * @param e
   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
   */
  boolean advanceMemstore(WriteEntry e) {
    synchronized (writeQueue) {
      e.markCompleted();

      long nextReadValue = -1;
      boolean ranOnce=false;
      while (!writeQueue.isEmpty()) { // advance the read point to the largest write number whose predecessors are all completed
        ranOnce=true;
        WriteEntry queueFirst = writeQueue.getFirst();

        if (nextReadValue > 0) {
          if (nextReadValue+1 != queueFirst.getWriteNumber()) {
            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
                + nextReadValue + " next: " + queueFirst.getWriteNumber());
          }
        }

        if (queueFirst.isCompleted()) {
          nextReadValue = queueFirst.getWriteNumber();
          writeQueue.removeFirst();
        } else {
          break;
        }
      }

      if (!ranOnce) {
        throw new RuntimeException("never was a first");
      }

      if (nextReadValue > 0) {
        synchronized (readWaiters) {
          memstoreRead = nextReadValue;
          readWaiters.notifyAll();
        }
      }
      if (memstoreRead >= e.getWriteNumber()) {
        return true;
      }
      return false;
    }
  }
  ----------------------------
  /**
   * Wait for the global readPoint to advance upto
   * the specified transaction number.
   */
  public void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    synchronized (readWaiters) {
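      while (memstoreRead < e.getWriteNumber()) { // wait until the read point is at least this entry's write number
        try {
          // Writes complete out of order because the WAL runs asynchronously: the read point
          // may already be past this write number, or an earlier write may still be incomplete,
          // in which case we wait until the read point is at least this write number.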
          readWaiters.wait(0);
        } catch (InterruptedException ie) {
          // We were interrupted... finish the loop -- i.e. cleanup --and then
          // on our way out, reset the interrupt flag.
          interrupted = true;
        }
      }
    }
    if (interrupted) Thread.currentThread().interrupt();
  }
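
To see why `waitForRead` is needed, it helps to run the read-point logic on two writes that finish out of order. The sketch below is a minimal model with hypothetical names (`MvccSketch`, `begin`, `complete`); it keeps only the queue and the two counters and leaves out the blocking wait.

```java
import java.util.ArrayDeque;

// Hypothetical, single-lock model of the MVCC write queue; not HBase code.
final class MvccSketch {
  static final class WriteEntry {
    final long writeNumber;
    boolean completed;
    WriteEntry(long writeNumber) {
      this.writeNumber = writeNumber;
    }
  }

  private final ArrayDeque<WriteEntry> writeQueue = new ArrayDeque<>();
  private long writePoint = 0;
  private long readPoint = 0;

  synchronized WriteEntry begin() {
    WriteEntry e = new WriteEntry(++writePoint);
    writeQueue.add(e);
    return e;
  }

  /** Marks e completed and returns true if e is now visible to readers. */
  synchronized boolean complete(WriteEntry e) {
    e.completed = true;
    // Pop the completed prefix of the queue; stop at the first write still in flight.
    while (!writeQueue.isEmpty() && writeQueue.peekFirst().completed) {
      readPoint = writeQueue.removeFirst().writeNumber;
    }
    return readPoint >= e.writeNumber;
  }

  synchronized long readPoint() {
    return readPoint;
  }
}
```

If write 2 completes while write 1 is still in flight, `complete` returns `false` and the read point stays where it was; completing write 1 then moves the read point past both. In the listing above, the thread that finished write 2 would sit in `waitForRead` during that gap.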

 

  12. MultiRowMutationProcessor runs postProcess

  Runs the coprocessor hooks

 

 

End