欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

J.U.C 包

程序员文章站 2022-07-13 15:59:55
...

 

原文在:http://coderbee.net/index.php/java/20130927/491

 

概述

J.U.C 包是java.util.concurrent包的简写。这个包在JDK5引入,大大增强了Java的并发特性。JDK7还引入ForkJoin框架。

 

该包提供的能力主要包括:可重入锁,具有原子性操作属性的类,线程池执行服务,调度执行服务,增强的线程安全容器,线程关卡,信号器,ForkJoin任务执行框架等等。

 

可重入锁

内在锁

在JDK5之前,要获得同步方法,只能通过 synchronized 关键字来达到。

 

synchronized 使用内在锁或监视器。每个Java对象有一个与之关联的内在锁。每当一个线程尝试访问同步块或方法时,它获取此对象上的内在锁或监视器。对于静态方法的情况,线程获取类对象上的锁。内在锁机制从写代码的角度是个简洁的方法,适用于大多数情况。

 

内在锁的局限:

  1. 一个线程在等待获取一个锁时没法中断。(lock interruptibly)
  2. 尝试获取锁时没法不永久等待。(try lock)
  3. 没法实现结构化的非阻塞的锁机制,因为内在锁必须在获取它的同一个块里释放。(ReentrantLock可以在一个方法里获取,在另一个方法里释放)

可重入锁

当线程请求一个由其它线程持有的对象锁时,该线程会阻塞;而当线程请求由自己持有的对象锁时,如果该锁是重入锁,请求就会成功,否则阻塞。

 

ReentrantLock

java.util.concurrent.locks.ReentrantLock 一个可重入的互斥锁。此锁最多支持同一个线程发起的 2147483648 个递归锁。试图超过此限制会导致由锁方法抛出的 Error。

 

来自JDK文档的使用示例:

class X {
     private final ReentrantLock lock = new ReentrantLock();
     // ...

     public void m() {
          lock.lock();  // block until condition holds
          try {
               // ... method body
          } finally {
               lock.unlock()
          }
     }
}

ReentrantReadWriteLock

java.util.concurrent.locks.ReentrantReadWriteLock 支持读锁、写锁分离的可重入锁,与 ReentrantLock 类似。

具有原子性操作特性的类

java.util.concurrent.atomic 包提供了大量具有原子性操作的类,提供了比volatile关键字更多的功能。

volatile关键字

volatile关键字用于保证其所修饰的字段具有内存可视性,也就是对这些字段更新后,线程后续的读操作总能看到这些更新;线程读取这些字段了,也能看到其他线程之前所作的更新。

但是volatile不支持一些基本的操作,比如并发情况下的计数问题。对于一个字段 volatile int count = 0,执行 count++; 是在并发情况下对count的值是没有安全保证的,有的线程可能读到的是0,有的可能是1

对于下面这样比较后更新的情形,volatile也没法保证并发安全的。

boolean isLoad = false;
//  ...
if (! isLoad) {
     isLoad  = true;
}

原子类就是用来解决这类需求的。

原子类

该包的类大多提供了具有CAS(Compare And Swap,比较后交换)特性的方法,用于原子性地更新字段的值。

AtomicInteger 的一些方法:

compareAndSet(int expect, int update)    如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
addAndGet(int delta)    以原子方式将给定值与当前值相加。
decrementAndGet()      以原子方式将当前值减 1。
getAndAdd(int delta)     以原子方式将给定值与当前值相加。
getAndDecrement()      以原子方式将当前值减 1。
getAndSet(int newValue)    以原子方式设置为给定值,并返回旧值。
set(int newValue)              设置为给定值。
get()                                获取当前值。

无锁算法

原子类除了解决上面的这些问题,还可以用来实现一些无锁算法。实现情形大致是这样的:

AtomicInteger  counter = new AtomicInteger(0);
//   ....

int  oldValue,  newValue;

do {
     oldValue = counter.get();

     //  synchronized  code  trunk 

     newValue = oldVaue+1;
}while( !  counter.compareAndSet(oldValue,  newValue) );

这类实现认为,如果一个变量它的值在同步代码块访问前后没有修改,就认为没有线程并发执行了这个代码块。

无锁算法基本上利用了这个思想:把同步块最小化到单个变量上。对于每个同步条件,都需要一个具有CAS方法的类;如果同步条件太多,还是用锁简单点。

ABA问题

对于上面这类无锁算法实现的一个问题是:oldValue = counter.get(); 语句执行完后, counter的值为 A,这个线程T1被挂起,另一个线程T2对 counter进行更新,把值改为B,然后又更新为A,当T1恢复执行的时候,其实上下文已经被修改,而它却没法知道,因为值还是A。这就是“ABA问题”(此处应该google)。

此包为解决ABA问题提供了一个类:AtomicStampedReference,维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。

线程池执行服务

JDK5以前的多线程

在JDK5以前,利用Java的并发能力一般有两种方式:继承自Thread类并覆写run方法; 或者实现Runnable 接口,将实现的实例传递给Thread构造函数,再调用Thread.start方法来启动一个线程。代码大概如下:

static class MyThread extends Thread {
       @Override
       public void run() {
             // do something in other thread .
      }
}

static class Task implements Runnable {
       @Override
       public void run() {
             // task command .
      }
}

public static void main(String[] args) {
      MyThread myThread = new MyThread();
      myThread.start();

      Task task = new Task();
      Thread thread = new Thread(task);
      thread.start();
}

这里要说明的是:Runnable 实现是一个任务,是指令集合,不能自己运行;Thread 才是一个可执行的单元,是一个“CPU”。是Thread驱动执行任务Runnable

对于Thread对象,当它的run方法执行完成后,线程就进入完成状态,等待销毁。

当需要异步执行一个任务时,就新建一个线程,当一个虚拟机新建了几百上千个线程时,这会带来很多问题:

  1. 上下文切换代价。Java的线程是要映射到内核线程上去执行的,线程太多时基本上就没法进行有效的调度执行,线程之间的上下文切换就会占用很多的CPU周期。
  2. 内存资源占用。JVM新建一个线程后,要给线程分配相应的方法栈、程序计数器等资源,这些都会占用的一定的内存资源,线程很多时,这些资源就不小了。

这些问题说明,大量创建线程不是个好主意,我们需要重用已创建的线程。这就是线程池的作用。

线程重用的最基本原理

既然Thread的run方法执行完成就表示一个线程已终止,那么就需要在这个run方法上做点处理,让它能持续地处理我们的任务。

import java.util.concurrent.LinkedBlockingQueue;

public class SimpleReuseThread extends Thread {
       private volatile boolean isStop = false;
       private LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(
                  1024);

       public boolean submitTask(Runnable task) {
             return taskQueue .add(task);
      }

       public void run() {
             while (!isStop ) {
                   if (!taskQueue .isEmpty()) {
                        Runnable task = taskQueue.peek();
                        task.run();
                  } else {
                         try {
                              Thread. sleep(1000);
                        } catch (InterruptedException ignore) {
                        }
                  }
            }
      }

       public void shutDown() {
             isStop = true ;
      }
}

上面的代码足以说明线程复用的最基本原理。但好的线程池实现还需要处理:线程池生命周期管理、线程复用、工作任务队列管理、高效地分派任务到执行线程等工作。好消息是已经有满足这些需求的线程池实现了。

J.U.C的线程池

J.U.C的线程池封装为 java.util.concurrent.ExecutorService 接口。该接口提供了方法用于 提交任务、执行任务、查询线程池是否已被关闭、关闭线程池。

java.util.concurrent.Executors 提供了便捷的方法用于创建各种线程池:

newCachedThreadPool()                       创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
newFixedThreadPool(int nThreads)            创建一个可重用固定线程数的线程池,以共享的*队列方式来运行这些线程。
newScheduledThreadPool(int corePoolSize)    创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
newScheduledThreadPool(int corePoolSize)    创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
newSingleThreadExecutor()                   创建一个使用单个 worker 线程的 Executor,以*队列方式来运行该线程。
newSingleThreadScheduledExecutor()           创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。

Future

当把任务提交到线程池(java.util.concurrent.ExecutorService)去执行后,线程池会返回一个表示任务直接结果的接口Future,当前线程可以继续往下执行其他逻辑,在某个时间点,它可以查询任务是否执行完成,如果执行完成,还可以获取任务返回的结果。

来自doc文档的使用示例:

interface ArchiveSearcher { String search(String target); }
class App {
   ExecutorService executor = ...
   ArchiveSearcher searcher = ...
   void showSearch(final String target)  throws InterruptedException {

     Future<String> future = executor.submit(new Callable<String>() {
            public String call() {
                 return searcher.search(target);
            }});
     displayOtherThings(); // do other things while searching
     try {
          displayText(future.get()); // use future
     } catch (ExecutionException ex) { cleanup(); return; }
   }
}

调度执行服务

Timer

在JDK5以前,如果需要定时或周期性地执行某个任务,可以通过 java.util.Timer 类来实现。

Timer 的缺陷:

  1. 所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务;(延迟累加)
  2. Timer线程不捕获异常,如果TimerTask抛出异常,将导致Timer线程终止。如果Timer 的线程死掉了,所有任务都不会再执行。
  3. Timer对调度的支持是基于绝对时间的,而不是相对时间,因此任务对系统时钟的改变是敏感的;下面的调度服务只支持相对时间。

如果还在用 Timer 进行调度,真该好好考虑调度执行服务了。

调度服务

这里说的调度服务其实是线程池提供的扩展能力,调度服务封装为java.util.concurrent.ScheduledExecutorService 接口。它解决了上面的Timer缺陷。

schedule(Callable<V> callable, long delay, TimeUnit unit)           创建并执行在给定延迟后启用的 ScheduledFuture。
schedule(Runnable command, long delay, TimeUnit unit)               创建并执行在给定延迟后启用的一次性操作。
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)             创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)           创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。

先写这么多,以后再补充。

增强的线程安全容器

线程关卡

信号器

ForkJoin任务执行框架

相关标签: j.u.c包