欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

linux内核之无锁缓冲队列kfifo原理(结合项目实践)

程序员文章站 2022-07-14 16:21:52
...

Linux kernel里面从来就不缺少简洁,优雅和高效的代码,只是我们缺少发现和品味的眼光。在Linux kernel里面,简洁并不表示代码使用神出鬼没的超然技巧,相反,它使用的不过是大家非常熟悉的基础数据结构,但是kernel开发者能从基础的数据结构中,提炼出优美的特性。

kfifo就是这样的一类优美代码,它十分简洁,绝无多余的一行代码,却非常高效。

关于kfifo信息如下:

本文分析的源代码版本: 2.6.24.4
kfifo的定义文件: kernel/kfifo.c
kfifo的头文件: include/linux/kfifo.h

1. kfifo概述

kfifo是内核里面的一个First In First Out数据结构,它采用环形循环队列的数据结构来实现;它提供一个无边界的字节流服务,最重要的一点是,它使用并行无锁编程技术,即当它用于只有一个入队线程和一个出队线程的场情时,两个线程可以并发操作,而不需要任何加锁行为,就可以保证kfifo的线程安全

kfifo代码既然肩负着这么多特性,那我们先一敝它的代码:

struct kfifo {
    unsigned char *buffer;    /* the buffer holding the data */
    unsigned int size;    /* the size of the allocated buffer */
    unsigned int in;    /* data is added at offset (in % size) */
    unsigned int out;    /* data is extracted from off. (out % size) */
    spinlock_t *lock;    /* protects concurrent modifications */
};

这是kfifo的数据结构,kfifo主要提供了两个操作,__kfifo_put(入队操作)和__kfifo_get(出队操作)。 它的各个数据成员如下:

buffer: 用于存放数据的缓存
size: buffer空间的大小,在初化时,将它向上扩展成2的幂
lock: 如果使用不能保证任何时间最多只有一个读线程和写线程,需要使用该lock实施同步。
in, out: 和buffer一起构成一个循环队列。 in指向buffer中队头,而且out指向buffer中的队尾,它的结构如示图如下:

+--------------------------------------------------------------+
|            |<----------data---------->|                      |
+--------------------------------------------------------------+
             ^                          ^                      ^
             |                          |                      |
            out                        in                     size

当然,内核开发者使用了一种更好的技术处理了in, out和buffer的关系,我们将在下面进行详细分析。

2. kfifo功能描述

  1. 只支持一个读者和一个读者并发操作
  2. 无阻塞的读写操作,如果空间不够,则返回实际访问空间

kfifo_alloc 分配kfifo内存和初始化工作

struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
    unsigned char *buffer;
    struct kfifo *ret;

    /*
     * round up to the next power of 2, since our 'let the indices
     * wrap' tachnique works only in this case.
     */
    if (size & (size - 1)) {
        BUG_ON(size > 0x80000000);
        size = roundup_pow_of_two(size);
    }

    buffer = kmalloc(size, gfp_mask);
    if (!buffer)
        return ERR_PTR(-ENOMEM);

    ret = kfifo_init(buffer, size, gfp_mask, lock);

    if (IS_ERR(ret))
        kfree(buffer);

    return ret;
}

1.判断一个数是不是2的次幂 :
kfifo要保证其缓存空间的大小为2的次幂,如果不是则向上取整为2的次幂。其对于2的次幂的判断方式也是很巧妙的。如果一个整数n是2的次幂,则二进制模式必然是1000…,而n-1的二进制模式则是0111…,也就是说n和n-1的每个二进制位都不相同,例如:8(1000)和7(0111);n不是2的次幂,则n和n-1的二进制必然有相同的位都为1的情况,例如:7(0111)和6(0110)。这样就可以根据 n & (n-1)的结果来判断整数n是不是2的次幂,实现如下:

static inline bool is_power_of_2(uint32_t n)
{
    return (n != 0 && ((n & (n - 1)) == 0));
}

2.将数字向上取整为2的次幂 :
如果设定的缓冲区大小不是2的次幂,则向上取整为2的次幂,例如:设定为5,则向上取为8。上面提到整数n是2的次幂,则其二进制模式为100…,故如果正数k不是n的次幂,只需找到其最高的有效位1所在的位置(从1开始计数)pos,然后1 << pos即可将k向上取整为2的次幂。实现如下:

static inline uint32_t roundup_power_of_2(uint32_t a)
{
    if (a == 0)
        return 0;

    uint32_t position = 0;
    for (int i = a; i != 0; i >>= 1)
        position++;

    return static_cast<uint32_t>(1 << position);
}

fifo->in & (fifo->size - 1) 再比如这种写法取模,获取已用的大小。这样用逻辑与的方式相较于加减法更有效率

3. __kfifo_put与__kfifo_get详解

Linux内核中kfifo实现技巧,主要集中在放入数据的put方法和取数据的get方法

__kfifo_put是入队操作,它先将数据放入buffer里面,最后才修改in参数;
__kfifo_get是出队操作,它先将数据从buffer中移走,最后才修改out。

代码如下:

unsigned int __kfifo_put(struct kfifo *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;

    len = min(len, fifo->size - fifo->in + fifo->out);

    /*
     * Ensure that we sample the fifo->out index -before- we
     * start putting bytes into the kfifo.
     */

    smp_mb();

    /* first put the data starting from fifo->in to buffer end */
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

    /* then put the rest (if any) at the beginning of the buffer */
    memcpy(fifo->buffer, buffer + l, len - l);

    /*
     * Ensure that we add the bytes to the kfifo -before-
     * we update the fifo->in index.
     */

    smp_wmb();

    fifo->in += len;

    return len;
}

6行,环形缓冲区的剩余容量为fifo->size - fifo->in + fifo->out,让写入的长度取len和剩余容量中较小的,避免写越界;
13行,加内存屏障,保证在开始放入数据之前,fifo->out取到正确的值(另一个CPU可能正在改写out值)
16行,前面讲到fifo->size已经2的次幂圆整,而且kfifo->in % kfifo->size 可以转化为 kfifo->in & (kfifo->size – 1),所以fifo->size - (fifo->in & (fifo->size - 1)) 即位 fifo->in 到 buffer末尾所剩余的长度,l取len和剩余长度的最小值,即为需要拷贝l 字节到fifo->buffer + fifo->in的位置上。
17行,拷贝l 字节到fifo->buffer + fifo->in的位置上,如果l = len,则已拷贝完成,第20行len – l 为0,将不执行,如果l = fifo->size - (fifo->in & (fifo->size - 1)) ,则第20行还需要把剩下的 len – l 长度拷贝到buffer的头部。
27行,加写内存屏障,保证in 加之前,memcpy的字节已经全部写入buffer,如果不加内存屏障,可能数据还没写完,另一个CPU就来读数据,读到的缓冲区内的数据不完全,因为读数据是通过 in – out 来判断的。
29行,注意这里 只是用了 fifo->in += len而未取模,这就是kfifo的设计精妙之处,这里用到了unsigned int的溢出性质,当in 持续增加到溢出时又会被置为0,这样就节省了每次in向前增加都要取模的性能,锱铢必较,精益求精,让人不得不佩服。

unsigned int __kfifo_get(struct kfifo *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;

    len = min(len, fifo->in - fifo->out);

    /*
     * Ensure that we sample the fifo->in index -before- we
     * start removing bytes from the kfifo.
     */

    smp_rmb();

    /* first get the data from fifo->out until the end of the buffer */
    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);

    /* then get the rest (if any) from the beginning of the buffer */
    memcpy(buffer + l, fifo->buffer, len - l);

    /*
     * Ensure that we remove the bytes from the kfifo -before-
     * we update the fifo->out index.
     */

    smp_mb();

    fifo->out += len;

    return len;
}

6行,可去读的长度为fifo->in – fifo->out,让读的长度取len和剩余容量中较小的,避免读越界;
13行,加读内存屏障,保证在开始取数据之前,fifo->in取到正确的值(另一个CPU可能正在改写in值)
16行,前面讲到fifo->size已经2的次幂圆整,而且kfifo->out % kfifo->size 可以转化为 kfifo->out & (kfifo->size – 1),所以fifo->size - (fifo->out & (fifo->size - 1)) 即位 fifo->out 到 buffer末尾所剩余的长度,l取len和剩余长度的最小值,即为从fifo->buffer + fifo->in到末尾所要去读的长度。
17行,从fifo->buffer + fifo->out的位置开始读取l长度,如果l = len,则已读取完成,第20行len – l 为0,将不执行,如果l =fifo->size - (fifo->out & (fifo->size - 1)) ,则第20行还需从buffer头部读取 len – l 长。
27行,加内存屏障,保证在修改out前,已经从buffer中取走了数据,如果不加屏障,可能先执行了增加out的操作,数据还没取完,令一个CPU可能已经往buffer写数据,将数据破坏,因为写数据是通过fifo->size - (fifo->in & (fifo->size - 1))来判断的 。
29行,注意这里 只是用了 fifo->out += len 也未取模,同样unsigned int的溢出性质,当out 持续增加到溢出时又会被置为0,如果in先溢出,出现 in < out 的情况,那么 in – out 为负数(又将溢出),in – out 的值还是为buffer中数据的长度。

图解一下 in 先溢出的情况,size = 64, 写入前 in = 4294967291, out = 4294967279 ,数据 in – out = 12;

linux内核之无锁缓冲队列kfifo原理(结合项目实践)
写入 数据16个字节,则 in + 16 = 4294967307,溢出为 11,此时 in – out = –4294967268,溢出为28,数据长度仍然正确,由此可见,在这种特殊情况下,这种计算仍然正确,是不是让人叹为观止,妙不可言?

linux内核之无锁缓冲队列kfifo原理(结合项目实践)

4. kfifo_get和kfifo_put无锁并发操作

计算机科学家已经证明,当只有一个读经程和一个写线程并发操作时,不需要任何额外的锁,就可以确保是线程安全的,也即kfifo使用了无锁编程技术,以提高kernel的并发。

kfifo使用in和out两个指针来描述写入和读取游标,对于写入操作,只更新in指针,而读取操作,只更新out指针,可谓井水不犯河水,示意图如下:

                                               |<--写入-->|
+--------------------------------------------------------------+
|                        |<----------data----->|               |
+--------------------------------------------------------------+
                         |<--读取-->|
                         ^                     ^               ^
                         |                     |               |
                        out                   in              size

为了避免读者看到写者预计写入,但实际没有写入数据的空间,写者必须保证以下的写入顺序:

1.往[kfifo->in, kfifo->in + len]空间写入数据
2.更新kfifo->in指针为 kfifo->in + len

在操作1完成时,读者是还没有看到写入的信息的,因为kfifo->in没有变化,认为读者还没有开始写操作,只有更新kfifo->in之后,读者才能看到。

那么如何保证1必须在2之前完成,秘密就是使用内存屏障:smp_mb(),smp_rmb(), smp_wmb(),来保证对方观察到的内存操作顺序。

5. 总结

kfifo优点:

  1. 实现单消费者和单生产者的无锁并发访问。多消费者和多生产者的时候还是需要加锁的。
  2. 使用与运算in & (size-1)代替模运算
  3. 在更新in或者out的值时不做模运算,而是让其自动溢出。这应该是kfifo实现最牛叉的地方了,利用溢出后的值参与运算,并且能够保证结果的正确。溢出运算保证了以下几点:
    • in - out为缓冲区中的数据长度
    • size - in + out 为缓冲区中空闲空间
    • in == out时缓冲区为空
    • size == (in - out)时缓冲区满了

读完kfifo代码,令我想起那首诗“众里寻他千百度,默然回首,那人正在灯火阑珊处”。不知你是否和我一样,总想追求简洁,高质量和可读性的代码,当用尽各种方法,江郞才尽之时,才发现Linux kernel里面的代码就是我们寻找和学习的对象。

6. 项目使用介绍

思想和上面差不多,只不过把队列内容改为了指针,是为了我们项目存储数据(使用时malloc)用到。废话不多说,直接上源码:

struct kfifo {
	void  **data;    /* the buffer holding the data */
	unsigned int size;    /* the count of the data */
	unsigned int pod;    /* productor */
	unsigned int cons;    /* consumer */
};

static inline uint32_t roundup_power_of_two(uint32_t a)
{
    if (a == 0)
        return 0;

    uint32_t position = 0;
    for (int i = a; i != 0; i >>= 1)
        position++;

    return static_cast<uint32_t>(1 << position);
}

struct kfifo* kfifo_alloc(uint32_t size)
{
	struct kfifo* fifo;
	if (size & (size - 1))
	{
		size = roundup_pow_of_two(size);
	}
	FORCE_DEBUG(w_log_fp, "kfifo_alloc file stream queue len:%d \n", size);
	fifo = (struct kfifo* )malloc(sizeof(struct kfifo));
	fifo->data = (void**)malloc(size * sizeof(void*));
	fifo->size = size;
	fifo->pod = 0;
	fifo->cons = 0;
	return fifo;
}

void kfifo_free(struct kfifo* fifo)
{
	uint32_t i = 0;
	for ( i = 0; i < fifo->size; i++ )
	{
		free(fifo->data[i]);
	}
	free(fifo);
}

int kfifo_put(struct kfifo* fifo, void * data)
{
	if ( data == NULL )
		return 0;
   if (fifo->pod - fifo->cons < fifo->size)
   {
      fifo->data[fifo->pod & (fifo->size - 1) ] = data;
      fifo->pod++;
      return 1;
   }
   else
      return 0;
}

void* kfifo_get(struct kfifo* fifo)
{
   void * result = NULL;
   if (fifo->pod != fifo->cons)
   {
      result = fifo->data[fifo->cons & (fifo->size - 1) ];
      fifo->cons++;
   }
   return result;
}

7. 其它 userspace 移植实现

kfifo的移植:
https://blog.csdn.net/liigo/article/details/100993236

参考:
https://www.cnblogs.com/wangshaowei/p/11559522.html
https://blog.csdn.net/linyt/article/details/53355355
https://blog.csdn.net/chen19870707/article/details/39899743