欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

深度解析Kafka中CopyOnWriteMap实现原理

程序员文章站 2022-07-13 10:08:26
...

原创不易,转载请注明出处


前言

Kakfa消息生产者端 业务在发送消息的时候,会将消息追加写到RecordAccumulator这个内存缓冲区的队列中,在追加的时候首先会根据消息的topic与partition 去一个map中获取对应的Deque队列,如果对应的队列不存在,就会创建一个,并添加到map中,然后才会将消息追加到队列队尾的batch元素中,我们知道,Kafka发送消息是支持多线程并发的,也就是往内存缓冲区中追加也是多线程并发的,其实在往Deque队列中追加写的过程是使用了synchronized来保证并发安全性,但是在获取topic-partition对应Deque队列的时候如果使用普通的HashMap是会有并发安全问题的,因为队列不存在,会创建一个,然后put到map中这个操作是线程不安全的。当然面对这种问题我们有很多解决方案,比如说加锁串行化,也可以使用并发安全的map ConcurrentHashMap这种分段锁实现的,但是Kafka没有使用这几种方案,而是自己实现了一个更加切合业务场景的map,CopyOnWriteMap。

1. 加锁copy写

我们先来看它的定义,成员与构造

public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
    // 可见性
    private volatile Map<K, V> map;

    public CopyOnWriteMap() {

        /// 创建一个空map
        this.map = Collections.emptyMap();
    }

CopyOnWriteMap 实现ConcurrentMap ,成员就一个,是个map,这个成员是volatile 关键字修饰的,一旦发生了改变,所有线程可见,关于volatile不知道的同学可以上网查阅下资料,这是个java 并发编程中非常重要的点,也是面试常问的点。构造就是创建了一个普通的map。
接下来我们看下它比较典型的写方法putIfAbsent与put。

 // 加锁
 @Override
 public synchronized V putIfAbsent(K k, V v) {
     if (!containsKey(k))// 如果不存在 就会添加
         return put(k, v);
     else

         // 如果存在的话返回 存在的
         return get(k);
 }

可以看到这个putIfAbsent 方法是由synchronized 修饰的,也就是多线程并发安全的,判断如果不存在这个key,就调用put方法添加
如果存在了就调用get 方法获取。
我们看下这个put方法

// put的时候加锁
@Override
public synchronized V put(K k, V v) {
    // cp之前的那个map
    Map<K, V> copy = new HashMap<K, V>(this.map);
    // 添加
    V prev = copy.put(k, v);
    // 重新赋值
    this.map = Collections.unmodifiableMap(copy);
    return prev;
}

可以看到这个put方法也是synchronized 修饰的,先会cp一份之前的那个map,然后在cp的那个map中添加,接下来重要的来了,给map重新赋值。由于成员map是volatile 修饰的,拥有可见性,那些读的线程就会立马感知到map的引用变了,变成了一个新的map。

其实还有这些 putAll ,remove ,replace 有关写的方法都是加锁的cp 操作的。

2. 无锁并发读

我们再来看下并发无锁并发读
拿最常用的get方法来说

@Override
public V get(Object k) {
    return map.get(k);
}

就是调用map的get 方法来读的,而且是无锁的。
像containsValue,containsKey,entrySet,isEmpty,keySet,size,values这些有关读操作都是无锁的。

总结

比如说我们在put的时候,会将并发put的操作串行化,但是它并没有锁map,而是锁的这个方法,这样做的好处就是你在put的时候,不影响那些并发读的线程,但是这种写时cp的方式有个问题就是,如果写很频繁的时候,容易造成大量内存的浪费,gc 成本增加,所以Kafka这个实现的这个CopyOnWriteMap特别适用于读多写少的场景,特别是这种无锁并发读,大大提高并发度,它使用在获取Deque这个业务中是正好的,因为这个topic与partition是不经常变动的,就是在消息生产者在开始的时候,可能会出现出现并发写,一旦初始化好了,基本都是读操作了。