欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

理解线程池的原理

程序员文章站 2024-03-02 14:42:10
...
  • LinkedBlockingQueue
  • DelayedQueue
  • 1.构造初始化DelayedQ
  • 2.offer插入元素
  • 3.take提取数组元素
  • 小结
  • ArrayBlockingQueue

    ArrayBlockingQueue是一个有界阻塞队列数据结构基于数组、使用ReentrantLock、Condition保证并发同步

    所谓阻塞队列
    当队列满了,则会对生产线程产生阻塞直到有空位可插入;
    当队列空了,则会对消费队列产生阻塞直到有新的元素被加入队列。

    理解线程池的原理

    方法中含有字母t的都会产生阻塞waiting;
    方法中含有o的都会返回 true/false;
    剩下add、remove的会抛出异常;
    peek()会从队列头部观察头结点,但并不会对队列造成影响。

    我们通过一个简单的应用,来逐步分析ArrayBlockingQueue队列的代码:

    public class ArrayBlockingQueueTest {
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService ex = Executors.newFixedThreadPool(50);
    
        ArrayBlockingQueue<CustomizedTask> tasksQueue = new ArrayBlockingQueue<CustomizedTask>(100);//有界队列 100个元素
        // 生产者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        tasksQueue.put(new CustomizedTask());
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    
        // 消费者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                CustomizedTask task;
                try {
                    while ((task = tasksQueue.take()) != null && !Thread.currentThread().isInterrupted()) {
                        ex.submit(task);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    
        System.out.println("Main Thread is terminated");
    }
    
    static class CustomizedTask implements Runnable {
    
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis());
        }
    }
    

    }

    1.构造:

    /** The queued items */ final Object[] items;
    /** items index for next take, poll, peek or remove */
    int takeIndex;
    
    /** items index for next put, offer, or add */
    int putIndex;
    
    /** Number of elements in the queue */
    int count;
    
    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */
    
    /** Main lock guarding all access */
    final ReentrantLock lock;
    
    /** Condition for waiting takes */
    private final Condition notEmpty;
    
    /** Condition for waiting puts */
    private final Condition notFull;
    
    
    
    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and default access policy.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];//全局变量,一个Object[]数组用来维护入队元素
            lock = new ReentrantLock(fair);//ReentrantLock.Condition实现等待\通知
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    

    2.入队列。生产者生产消息并放入队列

    
        public void put(E e) throws InterruptedException {
            checkNotNull(e);//入队元素正确性判断
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();//获取锁
            try {
                while (count == items.length)//如果队列中数据已经达到队列上限
                    notFull.await();//阻塞并释放锁(此时当前线程进入Condition队列并产生park阻塞)
                enqueue(e);//当队列中有空位存在的时,执行入队
            } finally {
                lock.unlock();
            }
        }
    
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;//putIndex初始化为0,每次插入元素后递增
        if (++putIndex == items.length)//达到上限
            putIndex = 0;
        count++;//Number of elements in the queue
    //通知阻塞在队列上的消费者(AQS:在获取到锁的情况下,将阻塞在Condition队列的结点放入sync队列中,等待被唤醒再次尝试锁获取)
        notEmpty.signal();
    }
    

    3.出队列。消费者如果阻塞会被唤醒,并且进行锁获取和取队列元素

    
    	  public E take() throws InterruptedException {
    	        final ReentrantLock lock = this.lock;
    	        lock.lockInterruptibly();
    	        try {
    	            while (count == 0)//如果是个空队列
    	                notEmpty.await();//阻塞直到队列进入元素同时释放锁
    	            return dequeue();
    	        } finally {
    	            lock.unlock();
    	        }
    	    }
    
    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];//数组中取数
        items[takeIndex] = null;//取数后释放占用
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;//队列中总元素数目减1
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();//唤醒阻塞的等待消费的线程
        return x;
    }
    

    LinkedBlockingQueue

    LinkedBlockingQueue是一个有界阻塞队列,基于链表结构实现,默认capacity为Integer.MAX_VALUE。
    我们通过一个简单的应用,来逐步分析LinkedBlockingQueue队列的代码:

    
    	public class LinkedBlockingQueueTest {
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService ex = Executors.newFixedThreadPool(50);
    
            LinkedBlockingQueue<CustomizedTask> tasksQueue = new LinkedBlockingQueue<CustomizedTask>(100);
            // 生产者线程
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            tasksQueue.put(new CustomizedTask());
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
    
            // 消费者线程
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CustomizedTask task;
                    try {
                        while ((task = tasksQueue.take()) != null && !Thread.currentThread().isInterrupted()) {
                            ex.submit(task);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            System.out.println("Main Thread is terminated");
        }
    
        static class CustomizedTask implements Runnable {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis());
            }
        }
    }
    

    1.初始化构造:

    
    		/** Current number of elements */
    		private final AtomicInteger count = new AtomicInteger();
    
        /** Lock held by take, poll, etc */
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** Wait queue for waiting takes */
        private final Condition notEmpty = takeLock.newCondition();
     
         /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** Wait queue for waiting puts */
        private final Condition notFull = putLock.newCondition();
       
       /**
         * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
         *
         * @param capacity the capacity of this queue
         * @throws IllegalArgumentException if {@code capacity} is not greater than
         *             zero
         */
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);//构造链表的头尾结点,链表的初始化
        }
    

    1.1 链表数据结构

    	
    	    /**
    	     * Linked list node class
    	     * 一个简单的单向链表
    	     */
    	    static class Node<E> {
    	        E item;
    
            /**
             * One of: - the real successor Node - this Node, meaning the successor
             * is head.next - null, meaning there is no successor (this is the last
             * node)
             */
            Node&lt;E&gt; next;
    
            Node(E x) {
                item = x;
            }
        }
    

    2.入队列。生产者生产消息并放入队列

    		
    	      public void put(E e) throws InterruptedException {
            if (e == null)
                throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);//插入的对象包装为一个结点
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();//获取putLcok
            try {
                /*
                 * Note that count is used in wait guard even though it is not
                 * protected by lock. This works because count can only decrease at
                 * this point (all other puts are shut out by lock), and we (or some
                 * other waiting put) are signalled if it ever changes from
                 * capacity. Similarly for all other uses of count in other wait
                 * guards.
                 */
                while (count.get() == capacity) {//队列内元素达到上限
                    notFull.await();//condition等待
                }
                enqueue(node);//在队列不满的情况下 插入元素
                c = count.getAndIncrement();//容量计数
                if (c + 1 < capacity)//队列是否可以再插入一个元素
                    notFull.signal();//唤醒在 putLock.condition等待的线程,线程执行插入操作。
            } finally {
                putLock.unlock();
            }
            if (c == 0)//如果队列再进入这个操作之前是空的,那么现在不空了(刚插入一个元素),唤醒因为队列空而阻塞的取数线程
                signalNotEmpty();
        }
    
     private void enqueue(Node&lt;E&gt; node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;//尾部插入一个元素,并且把last引用指向这个元素
        }
    private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    

    3.出队列。消费者如果阻塞会被唤醒,并且进行锁获取和取队列元素

    
    	 	public E take() throws InterruptedException {
    	        E x;
    	        int c = -1;
    	        final AtomicInteger count = this.count;
    	        final ReentrantLock takeLock = this.takeLock;
    	        takeLock.lockInterruptibly();
    	        try {
    	            while (count.get() == 0) {//队列为空,则阻塞取操作直到队列不空
    	                notEmpty.await();
    	            }
    	            x = dequeue();
    	            c = count.getAndDecrement();
    	            if (c > 1)//如果进入这个操作之前队列中元素超过1个(比如2个),则表示这个操作取数后依旧不为空(起码还有1个),那么可以唤醒其他因为队列为空而阻塞的线程
    	                notEmpty.signal();
    	        } finally {
    	            takeLock.unlock();
    	        }
    	        //唤醒这个操作执行之前因为队列慢而产生的阻塞,起码这个操作之后会有一个空位
    	        if (c == capacity)
    	            signalNotFull();
    	        return x;
    	    }
    
    	 private E dequeue() {
    	        // assert takeLock.isHeldByCurrentThread();
    	        // assert head.item == null;
    	        Node&lt;E&gt; h = head;
    	        Node&lt;E&gt; first = h.next;//head的下个元素。可以看到是按照 FIFO队列排序获取的
    	        //将这个元素从队列中清除(出队)
    	        h.next = h; // help GC
    	        head = first;
    	        E x = first.item;
    	        first.item = null;
    	        return x;
    	    }
    
    	private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                notFull.signal();
            } finally {
                putLock.unlock();
            }
        }
    

    DelayedQueue

    一个*的阻塞队列,其中的元素需要是先Delayed接口,对元素的提取加入了延期限制

    当元素的过期时间到了才允许从队列中取出。队列头部的元素是等待时间最久的元素。
    如果插入数据增加会自动扩容,创建新的更大的数组并将原数组数据放入(PriorityQueue)。
    如果没有元素到了过期时间,那么队列头head不存在,并且poll操作返回null。
    当一个元素到了过期时间,那么它的getDelay(TimeUnit.NANOSECONDS)方法将会返回一个小于0的数字。队列中不允许放入null元素。

    理解线程池的原理

    还是用一个Demo来入手源码的分析:

    public class DelayQueueTest {
    
    public static void main(String[] args) {
        DelayQueue&lt;DelayedElement&gt; delayQueue = new DelayQueue&lt;DelayedElement&gt;();
    
        producer(delayQueue);
        consumer(delayQueue);// Consumer 1
        consumer(delayQueue);// Consumer 2
    
    }
    
    /**
     * 每100毫秒创建一个对象,放入延迟队列,延迟时间1毫秒
     * @param delayQueue
     */
    private static void producer(final DelayQueue&lt;DelayedElement&gt; delayQueue) {
        // offer
        new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                while (true) {
                    i++;
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    DelayedElement element = new DelayedElement(1000 * 60 * 2, "test" + i);// 2min
                    System.out.println("offer success " + delayQueue.offer(element));
                }
            }
        },"Producer").start();
    
        /**
         * 每秒打印延迟队列中的对象个数
         */
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("delayQueue size:" + delayQueue.size());
                }
            }
        },"Watcher").start();
    }
    
    /**
     * take
     * 
     * 消费者,从延迟队列中获得数据,进行处理
     * @param delayQueue
     */
    private static void consumer(final DelayQueue&lt;DelayedElement&gt; delayQueue) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    DelayedElement element = null;
                    try {
                        element = delayQueue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(System.currentTimeMillis() + "---" + element);
                }
            }
        },"Consumer").start();
    }
    

    }

    class DelayedElement implements Delayed {

    private final long delay; // 延迟时间
    
    private final long expire; // 到期时间
    
    private final String msg; // 数据
    
    private final long now; // 创建时间
    
    public DelayedElement(long delay, String msg) {
        this.delay = delay;
        this.msg = msg;
        expire = System.currentTimeMillis() + delay; // 到期时间 = 当前时间+延迟时间
        now = System.currentTimeMillis();
    }
    /**
     * 需要实现的接口,获得延迟时间 用过期时间-当前时间
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    
    /**
     * 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
    
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("DelayedElement{");
        sb.append("delay=").append(delay);
        sb.append(", expire=").append(expire);
        sb.append(", msg='").append(msg).append('\'');
        sb.append(", now=").append(now);
        sb.append('}');
        return sb.toString();
    }
    

    }

    1.构造初始化DelayedQ

    
    	    private final transient ReentrantLock lock = new ReentrantLock();
    
        private final PriorityQueue&lt;E&gt; q = new PriorityQueue&lt;E&gt;();//内部通过一个PriorityQueue存储元素,而PriorityQueue内部通过数组实现。这个priority会自动通过移动数组元素进行扩容,类似ArrayList
    
    	private final Condition available = lock.newCondition();//同样是通过condition实现
    
    	    public DelayQueue() {
        }
    
    
    	/**
         * 线程被设计来用来等待队列头部的元素
         * 
         * 这是 leader-follower模式的变体,为了最大限度减小不必要的时间等待
         * 当一个线程成为 leader,它会等待直到头结点过期,而其他线程会无限期的等待下去,直到这个leader被释放并唤醒其他线程。
         * leader 线程必须在从take()或者poll()等其他方法中返回前,通知**其他线程,并释放leader引用
         * 
         * 无论什么时候头结点被替换了一个更早过期的时间。
         * 这个leader field 通过设置为null,被置为无效。
         * 其他线程被唤醒然后准备获取到接着释放leadship。
         * 
         */
        private Thread leader = null;
    

    2.offer插入元素

    
        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                q.offer(e);//队尾插入
                if (q.peek() == e) {//队列中仅有一个元素
                    leader = null;
                    available.signal();//可能存在其他线程因为队列控而阻塞
                }
                return true;
            } finally {
                lock.unlock();
            }
        }
    

    3.take提取数组元素

    
        /**
         * Retrieves and removes the head of this queue, waiting if necessary until
         * an element with an expired delay is available on this queue.
         *
         * @return the head of this queue
         * @throws InterruptedException {@inheritDoc}
         */
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();//查看队列中的头元素
                    if (first == null)//为null表示没有可获取的元素
                        available.await();//condition await
                    else {
                        long delay = first.getDelay(NANOSECONDS);//查看这个元数据的过期时间
                        if (delay <= 0)//已过期 可获取
                            return q.poll();
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();//如果不是leader则进入等待状态,直到之前的leader被释放后被唤醒
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;//当前获取队列元素的线程
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;//线程获取到元素后释放leader引用
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)//leader已被释放 && 下个结点存在
                    available.signal();//leader线程获取了元素 并且释放了leader引用,退出方法前唤醒其他线程。
                lock.unlock();
            }
        }
    

    小结

    加上之前对ArrayBlockingQueueLinkedBlockingQueue的介绍,阻塞队列常用类型基本介绍完了,下边对其他阻塞队列做个简介。

    SynchronousQueue:
    这个队列不存储元素,当一个线程向这个队列插入一个元素,另一个队列需要立刻从这个队列里取出,否则无法继续插入元素。适合传递型场景。

    LinkedTransferQueue:
    一个由链表构成的*阻塞队列

    LinkedBlockingDeque
    一个链表结构的 双向阻塞队列。可以满足两个线程分别从头尾进行插入或移除操作,应用于“工作窃取”算法:允许一个线程从头部插入\移除元素,另一个窃取线程从尾部窃取元素。

    各种IT书籍书目及下载链接
    https://blog.csdn.net/dh1027/article/details/89327978