欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

线程安全的无锁RingBuffer的实现

程序员文章站 2022-06-05 18:17:25
在程序设计中,我们有时会遇到这样的情况,一个线程将数据写到一个buffer中,另外一个线程从中读数据。所以这里就有多线程竞争的问题。 ......

本人免费整理了java高级资料,涵盖了java、redis、mongodb、mysql、zookeeper、spring cloud、dubbo高并发分布式等教程,一共30g,需要自己领取。
传送门:https://mp.weixin.qq.com/s/jzddfh-7ynudmkjt0irl8q

 

在程序设计中,我们有时会遇到这样的情况,一个线程将数据写到一个buffer中,另外一个线程从中读数据。所以这里就有多线程竞争的问题。

通常的解决办法是对竞争资源加锁。但是,一般加锁的损耗较高。其实,对于这样的一个线程写,一个线程读的特殊情况,可以以一种简单的无锁ringbuffer来实现。这样代码的运行效率很高。

代码的基本原理如下。

 

线程安全的无锁RingBuffer的实现

 

如图所示,假定buffer的长度是buffersize. 我们设置两个指针。head指向的是下一次读的位置,而tail指向的是下一次写的位置。由于这里是环形buffer (ring buffer),这里就有一个问题,怎样判断buffer是满或者空。

这里采用的规则是,buffer的最后一个单元不存储数据。所以,如果head == tail,那么说明buffer为空。如果 head == tail + 1 (mod buffersize),那么说明buffer满了。

 

接下来就是最重要的内容了:怎样以无锁的方式进行线程安全的buffer的读写操作。基本原理是这样的。在进行读操作的时候,我们只修改head的值,而在写操作的时候我们只修改tail的值。在写操作时,我们在写入内容到buffer之后才修改tail的值;而在进行读操作的时候,我们会读取tail的值并将其赋值给copytail。

 

赋值操作是原子操作。所以在读到copytail之后,从head到copytail之间一定是有数据可以读的,不会出现数据没有写入就进行读操作的情况。同样的,读操作完成之后,才会修改head的数值;而在写操作之前会读取head的值判断是否有空间可以用来写数据。

所以,这时候tail到head - 1之间一定是有空间可以写数据的,而不会出现一个位置的数据还没有读出就被写操作覆盖的情况。这样就保证了ringbuffer的线程安全性。

最后附上代码供参考。欢迎批评指正,也欢迎各种讨论!

public class ringbuffer {

    private final static int buffersize = 1024;
    private string[] buffer = new string[buffersize];
    private int head = 0;
    private int tail = 0;
    
    private boolean empty() {
        return head == tail;
    }
    private boolean full() {
        return (tail + 1) % buffersize == head;
    }
    public boolean put(string v) {
        if (full()) {
            return false;
        }
        buffer[tail] = v;
        tail = (tail + 1) % buffersize;
        return true;
    }
    public string get() {
        if (empty()) {
            return null;
        }
        string result = buffer[head];
        head = (head + 1) % buffersize;
        return result;
    }
    public string[] getall() {
        if (empty()) {
            return new string[0];
        }
        int copytail = tail;
        int cnt = head < copytail ? copytail - head : buffersize - head + copytail;
        string[] result = new string[cnt];
        if (head < copytail) {
            for (int i = head; i < copytail; i++) {
                result[i - head] = buffer[i];
            }
        } else {
            for (int i = head; i < buffersize; i++) {
                result[i - head] = buffer[i];
            }
            for (int i = 0; i < copytail; i++) {
                result[buffersize - head + i] = buffer[i];
            }
        }
        head = copytail;
        return result;
    }
}