欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

百度分布式id-UidGenerator之CachedUidGenerator源码解读

程序员文章站 2022-03-08 17:51:51
...

1:UidGenerator源码

https://github.com/baidu/uid-generator

2:CachedUidGenerator介绍

RingBuffer环形数组,数组每个元素成为一个slot。RingBuffer容量,默认为Snowflake算法中sequence最大值,且为2^N。可通过boostPower配置进行扩容,以提高RingBuffer 读写吞吐量。

Tail指针、Cursor指针用于环形数组上读写slot:

Tail指针
表示Producer生产的最大序号(此序号从0开始,持续递增)。Tail不能超过Cursor,即生产者不能覆盖未消费的slot。当Tail已赶上curosr,此时可通过rejectedPutBufferHandler指定PutRejectPolicy

Cursor指针
表示Consumer消费到的最小序号(序号序列与Producer序列相同)。Cursor不能超过Tail,即不能消费未生产的slot。当Cursor已赶上tail,此时可通过rejectedTakeBufferHandler指定TakeRejectPolicy

百度分布式id-UidGenerator之CachedUidGenerator源码解读

CachedUidGenerator采用了双RingBuffer,Uid-RingBuffer用于存储Uid、Flag-RingBuffer用于存储Uid状态(是否可填充、是否可消费)

由于数组元素在内存中是连续分配的,可最大程度利用CPU cache以提升性能。但同时会带来「伪共享」FalseSharing问题,为此在Tail、Cursor指针、Flag-RingBuffer中采用了CacheLine 补齐方式。

RingBuffer填充时机
初始化预填充
RingBuffer初始化时,预先填充满整个RingBuffer.

即时填充
Take消费时,即时检查剩余可用slot量(tail - cursor),如小于设定阈值,则补全空闲slots。阈值可通过paddingFactor来进行配置,请参考Quick Start中CachedUidGenerator配置

周期填充
通过Schedule线程,定时补全空闲slots。可通过scheduleInterval配置,以应用定时填充功能,并指定Schedule时间间隔

3:CachedUidGenerator源码

主要部分CachedUidGenerator 它是继承DefaultUidGenerator里面的算法

3.1 重要几个参数

/** Tail: last position sequence to produce  尾部:要生成的最后一个位置序列 相当于每生产一个uid就+1 */
private final AtomicLong tail = new PaddedAtomicLong(START_POINT);

/** Cursor: current position sequence to consume  光标:要使用的当前位置序列  相当于每消耗一个uid就+1*/
private final AtomicLong cursor = new PaddedAtomicLong(START_POINT);
 
/** 存放uid的数组*/
private final long[] slots;
 /** 存放对应uid使用的状态,0代表已使用,1代表未使用*/
private final PaddedAtomicLong[] flags

3.2 获取id和填充id的方法

//从获取uid的方法
public long take() {
    // spin get next available cursor  旋转获取下一个可用光标---每次拿uid时,光标+1
    long currentCursor = cursor.get();
    long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);

    // check for safety consideration, it never occurs  出于安全考虑,绝不会发生
    Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");

    // trigger padding in an async-mode if reach the threshold  如果达到阈值,则以异步模式触发填充
    long currentTail = tail.get();
    if (currentTail - nextCursor < paddingThreshold) {
        LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
                nextCursor, currentTail - nextCursor);
        bufferPaddingExecutor.asyncPadding();
    }

    // cursor catch the tail, means that there is no more available UID to take 光标捕捉尾部,意味着没有更多可用的UID
    if (nextCursor == currentCursor) {
        rejectedTakeHandler.rejectTakeBuffer(this);
    }

    // 1. check next slot flag is CAN_TAKE_FLAG
    int nextCursorIndex = calSlotIndex(nextCursor);
    Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");

    // 2. get UID from next slot
    // 3. set next slot flag as CAN_PUT_FLAG.  获取slots里面的值
    long uid = slots[nextCursorIndex];
    //更改状态,代表数据id已被获取掉
    flags[nextCursorIndex].set(CAN_PUT_FLAG);
    // Note that: Step 2,3 can not swap. If we set flag before get value of slot, the producer may overwrite the
    // slot with a new UID, and this may cause the consumer take the UID twice after walk a round the ring
    return uid;
}
//填充方法
public void paddingBuffer() {
    LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);

    // is still running
    if (!running.compareAndSet(false, true)) {
        LOGGER.info("Padding buffer is still running. {}", ringBuffer);
        return;
    }

    // fill the rest slots until to catch the cursor
    boolean isFullRingBuffer = false;
    while (!isFullRingBuffer) {
        //获取该s内部全部的uid
        List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
        for (Long uid : uidList) {
            isFullRingBuffer = !ringBuffer.put(uid);
            if (isFullRingBuffer) {
                break;
            }
        }
    }

    // not running now
    running.compareAndSet(true, false);
    LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
}
//填充id到slots数组,填充flags状态变更
public synchronized boolean put(long uid) {
    long currentTail = tail.get();
    long currentCursor = cursor.get();

    // tail catches the cursor, means that you can't put any cause of RingBuffer is full
    long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);
    if (distance == bufferSize - 1) {
        rejectedPutHandler.rejectPutBuffer(this, uid);
        return false;
    }

    // 1. pre-check whether the flag is CAN_PUT_FLAG
    int nextTailIndex = calSlotIndex(currentTail + 1);
    if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {
        rejectedPutHandler.rejectPutBuffer(this, uid);
        return false;
    }

    // 2. put UID in the next slot
    // 3. update next slot' flag to CAN_TAKE_FLAG
    // 4. publish tail with sequence increase by one 填充id,更改状态
    slots[nextTailIndex] = uid;
    flags[nextTailIndex].set(CAN_TAKE_FLAG);
    tail.incrementAndGet();

    // The atomicity of operations above, guarantees by 'synchronized'. In another word,
    // the take operation can't consume the UID we just put, until the tail is published(tail.incrementAndGet())
    return true;
}

1:定义数组大小(默认是65536)slots,flags,项目启动时,初始化相关数据,uid填充进slots,状态1填充到flags,每生产一个uid,tail+1

2:消耗uid的方法,cursor 光标从0开始,每消耗一次uid,光标就+1,根据光标找到slots对应数组的位置,根据位置获取slots数组里面的uid,同时变更flags对应位置的状态0

3:当消耗到50%时, bufferPaddingExecutor.asyncPadding(); 异步触发填充方法,开始生产uid,tail+1,tail找到对应slots数组的位置,把生成的uid替换旧位置的值,然后变更flags状态为1.

4:先理解上一个DefaultUidGenerator,就容易理解这个,源码还是很清晰的。基于这种思维,也可以自己写一个类似分布式id