第一章 - 并发系统设计准则
并发系统中可能遇到的问题
Data race
一个线程写入的数据可能被另一个线程所覆盖
死锁
如果两个或多个线程同时等待共享资源,而这些共享资源又必须由彼此来释放。以下四个条件同时满足时就会造成死锁:
- 互斥性:死锁涉及的资源一次只允许一个线程使用
- 持有资源并等待:一个线程持有一个互斥的资源,但同时等待另一个互斥的资源。等待的同时不释放已持有的资源
- 没有剥夺资源权力:被占有的资源只能由占有它的线程释放
- 循环等待:几个线程同时等待被其它线程占有的资源
以下机制可以预防死锁发生:
- 忽略死锁:这也是最通俗的机制。假设死锁在你系统中永远不会发生。如果死锁发生,手动终止应用程序并重新启动
- 检测:系统有一个特殊线程来分析系统状态,一旦发现死锁,终止其中一个线程来强迫其释放已占有的资源
- 预防:在程序中防止以上四个死锁条件的一个或多个条件成立
- 避免:开始执行一个线程时可以分析它所需要的资源是否可用以决定是否启动该线程。
活锁
活锁发生在当系统中有两个线程因为对方的某种操作或行为不断改变自己的状态。其结果是它们都处于改变状态的循环中而无法继续执行。
例如有两个任务,任务1和任务2,它们同时需要两个资源:资源1和资源2。假设任务1拥有资源1,任务2拥有资源2。它们都无法获得所需的全部资源,它们释放了各自的资源然后开始新一轮的资源持有,释放循环。这种状态会一直持续,因此这两个任务永远没法终止。
资源饥饿
资源饥饿发生在当系统中的一个任务永远无法获得所需的资源时。当一个资源可用时但同时有多个任务正在等待该资源,如果分配资源的算法不够优化,有可能一个任务永远无法分配到资源。一种解决办法是考虑公平性,资源分配算法在决定把资源分配给哪个等待的任务时考虑每个任务已等待该资源的时间。然而实现公平性可能导致额外的开销从而降低系统运行效率。
优先权倒置
优先级倒置发生在当一个优先级较低的任务持有另一个优先级较高任务所需的资源时,优先级较低的任务会先于优先级高的任务被执行。
设计并发算法的方法
从串行版本的算法开始
当开始实现一个算法时,我们可以从串行算法入手,这样做的好处是:
- 可以利用串行算法来测试并行算法的输出是一致的。
- 可以用串行算法和并行算法的运行时间做比较,从而得到并验证并行算法提高的效率
第一步 - 分析
通过分析串行版本的算法来决定代码中的哪一部分可以用并行方法代替。需要特别关注那些运行时间较长的代码。代码中相互独立的循环部分是很有可能被改为并行处理的。例如一个系统需要初始化数据库连接池,加载配置文件,初始化一些对象。这些都是相互独立的步骤,可以用并行方法处理。
第二步 - 设计
当你知道哪些代码可以被修改为并发处理,你就需要决定如何实现它。代码的修改一般会影响系统中以下两个主要部分:
- 代码的结构
- 数据结构的组织
设计并行算法的方法有以下两种:
- 任务分解:当程序中的一部分代码能被拆分为两个或两个以上独立的任务,并且它们能被同时执行时,你可以采用任务分解法。如果有些任务需要在另外一些任务之前被执行,那么你需要引入一些同步机制。
- 数据分解:如果一个很大的数据可以被拆分为几个数据子集,由多个相同线程来处理时,我们可以采用数据分解法。每个线程处理不同的数据子集。因为此时这个数据是多个线程的共享数据,如果线程需要修改数据时,你需要使用同步机制来保护这个共享数据。
并发处理的目的是为了提高处理的效率,所以你必须使用计算机所有可用的处理器或核。另一方面,当你使用同步机制时,你需要引入一些额外的必须执行的代码。一旦你把算法分解为许许多多细小的任务时,那些由于同步机制引入的额外代码有可能影响程序的运行效率。但如果你分解的任务数少于处理器或核的数量,程序则没有最大化地利用硬件资源。同时,你还需要考虑每一个线程需要做的工作量,如果一个线程的工作量大于其它的线程,这个线程就会决定程序的整体运行时间。因此你需要寻找一个相对的平衡点。
第三步 - 实现
第四步 - 测试
可以对比并行和串行两个版本的算法运行的结果以及所需的时间
第五步 - 优化
如果并行算法没有达到预期,则需要重新审核代码找出影响性能的部分再做调整。
JAVA Concurrency API
实现并行处理的基础类
- Thread类
- Runnable接口
- ThreadLocal类:此类用于保存线程的本地变量,即它的值不会被其它线程所影响
- ThreadFactory接口:基础的工厂设计模式,用于创建自定义的线程
同步机制
Java中最重要的同步机制:
- synchronized 关键字
- Lock 接口:Lock提供了更加灵活的同步操作。有几种不同的锁:ReentrantLock, 能够关联一个条件的锁; ReentrantReadWriteLock, 此锁区分了读和写操作; StampedLock,Java 8 的新特性,此锁包含三种模式用于控制读或写操作
- Semaphore 类
- CountDownLatch 类:此类允许主线程等待直到所有子线程完成任务
- CyclicBarrier 类:此类允许子线程在某个点进入等待状态,直到所有的子线程都到达等待状态点,然后继续往下执行
- Phaser类:此类允许你控制能被划分为不同阶段的一组任务的执行。没有一个任务能进入到下一个阶段,直到所有的任务都完成了当前阶段。
Executors
Executor框架允许你把线程创建和管理并发任务执行区分开来。它让你只需要关心线程的创建。该框架包括以下主要的类:
- Executor 和 ExecutorService接口:包含所有executors的公共方法
- ThreadPoolExecutor:通过此类你可以获得一个拥有线程池的executor,你还可以自定义线程池里的线程数量
- ScheduledThreadPoolExecutor:该 executor 允许你延迟或重复性执行任务
- Executors:该类提供了不同类型的 executor 的创建方法
- Callable接口:类似于Runnable接口,但是它允许有返回值
- Future接口:该接口包含了一些能够获取Callable接口返回的值或控制它状态的方法
Fork/Join 框架
Fork/Join 框架定义了一种特殊的executor,专门用于解决 divide and conquer 技术中遇到的问题。Fork/Join 专门针对细颗粒度的并行处理,它能够往任务队列中添加新的任务,同时也能从任务队列中取任务来执行。框架包含的主要的类和接口有:
- ForkJoinPool:此类实现了用于执行任务的 executor
- ForkJoinTask:定义了一个能够被ForkJoinPool执行的任务
- ForkJoinWorkerThread:这是一个能够执行 ForkJoinPool 里任务的线程
支持并行处理的数据结构
大体可以把这些数据结构分为两大类:
- 阻塞数据结构:一定条件下阻塞访问此数据结构的线程。例如一个线程要从该数据结构获取一个数据但该数据结构为空
- 非阻塞数据结构:不阻塞访问数据结构的线程。如果没有数据,则直接返回空值或者抛出异常 (具体行为取决于你调用哪个方法)
以下是其中一些数据结构:
- ConcurrentLinkedDeque: 非阻塞
- ConcurrentLinkedQueue:非阻塞
- LinkedBlockingDeque:阻塞
- LinkedBlockingQueue: 阻塞
- PriorityBlockingQueue:阻塞,根据优先级规则来排序其中的元素
- ConcurrentSkipListMap:非阻塞
- ConcurrentHashMap:非阻塞
- AtomicBollean, AtomicInteger, AtomicLong, and AtomicReference:Java基本数据类型对应的实现原子性的类(atomic implementations)
并发设计模式(Concurrency design patterns)
如同普通的设计模式,并发系统也有自己的设计模式:
Signaling
一个线程在一定时间通知另外一个线程。最简单实现该设计模式是用 ReentrantLock 或者 Semaphore 类。示例代码:
public void task1() { section1(); commonObject.notify(); } public void task2() { commonObject.wait(); section2(); }
以上代码中,section2()方法总是在section1()方法后被执行。
Rendezvous
这种设计模式如同Signaling,线程1等待线程2触发某一事件,线程2同时等待线程1触发某一事件。
public void task1() { section1_1(); commonObject1.notify(); commonObject2.wait(); section1_2(); } public void task2() { section2_1(); commonObject2.notify(); commonObject1.wait(); section2_2(); }
Mutex
Mutex实现一段关键代码的互斥,即一次只有一个线程允许访问此关键代码。在Java中,你可以使用 synchronized 关键字(可以用于保护一部分代码或整个方法),ReentrantLock类,或者Semaphore类。
public void task() { preCriticalSection(); lockObject.lock() // The critical section begins criticalSection(); lockObject.unlock(); // The critical section ends postCriticalSection(); }
Multiplex
这种设计模式如同Mutex,不同点是Mutex一次只允许一个线程访问关键资源,而Multiplex允许多个线程同时访问。例如当你有多个不同拷贝的资源可用时,多个线程就可以被允许同时访问。在Java里可以用Semaphore实现。
public void task() { preCriticalSection(); semaphoreObject.acquire(); criticalSection(); semaphoreObject.release(); postCriticalSection(); }
Barrier
这种设计模式用来实现当你需要在一个公共点来同步多个线程时。线程到达同步点时必须等待,直到参与的所有线程都到达那个同步点时才能继续。Java使用CyclicBarrier类来实现。
public void task() { preSyncPoint(); barrierObject.await(); postSyncPoint(); }
Read-write Lock
读写锁设计模式是为了提高系统的性能,读写锁具有以下特性:
- 如果一个线程正在做读操作,另外一个线程同时需要读操作,那么将被允许
- 如果一个线程正在做读操作,另外一个线程需要写操作,那么写操作线程将被阻塞直到读操作线程结束
- 如果一个线程正在做写操作,另外一个线程需要读操作,那么读操作线程将被阻塞直到写操作线程结束
Java中使用ReentrantReadWriteLock来实现该设计模式。但你需要小心读操作线程和写操作线程的优先级,如果有太多读操作的线程,那么写操作的线程将需要长时间等待。
Thread Pool
线程池由一组线程以及一个任务队列组成。通常线程池中线程的数量是固定的。如果任务队列中有任务需要处理,线程就会取任务执行,否则就等待直到有新的任务添加到任务队列中。Java并发包中包含了一系列实现了ExecutorService接口的类。
Thread Local Storage
该设计模式定义了如何使用全局变量或线程本地静态变量。当你使用线程本地存储时,每个线程将访问不同的变量。Java使用 ThreadLocal 类来实现这种模式。
Java的内存模型
当并发程序运行在多核或多处理器的计算机中。内存缓存可能会带来一些问题。内存缓存在一定程度上提高了运行效率,但是也带来了数据不一致性的问题。当一个线程修改了一个变量的值,它是修改了内存缓存中的变量值,而不是主内存里的变量值,这可能导致其它线程读取到的值是主内存里修改之前的旧值。
另外一个问题是编译器或代码优化器有可能对某些代码进行重新排序,以达到更优的运行效率,这在串行程序里是不会有问题的,但对并行系统来说,可能带来不可预见的运行结果。
为了解决这些问题,各个编程语言都引入了各自的内存模型。内存模型定义了一个任务修改了一个值后如何通知其它任务。Java的内存模型定义了以下几点:
- 关键字 volatile, synchronized 以及 final 的行为
- 确保了一个正确同步的并发程序能顺利运行在所有的计算机架构中
- 当一个线程获得了monitor,那么内存缓存将无效
- 当一个线程释放了monitor,那么缓存中的数据将被写入主内存
- 内存模型对程序员来说是透明的
设计并行算法时的一些建议
找出哪些是真正独立的任务
你只能对那些相互独立的任务作并行处理。例如一个循环中,下一个循环需要上一个循环得出的数据,那你就不能使用并发处理该循环,因为任务之间是相互依赖的。
避免使用一些底层的类来实现并发系统
你能够使用底层的类来实现并发系统,但是应该尽量避免使用它们。Java提供了很多高级类使程序员更好地进行并发编程。例如你可以使用 Thread 或者 Lock 类来实现线程的创建和线程之间的同步。但是Java还提供了更方便的类诸如 executors 或者 Fork/Join 框架。
考虑程序的可扩展性
程序可能需要在不同的计算机中运行,因此必须考虑程序的可扩展性。
当你使用数据分解法设计并发系统时,不要假设运行程序的计算机核与处理器的数量。因为这些都是可变因素。你需要动态获取这些信息,例如使用Java的 Runtime.getRuntime().availableProcessors()来动态获取这些信息然后计算线程池中所需的线程数。虽然这种方法会有额外的开销,但是它让你的程序具有更强的可扩展性。
当你使用任务分解法设计并发系统时,考虑的问题将会更多。取决于算法中独立任务的数量,强制性增加任务的数量可能因为同步机制导致更多的开销。程序整体运行性能也许更加糟糕。认真分析算法确定你是否能够有一个动态的任务数量。
使用线程安全的API
当需要使用Java类库时,阅读文档确定使用的类是否是线程安全,如果不是则需要:
- 用线程安全的类替代
- 如果没有,需要引入同步机制来保证线程安全
例如当你需要在并行系统中使用 List 时,你不能使用ArrayList,因为它不是线程安全的,你需要使用诸如ConcurrentLinkedDeque,CopyOnWriteArrayList 或者 LinkedBlockingDeque。
安排锁的顺序来避免死锁的发生
如果几个线程同时需要使用相同的资源,那么代码中获取这些资源的顺序应该一致,例如任务1和任务2同时需要占有资源1和资源2以完成操作,那么这两个任务都必须按照相同的顺序获取这两个资源,比如先获取资源1,后获取资源2,如果顺序不同,则有可能造成死锁。
如果可能,使用原子变量(atomic variables),而不是使用同步机制
有些时候,你应该使用volatile,而不是同步机制。例如只有一个线程修改变量,其它线程都是读这个变量,你应该选择使用volatile。其它情况你需要使用锁,synchronized关键字,或者synchronization方法。
Java 5 开始出现了一些原子变量,这些变量是一些支持单个变量原子性操作的类。它们引入了一个先对比后设值的机制。如果变量的值等于旧的值,那么则更新变量值并返回 true,否则返回 false,该类型变量的更新不需要锁或者同步机制,因此性能优于任何的同步机制,Java 提供了以下原子变量:
- AtomicInteger
- AtomicLong
- AtomicReference
- AtomicBoolean
- LongAdder
- DoubleAdder
尽可能短时间地持有锁
锁允许实现一次只有一个线程访问锁控制的代码,所以锁机制控制的代码要尽可能小,它应该只包含一些共享数据的操作。而且锁机制控制的代码里不应该有一些不可控的代码,例如在代码里调用Callable,因为Callable里或许有一些阻碍当前线程的操作,例如IO操作