KFiFo浅析
#1.前言
这段时间在写一个基于PCIE实现FPGA与上位机通信,利用多线程实现读写同步,本次使用到了KFIFO无锁队列,以下是本人自己对kfifo无锁队列原理的理解。
#2.概述
提供一个无边界的字节流服务,最重要的一点是,它使用并行无锁编程技术,即当它用于只有一个入队线程和一个出队线程的场情时,两个线程可以并发操作,而不需要任何加锁行为。当然,若是采用其他消费模式自然是要加锁了。对应kfifo结构体如下:
struct kfifo {
unsigned char *buffer; /* the begaining address for the buffer holding the data, not changed */
unsigned int size; /* the size of the allocated buffer */
unsigned int in; /* data is added at offset (in % size) */
unsigned int out; /* data is extracted from off. (out % size) */
};
buffer:用于存放数据缓存;size:buffer空间大小,初始化时将其向上扩展为2的幂;in:指buffer队头;out:指队尾。其中由buffer、in、out实现循环队列。
#3.kfifo功能描述及实现
##3.1kfifo_alloc 分配kfifo内存和初始化工作
kfifo中尤为重要的一个是对size(buffer空间大小)向上取2的幂。对于设置的size,我们首先要做的是判断这个数是不是2的次幂,关于这个问题,参考这个链接,这个链接也含了kfifo的原理:关于2的次幂问题
下面是roundup_pow_of_two()函数代码:
//kfifo内存分配
struct kfifo *KFiFo::kfifo_alloc(unsigned int size)
{
unsigned char *buffer = NULL;
struct kfifo *fifo = NULL;
/*
* round up to the next power of 2, since our 'let the indices
* wrap' tachnique works only in this case.
*/
if (size & (size - 1)) {
if(size > 0x80000000)
return NULL;
size = roundup_pow_of_two(size);
}
fifo = (struct kfifo *) malloc(sizeof(struct kfifo));
if (!fifo)
return NULL;
buffer = (unsigned char *)malloc(size);
if (!buffer)
{
free(fifo);
return NULL;
}
fifo->buffer = buffer;
fifo->size = size;
fifo->in = fifo->out = 0;
return fifo;
}
unsigned int KFiFo::roundup_pow_of_two(const unsigned int x)
{
if (x == 0){ return 0; }
if (x == 1){ return 2; }
unsigned int ret = 1;
while (ret < x)
{
ret = ret << 1;
}
return ret;
}
##3.2kfifo中的入队与出队
kfifo的入队操作:kfifo_in,它首先将数据放入buffer中,然后改变kfifo->in的值。同样的对于kfifo的出队操作:kfifo_out,也是先将数据从buffer中取出,再改变的kfifo->out的值。那么kfifo的入队与出队是怎么样实现队列的循环的呢?下面将结合下图进行解释,
kfifo在进行初始化时,定义的kfifo的地址是*buffer,该缓存空间大小为size,队头为in,队尾为out。此时,如下图所示fifo->in=fifo->out=0(在操作过程中,只要fifo->in=fifo->out即代表队列为空):
假设,在第一次放入一些数据后,此时的fifo空间如下图所示,我们可以看到放入一定长度的数据后,fifo-in的值在数据放入buffer后随之修改,由于没有取数据,所以fifo->out的值并没有发生改变。
此时,buffer中数据的长度:data=fifo->in-fifo->out。buffer剩余空闲空间长度为:fifo->size-(fifo->in-fifo->out)。
在以上的基础上取出一定长度数据后,如下图所示,可见,此时fifo->out的值发生了变化。
对于kfifo_in()函数,对应代码如下:
/**
* kfifo_in - puts some data into the FIFO queue
* @from: the data to be pushed in the queue head.
* @len: the length of the data wanted to be pushed in.
* return: number of bytes coied into the queue.
*
* This function copies at most @len bytes from the @from buffer into
* the FIFO queue depending on the free space, and returns the number of
* bytes copied. If there is no lefted space in the FIFO queue, no bytes
* will be copied.
*
*/
unsigned int KFiFo::kfifo_in(const void *from, unsigned int len)
{
/*lefted is the avaiable size in the fifo buffer*/
unsigned int lefted = fifo->size - fifo->in + fifo->out;
/*get the min value between the lefted data size in the buffer and the wanted size */
len = min(len, lefted);
/*off is the length from fifo->buffer, the buffer beginning to the queue head*/
unsigned int off = fifo->in & (fifo->size - 1);
/* first put l data starting from the queue head to the buffer end */
unsigned int l = min(len, fifo->size - off);
memcpy(fifo->buffer + off, from, l);
/* then put the rest (if any) at fifo->buffer, the beginning of the buffer */
memcpy(fifo->buffer, (unsigned char *)from + l, len - l);
fifo->in += len;
return len;
}
那么在下一次向buffer中放入数据之前,需要考虑由in到buffer_end的空闲空间长度是否足够存放这一组数据。从上面的kfifo_in()函数, lefted = fifo->size - fifo->in + fifo->out计算出整个fifo剩余的空闲空间,由min()判断剩余的空闲空间能否完全存储这一组数据。再计算出buffer的初始位置到队列头部in这一段的长度off(off = fifo->in & (fifo->size - 1)),最后将数据由in至buffer_end放入数据,若还有剩余数据未放入队列中,则将剩余数据从buffer初始位置开始放入buffer中。此过程示意图如下:
对于kfifo_out()函数,与kfifo_in()函数类似,此处不详解,代码如下:
/**
* kfifo_out - gets some data from the FIFO queue
* @to: where the data copied.
* @len: the size of the buffer wanted to be copied.
* return: number of bytes copied out of the queue.
*
* This function copies at most @len bytes from the FIFO into the
* @to buffer and returns the number of copied bytes.
* If there is no lefted bytes in the FIFO queue, no bytes will be copied out. *
*/
unsigned int KFiFo::kfifo_out(void *to, unsigned int len)
{
/*buffered is the length of the queue*/
unsigned int buffered = fifo->in - fifo->out;
/*get the min value between the buffered size and the wanted size */
len = min(len, buffered);
/*off is the length from the buffer beginning to the queue tail*/
unsigned int off = fifo->out & (fifo->size - 1);
/* first get the data from (fifo->buffer + off), the queue tail to the end of the buffer */
unsigned int l = min(len, fifo->size - off);;
memcpy(to, fifo->buffer + off, l);
/* then get the rest (if any) from fifo->buffer, the beginning of the buffer */
memcpy((unsigned char *)to + l, fifo->buffer, len - l);
fifo->out += len;
return len;
}
当前只是实现了单生产者和单消费者模式的无锁队列,那么对于环形队列和多生产和多消费者模式下的应用又该如何实现。
上一篇: Java NIO使用及原理分析(二)
下一篇: Linux内核中的队列 kfifo【转】