欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

disruptor

程序员文章站 2023-11-09 16:38:10
disruptor 一 概述 CPU缓存网页浏览器为了加快速度,会在本机存缓存以前浏览过的数据; 传统数据库或NoSQL数据库为了加速查询, 常在内存设置一个缓存, 减少对磁盘(慢)的IO. 同样内存与CPU的速度相差太远, 于是CPU设计者们就给CPU加上了缓存(CPU Cache). 如果你需要 ......

disruptor

一 概述

cpu缓存
网页浏览器为了加快速度,会在本机存缓存以前浏览过的数据; 传统数据库或nosql数据库为了加速查询, 常在内存设置一个缓存, 减少对磁盘(慢)的io. 同样内存与cpu的速度相差太远, 于是cpu设计者们就给cpu加上了缓存(cpu cache). 如果你需要对同一批数据操作很多次, 那么把数据放至离cpu更近的缓存, 会给程序带来很大的速度提升. 例如, 做一个循环计数, 把计数变量放到缓存里,就不用每次循环都往内存存取数据了. 下面是cpu cache的简单示意图.

disruptorcpucache


随着多核的发展, cpu cache分成了三个级别: l1, l2, l3. 级别越小越接近cpu, 所以速度也更快, 同时也代表着容量越小.

 

从cpu到 大约需要的cpu周期 大约需要的时间(单位ns) 内存大小
寄存器 1 cycle    
l1 cache ~3-4 cycles ~0.5-1 ns 32k
l2 cache ~10-20 cycles ~3-7 ns 256k
l3 cache ~40-45 cycles ~15 ns 12mb
跨cpu槽传输   ~20 ns  
内存 ~120-240 cycles ~60-120ns  
public class cachetest {
    static  long[][] arr;

    public static void main(string[] args) {
        arr = new long[1024 * 1024][];
        for (int i = 0; i < 1024 * 1024; i++) {
            arr[i] = new long[8];
            for (int j = 0; j < 8; j++) {
                arr[i][j] = 0l;
            }
        }
        long sum = 0l;
        long marked = system.currenttimemillis();
        for (int i = 0; i < 1024 * 1024; i+=1) {   // 加载了1024 * 1024 次
            for(int j =0; j< 8;j++){
                sum = arr[i][j];   // arr[i][0] --- arr[i][7]
            }
        }
        system.out.println("loop times:" + (system.currenttimemillis() - marked) + "ms"); // 18ms

        marked = system.currenttimemillis();
        for (int i = 0; i < 8; i+=1) {  // 加载了1024*1024 * 8
            for(int j =0; j< 1024 * 1024;j++){
                sum = arr[j][i];
            }
        }
        system.out.println("loop times:" + (system.currenttimemillis() - marked) + "ms"); // 155ms
    }
}

result:

loop times:13ms
loop times:58ms

cpu存取缓存都是按行为最小单位操作的. 在这儿我将不提及缓存的associativity问题, 将问题简化一些. 一个java long型占8字节, 所以从一条缓存行上你可以获取到8个long型变量. 所以如果你访问一个long型数组, 当有一个long被加载到cache中, 将无消耗地加载了另外7个. 所以可以非常快地遍历数组.

查看cpu核心数
cat /proc/cpuinfo
查看缓存行大小
cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size
perf stat -e l1-dcache-load-misses java l1cachemiss
查看l1-dcache-load-misses

l1 cache满了之后才会发生的cache miss. 其实cache miss的原因有下面三种:

  1. 第一次访问数据, 在cache中根本不存在这条数据, 所以cache miss, 可以通过prefetch解决.
  2. cache冲突, 需要通过补齐来解决.
  3. 就是我示例的这种, cache满, 一般情况下我们需要减少操作的数据大小, 尽量按数据的物理顺序访问数据.

@contended //-xx:-restrictcontended

典型的cpu微架构有3级缓存, 每个核都有自己私有的l1, l2缓存. 那么多线程编程时, 另外一个核的线程想要访问当前核内l1, l2 缓存行的数据, 该怎么办呢?

首先我们需要谈到一个协议–mesi协议, m,e,s和i代表使用mesi协议时缓存行所处的四个状态:
m(修改, modified): 本地处理器已经修改缓存行, 即是脏行, 它的内容与内存中的内容不一样. 并且此cache只有本地一个拷贝(专有).
e(专有, exclusive): 缓存行内容和内存中的一样, 而且其它处理器都没有这行数据
s(共享, shared): 缓存行内容和内存中的一样, 有可能其它处理器也存在此缓存行的拷贝
i(无效, invalid): 缓存行失效, 不能使用

disruptormesi


初始 一开始时, 缓存行没有加载任何数据, 所以它处于i状态
**本地写(local write)**如果本地处理器写数据至处于i状态的缓存行, 则缓存行的状态变成m
本地读(local read) 如果本地处理器读取处于i状态的缓存行, 很明显此缓存没有数据给它. 此时分两种情况: (1)其它处理器的缓存里也没有此行数据, 则从内存加载数据到此缓存行后, 再将它设成e状态, 表示只有我一家有这条数据, 其它处理器都没有 (2)其它处理器的缓存有此行数据, 则将此缓存行的状态设为s状态.
远程读(remote read) 假设我们有两个处理器c1和c2. 如果c2需要读另外一个处理器c1的缓存行内容, c1需要把它缓存行的内容通过内存控制器(memory controller)发送给c2, c2接到后将相应的缓存行状态设为s. 在设置之前, 内存也得从总线上得到这份数据并保存.
远程写(remote write) 其实确切地说不是远程写, 而是c2得到c1的数据后, 不是为了读, 而是为了写. 也算是本地写, 只是c1也拥有这份数据的拷贝, 这该怎么办呢? c2将发出一个rfo(request for owner)请求, 它需要拥有这行数据的权限, 其它处理器的相应缓存行设为i, 除了它自已, 谁不能动这行数据. 这保证了数据的安全, 同时处理rfo请求以及设置i的过程将给写操作带来很大的性能消耗.

 

分析volatile

   
汇编代码: 0x01a3de1d: movb $0x0,0x1104800(%esi);0x01a3de24: lock addl $0x0,(%esp);
java instance = new singleton();//instance是volatile变量

lock前缀的指令在多核处理器下会引发了两件事情。
将当前处理器缓存行的数据会写回到系统内存。
这个写回内存的操作会引起在其他cpu里缓存了该内存地址的数据无效。
处理器为了提高处理速度,不直接和内存进行通讯,而是先将系统内存的数据读到内部缓存(l1,l2或其他)后再进行操作,但操作完之后不知道何时会写到内存,如果对声明了volatile变量进行写操作,jvm就会向处理器发送一条lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题,所以在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态(i),当处理器要对这个数据进行修改操作的时候,会强制重新从系统内存里把数据读到处理器缓存里。
如果通过嗅探一个处理器来检测其他处理器打算写内存地址,而这个地址当前处理共享状态(s),那么正在嗅探的处理器将无效它的缓存行,在下次访问相同内存地址时,强制执行缓存行填充。

public class falsesharing implements runnable{
        public final static long iterations = 500l * 1000l * 100l;
        private int arrayindex = 0;
        private static valuenopadding[] longs;

        public falsesharing(final int arrayindex) {
            this.arrayindex = arrayindex;
        }

        public static void main(final string[] args) throws exception {
            for(int i=1;i<10;i++){
                system.gc();
                final long start = system.currenttimemillis();
                runtest(i);
                system.out.println("thread num "+i+" duration = " + (system.currenttimemillis() - start));
            }

        }
 
        private static void runtest(int num_threads) throws interruptedexception {
            thread[] threads = new thread[num_threads];
            longs = new valuenopadding[num_threads];
            for (int i = 0; i < longs.length; i++) {
                longs[i] = new valuenopadding();
            }
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new thread(new falsesharing(i));
            }
            for (thread t : threads) {
                t.start();
            }
            for (thread t : threads) {
                t.join();
            }
        }
 
        public void run() {
            long i = iterations + 1;
            while (0 != --i) {
                longs[arrayindex].value = 0l;
            }
        }
    // 对于伪共享,一般的解决方案是,增大数组元素的间隔使得由不同线程存取的元素位于不同的缓存行上,以空间换时间。
    /**
     thread num 1 duration = 509
     thread num 2 duration = 508
     thread num 3 duration = 532
     thread num 4 duration = 1007
     thread num 5 duration = 784
     thread num 6 duration = 989
     thread num 7 duration = 905
     thread num 8 duration = 1020
     thread num 9 duration = 1163
     */
    // java long型占8字节  64字节
    public final static class valuepadding {
        //如果只对final的falsesharing类可见(就是说valuepadding不能再被继承了)。这样一来编译器就会“知道”它正在审视的是所有可以看到这个填充字段的代码,这样就可以证明没有行为依赖于p1到p7这些字段。那么“聪明”的jvm会把上面这些丝毫不占地方的字段统统优化掉。
        protected long p1, p2, p3, p4, p5, p6, p7;
            protected volatile long value = 0l;
            protected long p8, p9, p10, p11, p12, p13, p14;
            //无论当前cache line被填充到了什么位置,我们都能保证value这个值都独自占用一个cache line
        }

    /**
     * thread num 1 duration = 696
     * thread num 2 duration = 1402
     * thread num 3 duration = 1830
     * thread num 4 duration = 2546
     * thread num 5 duration = 3885
     * thread num 6 duration = 3454
     * thread num 7 duration = 2713
     * thread num 8 duration = 3000
     * thread num 9 duration = 2504
     */
    //我们用伪共享(false sharing)的理论来分析一下. 后面的那个程序longs数组的4个元素,由于volatilelong只有1个长整型成员, 所以整个数组都将被加载至同一缓存行, 但有多个线程同时操作这条缓存行, 于是伪共享就悄悄地发生了.
    @contended  //-xx:-restrictcontended
    public final static class valuenopadding {
            protected volatile long value = 0l;
        }
}

当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享
xy 在同一缓存行
在核心1上运行的线程想更新变量x,同时核心2上的线程想要更新变量y。不幸的是,这两个变量在同一个缓存行中。每个线程都要去竞争缓存行的所有权来更新变量。如果核心1获得了所有权,缓存子系统将会使核心2中对应的缓存行失效。当核心2获得了所有权然后执行更新操作,核心1就要使自己对应的缓存行失效。这会来来回回的经过l3缓存,大大影响了性能。如果互相竞争的核心位于不同的插槽,就要额外横跨插槽连接,问题可能更加严重。

总结:伪共享在多核编程中很容易发生, 而且比较隐蔽. 例如, 在jdk的linkedblockingqueue中, 存在指向队列头的引用head和指向队列尾的引用last. 而这种队列经常在异步编程中使有,这两个引用的值经常的被不同的线程修改, 但它们却很可能在同一个缓存行, 于是就产生了伪共享. 线程越多, 核越多,对性能产生的负面效果就越大.

二 高性能队列

high performance inter-thread messaging library
概述-英

介绍disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。java的内置队列如下表所示。

disruptor


我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是arrayblockingqueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成linkedblockingqueue和concurrentlinkedqueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的linkedtransferqueue都是通过原子变量compare and swap(以下简称“cas”)这种不加锁的方式来实现的。

 

通过不加锁的方式实现的队列都是*的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有arrayblockingqueue。

cas操作比单线程无锁慢了1个数量级;有锁且多线程并发的情况下,速度比单线程无锁慢3个数量级。可见无锁速度最快。
单线程情况下,不加锁的性能 > cas操作的性能 > 加锁的性能。
在多线程情况下,为了保证线程安全,必须使用cas或锁,这种情况下,cas的性能超过锁的性能,前者大约是后者的8倍。

arrayblockingqueue有三个成员变量: - takeindex:需要被取走的元素下标 - putindex:可被元素插入的位置的下标 - count:队列中元素的数量

这三个变量很容易放到一个缓存行中,但是之间修改没有太多的关联。所以每次修改,都会使之前缓存的数据失效,从而不能完全达到共享的效果。

disruptorarrayblockingqueue伪共享示意图

disruptor通过以下设计来解决队列速度慢的问题: - 环形数组结构

  1. 为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。 - 元素位置定位
  2. 数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万qps的处理速度,也需要30万年才能用完。 - 无锁设计
  3. 每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

disruptor相对于传统方式的优点:

  1. 没有竞争=没有锁=非常快。
  2. 所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结构。
  3. 在每个对象中都能跟踪序列号(ring buffer,claim strategy,生产者和消费者),加上神奇的cache line padding,就意味着没有为伪共享和非预期的竞争。

一个生产者 直接申请内存写入m个元素 无cas 操作

多个生产者 如何防止多个线程重复写同一个元素 每个线程获取不同的一段数组空间进行操作。这个通过cas很容易达到。只需要在分配元素的时候,通过cas判断一下这段空间是否已经分配出去即可。
如何防止读取的时候,读到还未写的元素。disruptor在多个生产者的情况下,引入了一个与ring buffer大小相同的buffer:available buffer。当某个位置写入成功的时候,便把availble buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available buffer,来判断元素是否已经就绪。
下面分读数据和写数据两种情况介绍。
读数据
生产者多线程写入的情况会复杂很多: 1. 申请读取到序号n; 2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置; 3. 消费者读取元素。
如下图所示,读线程读到下标为2的元素,三个线程writer1/writer2/writer3正在向ringbuffer相应位置写数据,写线程被分配到的最大元素下标是11。
读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availablebuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是waitfor(11)返回6。
然后,消费者读取下标从3到6共计4个元素。

disruptor多个生产者情况下,消费者消费过程示意图

 

写数据
多个生产者写入的时候: 1. 申请写入m个元素; 2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间; 3. 生产者写入元素,写入元素的同时设置available buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。

如下图所示,writer1和writer2两个线程写入数组,都申请可写的数组空间。writer1被分配了下标3到下表5的空间,writer2被分配了下标6到下标9的空间。
writer1写入下标3位置的元素,同时把available buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。writer2同样的方式。最终都写入完成。

disruptor多个生产者情况下,生产者生产过程示意图

 

防止不同生产者对同一段空间写入的代码,如下所示:

public long trynext(int n) throws insufficientcapacityexception
{
    if (n < 1)
    {
        throw new illegalargumentexception("n must be > 0");
    }
    long current;
    long next;
    do
    {
        current = cursor.get();
        next = current + n;
 
        if (!hasavailablecapacity(gatingsequences, n, current))
        {
            throw insufficientcapacityexception.instance;
        }
    }
    while (!cursor.compareandset(current, next));
    return next;
}

getting started

public class longeventmain8 {
    public static void handleevent(longevent event, long sequence, boolean endofbatch)
    {
        system.out.println(event);
    }

    public static void translate(longevent event, long sequence, bytebuffer buffer)
    {
        event.set(buffer.getlong(0));
    }

    public static void main(string[] args) throws exception
    {
        // specify the size of the ring buffer, must be power of 2.
        int buffersize = 1024;

        // construct the disruptor
        /**
         * ringbuffer生产工厂,初始化ringbuffer的时候使用
         * ringbuffer的大小
         * 采用单生产者模式
         *默认阻塞策略blockingwaitstrategy
         *
         */
        disruptor<longevent> disruptor = new disruptor<>(longevent::new, buffersize, daemonthreadfactory.instance);

        // 设置eventhandler
        disruptor.handleeventswith(longeventmain8::handleevent);

        // start the disruptor, starts all threads running
        disruptor.start();

        // get the ring buffer from the disruptor to be used for publishing.
        ringbuffer<longevent> ringbuffer = disruptor.getringbuffer();

        bytebuffer bb = bytebuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putlong(0, l);
            ringbuffer.publishevent(longeventmain8::translate, bb);
            thread.sleep(1000);
        }
    }
}

生产者的等待策略
暂时只有休眠1ns。 locksupport.parknanos(1);

消费者的等待策略 等待给定的sequence变为可用

disruptor

 


并发框架disruptor译文