欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java 并发之Concurrent 包综述

程序员文章站 2023-04-07 08:48:26
■ 并发原理 单核系统:线程交替执行,由于交替又快又多,给人一种同时执行的感觉多核系统:不仅可以交替执行线程,而且可以重叠执行线程补充: 本章指的并发主要指的是线程间的并发 ■ 常见的并发机制 ■ 不同系统的并发机制 UNIX:管道、消息、共享内存、信号量、信号 Linux内核:原子操作、自旋锁、信 ......

■ 并发原理

单核系统:线程交替执行,由于交替又快又多,给人一种同时执行的感觉
多核系统:不仅可以交替执行线程,而且可以重叠执行线程
补充: 本章指的并发主要指的是线程间的并发

 

■ 常见的并发机制

Java 并发之Concurrent 包综述

 

■ 不同系统的并发机制

  • unix:管道、消息、共享内存、信号量、信号
  • linux内核:原子操作、自旋锁、信号量、屏障(由于服务器一般都位于linux服务器上,因此此是我们最重要要了解的)
  • solaris线程同步原语:互斥锁、信号量、多读者/单写者锁、条件变量 
  • windows:等待函数、分派器对象、临界区、轻量级读写锁和条件变量

 ■ 互斥的需求

  • 强制互斥: 当临界区共享时,一次只允许一个线程进入临界区,即必须强制实施互斥
  • 禁止干涉: 一个在非临界区停止的线程不能干涉其他线程,包括临界区和非临界区的线程
  • 禁止无限延迟: 决不允许出现需要访问临界区的线程被无限延迟的情况,如产生死锁或饥饿
  • 可用立入: 当没有线程在临界区中时,任何需要进入临界区的线程必须能够立即进入
  • 核数无关: 对相关线程的执行速度和处理器的数目没有任何要求和限制
  • 有限时间: 一个线程驻留在临界区的时间必须是有限的

 ■ 互斥的方案

  • 硬件支持:处理器原生支持的互斥指令,好处是可以减少开销,但很难成为一个通用的解决方案
  • 系统或语言级别支持:即由操作系统或程序语言提供该级别的互斥支持,比如信号量、管程、消息传递等
  • 软件方法支持:这些方法通常基于在访问内存时基本互斥的假设,尽管允许访问的顺序事先没有具体安排,但同时访问内存中的同一地址的操作被内存仲裁器串行化执行了,即可以理解用算法的方式解决互斥问题,比如dekker算法、peterson算法

      * dekker算法

/**
 * dekker算法基本约束:
 *      某一时刻对某一内存地只能进行一次访问
 * 1.设置flag做为两个线程进入临界区的密钥,当一个线程失败,其他仍可访问
 *      - 每个线程只能改变自身的flag,只能检查其他线程的flag而不能改变
 *      - 当一个线程要进入临界区,需周期性检查另一个线程flag,直到另一线程不在临界区
 *      - 当线程进入临界区时,应立即设置自身flag为true,表明占领临界区
 *      - 当线程离开临界区时,应立即设置自身flag为false,表明释放临界区
 * 2.设置turn用于安排临界区的访问顺序,访问线程必须重复读取turn值直到被允许进入临界区
 *      - 当turn值等于线程号,该线程可以进入其临界区
 *      - 否则,该线程必须被强制等待(忙等待或自旋等待)
 */
public class dekker {
    //观察两个线程的状态
    boolean[] flag = {false,false};
    //表示临界区访问权限的轮转,初始权利给p1 -- 安排执行顺序避免谦让造成的活锁问题
    int turn = 1;
    public void p0(){
        while (true){
            //设置p0的flag为true,同时检查p1的flag
            flag[0] = true;
            while (flag[1]){
                //当临界区不可用时,判断当前临界区权限是否是p1
                if (turn == 1){ // 用于处理活锁问题
                    //当临界区权限是p1时,需要将p0设置为false,使得p1能进入临界区,用于处理死锁问题
                    flag[0] = false;
                    //循环校验turn的权限(空自旋),直到p1执行完毕将权限交给p0
                    while (turn == 1){
                        /** do nothing 空自旋 **/
                    }
                    flag[0] = true;//此时p1应已执行完毕,应当禁止p1进入临界区
                }
            }
            //当p1的flag为false时,p0可以立即进入临界区
            /** critical section 临界区 **/
            //当临界区执行完之后,turn设置为1,将临界区访问权限交换给p1
            //并将p0的flag设置为false,释放临界区,使得p1可进入临界区
            turn = 1;
            flag[0] = false;
            /** do otherthings **/
        }
    }
    public void p1(){
        while (true){
            //设置p1的flag为true,同时检查p0的flag
            flag[1] = true;
            while (flag[0]){
                //当临界区不可用时,判断当前临界区权限是否是p0
                if (turn == 0){ //用于处理活锁问题
                    //当临界区权限是p0时,需要将p1设置为false,使得p0能进入临界区,用于处理死锁问题
                    flag[1] = false;
                    //循环校验turn的权限(空自旋),直到p0执行完毕将权限交给p1
                    while (turn == 0){
                        /** do nothing 空自旋 **/
                    }
                    flag[1] = true;//此时p0应已执行完毕,应当禁止p0进入临界区
                }
            }
            //当p0的flag为false时,p1可以立即进入临界区
            /** critical section 临界区 **/
            //当临界区执行完之后,turn设置为0,将临界区访问权限交换给p0
            //同时将p1的flag设置为false,释放临界区,使得p0可进入临界区
            turn = 0;
            flag[1] = false;
            /** do otherthings **/
        }
    }
    public static void main(){
        /** 并发执行p0 p1 读者有兴趣可自己验证一下**/
    }
}

      * peterson算法

/**
 * peterson算法比dekker算法更加简单出色而且很容易推广到多个线程
 *     1.互斥保护验证:p0角度
 *      - 当po设置flag[0]=true,则p1不能进入临界区
 *      - 当p1已进入临界区,而flag[1]=true,p0不能进入临界区
 *     2.避免相互阻塞验证:p0角度
 *      - 当p0在while循环中被阻塞,此时flag[1]=true且turn=1
 *      - 当flag[1]=false或turn=0,此时p0可以进入临界区
 *     3.复杂度:该算法用简单的交替进入临界区的方式降低了并发互斥的复杂度
 */
public class peterson {
    boolean[] flag = {false,false};//表明每个互斥线程的位置
    int turn = 0;//解决同时发生的冲突
    public void p0(){
        while (true){
            flag[0] = true;
            //每次都要显式设置turn=1并作为while空自旋条件,迫使其他线程也有进入临界区的机会
            //这也是解决互斥的一个简洁方案,大家依次来,不能重复独占
            turn = 1;
            while (flag[1] && turn == 1){
                /** do nothing 空自旋**/
            }
            /** critical section 临界区 **/
            flag [0] = false;
            /** do otherthings **/
        }
    }
    public void p1(){
        while (true){
            flag[1] = true;
            turn = 0;
            while (flag[0] && turn == 0){
                /** do nothing 空自旋**/
            }
            /** critical section 临界区 **/
            flag [1] = false;
            /** do otherthings **/
        }
    }
    public static void main(){
        /** 并发执行p0 p1 读者有兴趣可自己验证一下**/
    }
}

 

 ■ 信号量

  • 基本原理: n个线程可以通过简单的信号进行合作,让一个线程可以*在某个位置停止,直到它接收到一个特殊的信号。任何复杂的合作需求都可以通过适当的信号结构得到满足
  • 组成部分
    •   为了发信号,需要使用一个称作信号量的特殊变量sem,通常被初始化为非负数
    •   为通过信号量sem传送信号,线程可执行原语semsignal(sem):此时信号量sem+1,当sem小于或等于0,则被semwait阻塞的线程被阻塞
    •   为了通过信号量sem接收信号,线程可执行原语semwait(sem):此时信号量sem-1,当sem变成负数,则执行semwait的线程被阻塞,否则线程继续执行
  • 分类:无论是计数信号量还是二元信号量,都需要使用队列保存在信号量上等待的进程/线程,这就需要决定进程按照什么顺序从队列中移除
    •   强信号量:使用fifo先见先出公平策略(即被阻塞时间最长的进程/线程最先被队列释放)的信号量(常用)
    •   弱信号量:没有规定进程/线程从队列中移除顺序的信号量
  • 补充: 二元信号量的区别只是sem的值只能是0和1而已

        * 信号量的实现 (cas版)

/**
 * 设计原则:任何时候只能一个线程可以用wait和signal操作控制一个信号量
 * 要求:semwait和semsingal操作必须作为原子原语实现
 * semaphore信号量(以下都简称sem)的属性 
 *      flag : 表示信号量是否可用,默认是0
 *      count: 
 *          当>=0时,表示可执行semwait而不被挂起的线程数
 *          当<0时,表示挂起在信号量的等待队列的线程数
 *      queue: 表示信号量相关联的等待队列,被阻塞的线程需放入该队列中
 *  ps:这里我们选用boolean版本的cas
 */
semwait(sem){
    //当发现sem.flag不为0时,就自旋等待直到为0
    //补充一点:忙等待可以保证队列操作的同步,
    //但由于wait和signal执行时间短,其开销还是很小的
    while(!compare_and_swap(sem.flag,0,1));
    sem.count--;
    if(sem.count < 0){
        /** 该线程进入sem.queue等待队列中并被阻塞 **/
    }
    sem.flag = 0;
}
semsignal(sem){
    //当发现sem.flag不为0时,就自旋等待直到为0
    while(!compare_and_swap(sem.flag,0,1));
    sem.count++;
    if(sem.count <= 0){
        /** 从sem.queue等待队列中移出,被移出的线程进入就绪队列**/
    }
    sem.flag = 0;
}

       * 信号量实现互斥

final int n = /** 线程数 **/
int s = 1;//semaphore
public void p(int i){
    while(true){
        semwait(s);
        /** critical zone 临界区 **/
        semsignal(s);
        /** do other 其他部分 **/
    }
}

 

■  消息传递 (大家比较熟悉的并发机制,当然有时间的话还会专门介绍一个mq,比如 rabbit mq )

 1. 消息传递的概述

  • 消息定义:消息传递指的是线程间通过发送消息的方式实现相互通信
  • 消息实现:通常会提供一对原语实现 send(destination,message) 、receive(source,message)
  • 发送消息:一个线程以消息massage的形式给另一个指定的目标destination线程发送消息
  • 接收消息:线程通过执行recieve原语接收来自源线程source的消息massage

 2. 消息结构

  • 消息类型: 指定的消息类型,接收者往往会根据该类型进行消息监听和捕获
  • 目标id/源id: 发送方/源的标识符
  • 消息长度: 整个消息的总长度,注意要控制长度
  • 控制信息: 额外信息,比如创建消息链表的指针、记录源和目标之间传递消息数目、顺序和序号以及优先级
  • 消息内容: 消息正文,相当于body
  • 补充: 读者可以参间iso中各种协议的包格式,比如http和tcp的包

 Java 并发之Concurrent 包综述

 3. 消息通信情况

  • send:要么发送线程被阻塞直到该消息被目标线程接收,要么不阻塞
  • receive :
    • 若消息在接收之前已经被发送,该消息被目标线程接受并继续执行
    • 若没有正在等待的消息,则该目标线程被阻塞直到所等待的消息到达,或者该线程继续执行,放弃接收

 Java 并发之Concurrent 包综述

 

■ concurrent 包结构

Java 并发之Concurrent 包综述

■ concurrent 包整体类图

Java 并发之Concurrent 包综述

■ concurrent包实现机制

  • 综述: 在整个并发包设计上,doug lea大师采用了3.1 concurrent包整体架构的三层结构
  • 补充: 并发包所涉及的内容笔者会陆续推出对应番进行阐述,敬请期待(进度视笔者的忙碌程度而定)

1. 底层-硬件指令支持

  • 综述: 并发包最底层是依赖于硬件级别的volatile和cas的支持
  • volatile:借用 volatile 的内存读写语义和阻止重排序保证数据可见性
  • cas: 借用cas的高效机器级别原子指令保证内存执行的 读-改-写 操作的原子性
  • 组合: 借用 volatile 变量的读/写和cas实现线程之间的有效通信,保证了原子性、可见性、有序性

2. 中间层-基础数据结构+算法支持

  • 综述: 在数据结构和算法的设计使用上,doug lea大师专门设计了aqs框架作为所有并发类库的并发基础,同时引入非阻塞算法和原子变量类增强了并发特性
  • aqs框架: aqs中提供了最基本、有效的并发api, doug lea大师期望其作为所有并发操作的基础解决方案,并发包中的绝大部分实现都是依赖于aqs(abstractqueuedsynchronizer),同时 aqs的基础是 cas 和 volatile的底层支持
  • 非阻塞数据结构: 非阻塞数据结构是非阻塞队列的设计基础,同时也是阻塞队列的参考对比的重要依据
  • 原子变量类: doug lea大师专门为所有的原子变量设计了专门的类库,甚至在后期还对齐做了增强,比如 longadder、longaccumulator 等,从侧面可以反映出数值操作对于编程的重要性

3. 高层-并发类库支持

  • 综述: doug lea大师在并发包中已经提供了丰富的并发类库极大方便了快速、安全的使用并发操作
  • lock: lock接口定义了一系列并发操作标准,详情参见 aqs框架之lock
  • 同步器: 每个并发类的同步器的实现依赖于aqs(继承),比如 reentrantlock 中的sync;同时笔者也将 并发类 同属于同步器的范围内
  • 阻塞队列: 顾名思义,支持阻塞的队列,主要是以queue结尾的类
  • 执行器: 所谓执行器,指的是任务的执行者,比如线程池和fork-join
  • 并发容器: 即支持并发的容器,主要包含cow和以concurrent开头的类,通常并发容器是非阻塞的

 

ps: 感谢  黄志鹏kira 的友情赞助,继续加油写出更棒的文章,后续文章会不断改进迭代~