欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发

程序员文章站 2024-02-20 11:21:29
...

线程池

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
  • corePoolSize:当有新任务时,如果线程池中线程数没有达到线程池的基本大小,则会创建新的线程执行任务,否则将任务放入阻塞队列。
  • maximumPoolSize:当阻塞队列填满时,如果线程池中线程数没有超过最大线程数,则会创建新的线程运行任务。否则根据拒绝策略处理新任务。
  • BlockingQueue:存储等待运行的任务。
  • keepAliveTime:线程空闲后,保持存活的时间。
  • TimeUnit:时间单位
TimeUnit.DAYS
TimeUnit.HOURS
TimeUnit.MINUTES
TimeUnit.SECONDS
TimeUnit.MILLISECONDS
TimeUnit.MICROSECONDS
TimeUnit.NANOSECONDS
  • ThreadFactory:通过线程工厂可以给创建的线程设置名字。
  • RejectedExecutionHandler:当队列和线程池都满了时,根据拒绝策略处理新任务。
AbortPolicy:默认的策略,直接抛出RejectedExecutionException
DiscardPolicy:不处理,直接丢弃
DiscardOldestPolicy:将等待队列队首的任务丢弃,被拒绝的任务重新添加到等待队列
CallerRunsPolicy:用主线程运行任务

executor框架

1.5后引入的Executor框架的最大优点是把任务的提交和执行解耦。当提交一个Callable对象给ExecutorService,将得到一个Future对象,调用Future对象的get方法等待执行结果就好了。Executor框架的内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。

executor框架由3部分组成:任务、任务的执行、异步计算的结果

  • 任务。包括被执行任务需要实现的接口:Runnable和Callable接口。
  • 任务的执行。Executor接口是Executor框架的基础,它将任务的提交和执行分离开。ExecutorService接口继承于Executor,有两个实现类ThreadPoolExecutor和ScheduledThreadPoolExecutor。
  • 异步计算的结果。包括future接口和实现future接口的FutureTask。

ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
ScheduledThreadPoolExecutor用于在一定延迟时间后运行命令或定期执行命令。

Runnable和Callable的区别:当我们把Runnable或Callable的实现类submit给ThreadPoolExecutor或ScheduledPoolExecutor执行时,Runnable不会返回结果,Callable可以返回结果(FutureTask对象)。

Executors.callable(Runnable task);
ExecutorService.execute(Runnable);
ExecutorService.submit(Runnable/Callable);

工具类Executors可以创建三种类型的ThreadPoolExecutor。

public static ExecutorService newFixedThreadPool(int nThreads) {
	return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

固定线程数的线程池。使用*队列,运行中的线程池不会拒绝任务,即不会调用RejectedExecutionHandler.rejectedExecution()方法。

public static ExecutionService newSingleThreadExecutor() {
	return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

使用*队列。线程池只有一个运行的线程,新来的任务放入工作队列,线程处理完任务就循环从队列里获取任务执行。

public static ExecutorService newCachedThreadPool() {
	return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

根据需要创建新线程的线程池。使用没有容量的SynchronousQueue作为线程池工作队列,当任务提交的速度快于线程处理任务的速度时,CachedThreadPool会不断创建新线程。

ReentrantLock和synchronized

采用synchronized关键字实现同步,线程执行完同步代码块会自动释放锁,而ReentrantLock需要手动释放锁。
synchronized是非公平锁,ReentrantLock可以设置为公平锁。
ReentrantLock是等待可中断的,持有锁的线程长期不释放锁时,正在等待的线程可以选择放弃等待。而synchonized会无限期等待下去。

join和yeild

main是主线程,在main中创建了thread线程,在main中调用了thread.join(),main线程放弃cpu控制权,thread线程执行完才继续执行main线程。
当一个线程使用了yield()方法之后,它就会把自己CPU执行的时间让掉,让自己或者其它的线程运行。yield()方法只能让同优先级的线程有执行的机会。

等待/通知机制

调用wait/notify需要先获得对象的锁
调用wait之后线程释放锁,将线程放到对象的等待队列,当通知线程调用notify后,等待线程并不会立即从wait返回,得等通知线程释放锁,从wait方法返回前提是线程获得锁
等待通知机制依托于同步机制,目的是确保等待线程从wait方法返回时能感知到通知线程对对象的变量值的修改

创建线程的方法

  • 通过扩展Thread类来创建多线程
  • 通过实现Runnable接口来创建多线程,可实现线程间的资源共享
  • 实现Callable接口,通过FutureTask接口创建线程。
  • 使用Executor框架来创建线程池。
public class CallalbleTest {
    public static void main(String[] args) {
        CallableDemo cd = new CallableDemo();

        //异步计算的结果
        FutureTask<Integer> result = new FutureTask<>(cd);

        new Thread(result).start();

        try {
            //等待任务完成,返回结果
            int sum = result.get();
            System.out.println(sum);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

}
class CallableDemo implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        int sum = 0;

        for (int i = 0; i <= 100; i++) {
            sum += i;
        }
        return sum;
    }
}
public class ExecutorsDemo {
    public static void main(String[] args) {
        //获取ExecutorService实例
        ExecutorService executorService = Executors.newCachedThreadPool();
        //提交任务
        executorService.submit(new RunnableDemo());
    }
}

class RunnableDemo implements Runnable {

    @Override
    public void run() {
        System.out.println("Tyson");
    }
}

实现Runnable接口比继承Thread类所具有的优势:
1):资源共享,适合多个相同的程序代码的线程去处理同一个资源
2):可以避免java中的单继承的限制
3):线程池只能放入实现Runable或callable类线程,不能直接放入继承Thread的类

ConcurrentHashMap

多线程环境下,使用Hashmap进行put操作会引起死循环。
CocurrentHashMap利用锁分段技术增加了锁的数目,从而使争夺同一把锁的线程的数目得到控制。
锁分段技术就是将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
ConcurrentHashMap调用get的时候不加锁,原因是node数组成员val和指针next是用volatile修饰的,更改后的值会立刻刷新到主存中,保证了可见性,node数组也用volatile修饰,可以保证扩容时对其他线程具有可见性。

JDK1.7中的ConcurrentHashmap主要使用Segment来实现减小锁粒度,把HashMap分割成若干个Segment,在put的时候需要锁住Segment,get时候不加锁,使用volatile来保证可见性,当要统计size时,比较统计前后modCount是否发生变化。如果没有变化,则直接返回size。否则,需要依次锁住所有的Segment来计算。jdk1.7中ConcurrentHashmap中,当长度过长碰撞会很频繁,链表的增改删查操作都会消耗很长的时间,影响性能。

jdk1.8不采用segment而采用Node,锁住Node来实现减小锁粒度。当链表长度过长时,Node会转换成TreeNode。

这个put的过程很清晰,对当前的table进行无条件自循环直到put成功,可以分成以下六步流程来概述。
如果没有初始化就先调用initTable()方法来进行初始化过程
如果没有hash冲突就直接CAS插入
如果还在进行扩容操作就先进行扩容
如果存在hash冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入,
最后一个如果该链表的数量大于阈值8,就要先转换成黑红树的结构,break再一次进入循环
如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容

volatile

当一个变量被volatile修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值。
volatile关键字的两层语义
1)保证了不同线程对共享变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
2)禁止进行指令重排序。

Java锁的实现

  • synchronized:重量级锁,隐式支持重进入。
  • ReentrantLock:可重入锁,支持同一个线程对资源的重复加锁,可以设置公平和非公平。通过组合自定义队列同步器AbstractQueueSynchronizer实现。同步状态为0时表示锁没被占用。
  • ReentrantReadWriteLock:读写锁,同一时刻可以允许多个读线程访问,但在写线程访问时,所有读线程和写线程会别堵塞。通过自定义队列同步器实现同步功能。将状态变量按位分割,高16位表示读,低16位表示写。

锁的状态

锁主要存在四中状态,依次是:无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态,他们会随着竞争的激烈而逐渐升级。锁可以升级不可降级,这种策略是为了提高获得锁和释放锁的效率。

  • 偏向锁:当线程访问同步块并获取锁时,会在对象头和锁记录中存储锁偏向的线程id,以后该线程进入和退出同步块时,只需简单测试一下对象头的mark word中是否存储着指向当前线程的偏向锁,如果测试成功,则线程获取锁成功,否则,需再测试一下mark word中偏向锁标识是否是1,是的话则使用CAS操作竞争锁。如果竞争成功,则将Mark Word中线程ID设置为当前线程ID,如果CAS获取偏向锁失败,则表示有竞争。当到达全局安全点时获得偏向锁的线程被挂起,偏向锁升级为轻量级锁,然后被阻塞在安全点的线程继续往下执行同步代码。
    偏向锁使用了一种等到竞争出现才释放锁的机制。只有当其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁。
    适用场景:在锁无竞争的情况下使用,在线程没有执行完同步代码之前,没有其它线程去竞争锁,一旦有了竞争就升级为轻量级锁,升级为轻量级锁的时候需要撤销偏向锁,会做很多额外操作,导致性能下降。

  • 轻量级锁
    加锁过程:线程执行同步块之前,JVM会先在当前线程的栈帧中创建用于存储锁记录的空间,并将对象头的mark word复制到锁记录(displaced mark word)中,然后线程尝试使用cas将对象头的mark word替换为指向锁记录的指针。如果成功,则当前线程获得锁,否则表示有其他线程竞争锁,当前线程便尝试使用自旋来获得锁。
    解锁过程:使用原子的cas操作将displaced mark word替换回到对象头,如果成功则解锁成功,否则表明有锁竞争,锁会膨胀成重量级锁。

  • 重量级锁:当一个线程获取到锁时,其他线程都会被阻塞住,当持有锁的线程释放锁之后会唤醒这些线程,被唤醒的线程才有机会获取到锁。
    synchronized通过对象内部的监视器(monitor)实现,每个对象都有一个monitor,当对象的monitor被持有时,则它处于锁定的状态。代码块的同步是使用monitorenter和monitorexit指令实现的,monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit是插入到方法结束处或异常处。
    synchronized和Lock能保证同一时刻只有一个线程获取锁然后执行同步代码,并且在释放锁之前会将对变量的修改刷新到主存当中,保证了可见性。

  • 自旋锁:就是让该线程等待一段时间,执行一段无意义的循环,不会被立即挂起,看持有锁的线程是否会很快释放锁。如果持有锁的线程很快就释放了锁,那么自旋的效率就非常好,反之,自旋的线程就会白白消耗掉处理的资源,它不会做任何有意义的工作,这样反而会带来性能上的浪费。所以自旋等待的时间(自旋的次数)必须要有一个限度,如果自旋超过了定义的时间仍然没有获取到锁,则应该被挂起。

JDK 1.6引入了更加聪明的自旋锁,即自适应自旋锁。所谓自适应就意味着自旋的次数不再是固定的,它是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。

Daemon Thread

在Java中有两类线程:User Thread(用户线程)、Daemon Thread(守护线程)
只要当前JVM实例中尚存在任何一个非守护线程没有结束,守护线程就全部工作;只有当最后一个非守护线程结束时,守护线程随着JVM一同结束工作。
Daemon的作用是为其他线程的运行提供便利服务,守护线程最典型的应用就是GC
将线程转换为守护线程可以通过调用Thread对象的setDaemon(true)方法来实现。

并发工具

  1. CountDownLatch允许一个或多个线程等待其他线程完成操作。常见的应用场景是开启多个线程同时执行某个任务,等到所有任务执行完再汇总统计结果。
  2. CyclicBarrier(同步屏障),让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开。
  3. Semaphore类似于锁,它用于控制同时访问特定资源的线程数量,控制并发线程数。

CyclicBarrier 和 CountDownLatch 都能够实现线程之间的等待。
CountDownLatch用于某个线程等待其他线程执行完任务再执行。
CyclicBarrier用于一组线程互相等待到某个状态,然后这组线程再同时执行。
CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。

public class CountDownLatchDemo {
    static final int N = 8;
    static CountDownLatch latch = new CountDownLatch(N);

    public static void main(String[] args) throws InterruptedException {

       for(int i = 0; i < N; i++) {
            new Thread(new Thread1()).start();
       }

       latch.await(1000, TimeUnit.MILLISECONDS);
       System.out.println("task finished");
    }

    static class Thread1 implements Runnable {

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + "starts working");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                latch.countDown();
            }
        }
    }
}
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        final int N = 5;
        CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " running now.");
            }
        });

        for(int i = 0; i < N; i++) {
            new Thread1(barrier).start();
        }
    }

    static class Thread1 extends Thread {
        private CyclicBarrier barrier;

        public Thread1(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " running");
            try {
                Thread.sleep(50000);
                System.out.println(Thread.currentThread().getName() + " finished");
                barrier.await(2000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}
public class SemaphoreDemo {
    public static void main(String[] args) {
        final int N = 15;
        Semaphore s = new Semaphore(3);
        for(int i = 0; i < N; i++) {
            new Worker(s, i).start();
        }
    }

    static class Worker extends Thread {
        private Semaphore s;
        private int num;
        public Worker(Semaphore s, int num) {
            this.s = s;
            this.num = num;
        }

        @Override
        public void run() {
            try {
                s.acquire();
                System.out.println("worker" + num +  " using the machine");
                Thread.sleep(1000);
                System.out.println("worker" + num +  " finished the task");
                s.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

并发的条件

并发程序要正确地执行,必须要保证原子性、可见性以及有序性。

Copy-On-Write

写时复制。当我们往容器添加元素时,不直接往容器添加,而是先将当前容器进行复制,复制出一个新的容器,然后往新的容器添加元素,添加完元素之后,再将原容器的引用指向新容器。这样做的好处就是可以对Copy-On-Write容器进行并发的读而不需要加锁,因为当前容器不会被修改。

从JDK1.5开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。

CopyOnWriteArrayList中add方法添加的时候是需要加锁的。读的时候不需要加锁,如果读的时候有多个线程正在向CopyOnWriteArrayList添加数据,还是可以读到旧的数据。

缺点:

  • 内存占用问题。由于CopyOnWrite的写时复制机制,在进行写操作的时候,内存里会同时驻扎两个对象的内存。
  • 旧的对象和新写入的对象数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。
相关标签: 并发