欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java并发

程序员文章站 2022-04-25 18:22:56
编程问题中相当大的一部分都可以通过使用顺序编程来解决。 对于某些问题,如果能够并行地执行程序中的多个部分,则会变得非常方便。 并行编程可以使程序执行速度得到极大地提高。 当并行执行的任务彼此开始产生互相干涉时,实际的并发问题就会接踵而至。 Web服务器经常包含多个处理器,而并发是充分利用这些处理器的 ......

编程问题中相当大的一部分都可以通过使用顺序编程来解决。
对于某些问题,如果能够并行地执行程序中的多个部分,则会变得非常方便。
并行编程可以使程序执行速度得到极大地提高。
当并行执行的任务彼此开始产生互相干涉时,实际的并发问题就会接踵而至。
Web服务器经常包含多个处理器,而并发是充分利用这些处理器的理想方式。

1.基本的线程机制

并发编程使我们可以将程序划分为多个分离的、独立运行的任务
通过使用多线程机制,这些独立任务中的每一个都将由执行线程来驱动。
一个线程就是在进程中的一个单一的顺序控制流。
单个进程可以拥有多个并发执行的任务,但是你的程序使得每个任务都好像有自己的CPU一样。其底层机制是切分CPU时间,但通常你不需要考虑它。
在使用线程时,CPU将轮流给每个任务分配其占用时间,每个任务都觉得自己在一直占用CPU,但事实上CPU时间是划分成片段分配给了所有的任务。
多任务和多线程往往是使用多处理器系统的最合理方式。

1.1 定义任务

线程可以驱动任务,你需要一种描述任务的方式,这可以由Runnable接口来提供。

public class LiftOff implements Runnable {
  protected int countDown = 10; // Default
  private static int taskCount = 0;
  private final int id = taskCount++;
  public LiftOff() {}
  public LiftOff(int countDown) {
    this.countDown = countDown;
  }
  public String status() {
    return "#" + id + "(" +
      (countDown > 0 ? countDown : "Liftoff!") + "), ";
  }
  public void run() {
    while(countDown-- > 0) {
      System.out.print(status());
      Thread.yield();
    }
  }
}

任务的run()方法通常总会有某种形式的循环,使得任务一直运行下去直到不再需要。
通常,run()被写成无限循环的形式,这就意味着,除非有某个条件使得run()终止,否则它将永远运行下去。

当从Runnable导出一个类时,它必须具有run()方法,但是这个方法并无特殊之处——它不会产生任何内在的线程能力。要实现线程的行为,你必须显式地将一个任务附着到线程上。

1.2 Thread类

将Runnable对象转变为工作任务的传统方式是把它提交给一个Thread构造器。

public class BasicThreads {
  public static void main(String[] args) {
    Thread t = new Thread(new LiftOff());
    t.start();
    System.out.println("Waiting for LiftOff");
  }
} 

调用Thread对象的start()方法为该线程执行必需的初始化操作,然后调用Runnable的run()方法,以便在这个新线程中启动任务。

你可以很容易地添加更多的线程去驱动更多的任务。

public class MoreBasicThreads {
  public static void main(String[] args) {
    for(int i = 0; i < 5; i++)
      new Thread(new LiftOff()).start();
    System.out.println("Waiting for LiftOff");
  }
}

这个程序一次运行的结果可能与另一次运行的结果不同,因为线程调度机制是非确定性的。

1.3 继承Thread类

在非常简单的情况下,你可以直接继承Thread类来代替实现Runnable接口的方式。

public class SimpleThread extends Thread {
  private int countDown = 5;
  private static int threadCount = 0;
  public SimpleThread() {
    // Store the thread name:
    super(Integer.toString(++threadCount));
    start();
  }
  public String toString() {
    return "#" + getName() + "(" + countDown + "), ";
  }
  public void run() {
    while(true) {
      System.out.print(this);
      if(--countDown == 0)
        return;
    }
  }
  public static void main(String[] args) {
    for(int i = 0; i < 5; i++)
      new SimpleThread();
  }
}

实现接口使得你可以继承另一个不同的类,而从Tread继承则不行。

1.4 优先级

Java线程有优先级,优先级高的线程会获得较多的运行机会。
Java线程的优先级用整数表示,取值范围是1~10,Thread类有以下三个静态常量:

public class Thread implements Runnable {
    //...
    
    /**
     * The minimum priority that a thread can have.
     */
    public final static int MIN_PRIORITY = 1;//线程的最低优先级

   /**
     * The default priority that is assigned to a thread.
     */
    public final static int NORM_PRIORITY = 5;//线程的默认优先级

    /**
     * The maximum priority that a thread can have.
     */
    public final static int MAX_PRIORITY = 10;//线程的最高优先级
    
    //...
    
    public final void setPriority(int newPriority) {
        ThreadGroup g;
        checkAccess();
        if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) {
            throw new IllegalArgumentException();
        }
        if((g = getThreadGroup()) != null) {
            if (newPriority > g.getMaxPriority()) {
                newPriority = g.getMaxPriority();
            }
            setPriority0(priority = newPriority);
        }
    }

    public final int getPriority() {
        return priority;
    }

    //...
}

Thread类的setPriority()getPriority()方法分别用来设置和获取线程的优先级。

1.5 线程的状态

一个线程可以处于以下四种状态之一:

  1. 新建(new):当线程被创建时,它只会短暂地处于这种状态。
  2. 就绪(Runnable):在这种状态下,只要调度器把时间片分配给线程,线程就可以运行。
  3. 阻塞(Blocked):当线程处于阻塞状态时,调度器将忽略线程,不会分配给线程任何CPU时间。直到线程重新进入了就绪状态,它才有可能执行操作。
  4. 死亡(Dead):处于死亡或终止状态的线程将不再是可调度的,并且再也不会得到CPU时间,它的任务已经结束,或不再是可运行的(任务死亡的通常方式是从run()方法返回)。

一个任务进入阻塞状态,可能有如下原因:

  1. 通过调用sleep(milliseconds)使任务进入休眠状态,在这种情况下,任务在指定时间内不会运行。
  2. 你通过调用wait()使线程挂起。直到线程得到了notify()notifyAll()消息,线程才会进入就绪状态。
  3. 任务在等待某个输入/输出完成。
  4. 任务试图在某个对象上调用其同步控制方法,但是对象锁不可用,因为另一个任务已经获取了这个锁。

2.解决共享资源竞争

使用线程时的一个基本问题:你永远都不知道一个线程何时在运行
对于并发工作,你需要某种方式来防止两个任务访问相同的资源
防止这种冲突的方法就是当资源被一个任务使用时,在其上加锁。
第一个访问某项资源的任务必须锁定这项资源,使其他任务在其被解锁之前,就无法访问它了。而在其被解锁之时,另一个任务就可以锁定并使用它。
Java以提供关键字synchronized的形式,为防止资源冲突提供内置支持。当任务要执行被synchronized关键字保护的代码片段的时候,它将检查锁是否可用,然后获取锁,执行代码,释放锁。

共享资源一般是以对象形式存在的内存片段,但也可以是文件、输入/输出端口,或者是打印机。
要控制对共享资源的访问,得先把它包装进一个对象。然后把所有要访问这个资源的方法标记为synchronized。
当在对象上调用其任意synchronized方法的时候,此对象都被加锁。
这时该对象上的其他synchronized方法只有等到前一个方法调用完毕并释放了锁之后才能被调用。

一个任务可以多次获得对象的锁。
如果一个方法在同一个对象上调用了第二个方法,后者又调用了同一对象上的另一个方法,就会发生这种情况。
JVM负责跟踪对象被加锁的次数。
如果一个对象被解锁,其计数变为0。在任务第一次给对象加锁的时候,计数变为1。每当这个相同的任务在这个对象上获得锁时,计数都会递增。
只有首先获得了锁的任务才能允许继续获取多个锁。
每当任务离开一个synchronized方法,计数递减,当计数为零的时候,锁被完全释放,此时别的任务就可以使用此资源。

3.JUC(java.util.concurrent)

3.1 volatile

如果多个任务在同时访问某个域,那么这个域就应该是volatile的,否则,这个域就应该只能经由同步来访问。
如果一个域完全由synchronized方法或语句块来防护,那就不必将其设置为volatile的。

/*
 * 一、volatile 关键字:当多个线程进行操作共享数据时,可以保证内存中的数据可见。
 *                    相较于 synchronized 是一种较为轻量级的同步策略。
 * 
 * 注意:
 * 1. volatile 不具备“互斥性”
 * 2. volatile 不能保证变量的“原子性”
 */
public class TestVolatile {
    
    public static void main(String[] args) {
        ThreadDemo td = new ThreadDemo();
        new Thread(td).start();
        
        while(true){
            if(td.isFlag()){
                System.out.println("------------------");
                break;
            }
        }
        
    }

}

class ThreadDemo implements Runnable {

    private volatile boolean flag = false;

    @Override
    public void run() {
        
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }

        flag = true;
        
        System.out.println("flag=" + isFlag());

    }

    public boolean isFlag() {
        return flag;
    }

    public void setFlag(boolean flag) {
        this.flag = flag;
    }

}

3.2 原子类

Java SE5引入了诸如AtomicIntegerAtomicLongAtomicReference等特殊的原子性变量类。
这些类被调整为可以使用在某些现代处理器上的可获得的原子性。对于常规编程来说,它们很少会派上用场,但是在涉及性能调优时,它们就有大有用武之地了。

public class TestAtomicDemo {

    public static void main(String[] args) {
        AtomicDemo ad = new AtomicDemo();
        
        for (int i = 0; i < 10; i++) {
            new Thread(ad).start();
        }
    }
    
}

class AtomicDemo implements Runnable{
    
//  private volatile int serialNumber = 0;
    
    private AtomicInteger serialNumber = new AtomicInteger(0);

    @Override
    public void run() {
        
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
        
        System.out.println(getSerialNumber());
    }
    
    public int getSerialNumber(){
        return serialNumber.getAndIncrement();
    }
    
}

3.3 线程池

使用Executor

/*
 * 一、线程池:提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应的速度。
 * 
 * 二、线程池的体系结构:
 *  java.util.concurrent.Executor : 负责线程的使用与调度的根接口
 *      |--**ExecutorService 子接口: 线程池的主要接口
 *          |--ThreadPoolExecutor 线程池的实现类
 *          |--ScheduledExecutorService 子接口:负责线程的调度
 *              |--ScheduledThreadPoolExecutor :继承 ThreadPoolExecutor, 实现 ScheduledExecutorService
 * 
 * 三、工具类 : Executors 
 * ExecutorService newFixedThreadPool() : 创建固定大小的线程池
 * ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
 * ExecutorService newSingleThreadExecutor() : 创建单个线程池。线程池中只有一个线程
 * 
 * ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。
 */
public class TestThreadPool {
    
    public static void main(String[] args) throws Exception {
        //1. 创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(5);
                    
        ThreadPoolDemo tpd = new ThreadPoolDemo();
        
        //2. 为线程池中的线程分配任务
        for (int i = 0; i < 10; i++) {
            pool.submit(tpd);
        }
        
        //3. 关闭线程池
        pool.shutdown();
    }

}

class ThreadPoolDemo implements Runnable{

    private int i = 0;
    
    @Override
    public void run() {
        while(i <= 100){
            System.out.println(Thread.currentThread().getName() + " : " + i++);
        }
    }
    
}

从任务中产生返回值

如果你希望任务在完成时能够返回一个值,那么可以实现Callable接口而不是Runnable接口。
在Java SE5中引入的Callable是一种具有类型参数的泛型,它的类型参数表示的是从方法call()中返回的值,并且必须使用ExecutorService.submit()方法调用它。

public class TestThreadPool {
    
    public static void main(String[] args) throws Exception {
        //1. 创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(5);
        
        List<Future<Integer>> list = new ArrayList<>();
        
        for (int i = 0; i < 10; i++) {
            Future<Integer> future = pool.submit(new Callable<Integer>(){

                @Override
                public Integer call() throws Exception {
                    int sum = 0;
                    
                    for (int i = 0; i <= 100; i++) {
                        sum += i;
                    }
                    
                    return sum;
                }
                
            });

            list.add(future);
        }
        
        pool.shutdown();
        
        for (Future<Integer> future : list) {
            System.out.println(future.get());
        }
        
    }

}

class ThreadPoolDemo implements Runnable{

    private int i = 0;
    
    @Override
    public void run() {
        while(i <= 100){
            System.out.println(Thread.currentThread().getName() + " : " + i++);
        }
    }
    
}

3.4 同步锁

public class TestLock {
    
    public static void main(String[] args) {
        Ticket ticket = new Ticket();
        
        new Thread(ticket, "1号窗口").start();
        new Thread(ticket, "2号窗口").start();
        new Thread(ticket, "3号窗口").start();
    }

}

class Ticket implements Runnable{
    
    private int tick = 100;
    
    private Lock lock = new ReentrantLock();

    @Override
    public void run() {
        while(true){
            
            lock.lock(); //上锁
            
            try{
                if(tick > 0){
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                    }
                    
                    System.out.println(Thread.currentThread().getName() + " 完成售票,余票为:" + --tick);
                }
            }finally{
                lock.unlock(); //释放锁
            }
        }
    }
    
}

3.5 生产者与消费者

使用互斥并允许任务挂起的基本类是Condition,你可以通过在Condition上调用await()来挂起一个任务。
当外部条件发生变化,意味着某个任务应该继续执行时,你可以通过调用signal()来通知这个任务,从而唤醒一个任务,或者调用signalAll()来唤醒所有在这个Condition上被其自身挂起的任务。

public class TestProductorAndConsumerForLock {

    public static void main(String[] args) {
        Clerk clerk = new Clerk();

        Productor pro = new Productor(clerk);
        Consumer con = new Consumer(clerk);

        new Thread(pro, "生产者 A").start();
        new Thread(con, "消费者 B").start();

//       new Thread(pro, "生产者 C").start();
//       new Thread(con, "消费者 D").start();
    }

}

class Clerk {
    private int product = 0;

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    // 进货
    public void get() {
        lock.lock();

        try {
            if (product >= 1) { // 为了避免虚假唤醒,应该总是使用在循环中。
                System.out.println("产品已满!");

                try {
                    condition.await();
                } catch (InterruptedException e) {
                }

            }
            System.out.println(Thread.currentThread().getName() + " : "
                    + ++product);

            condition.signalAll();
        } finally {
            lock.unlock();
        }

    }

    // 卖货
    public void sale() {
        lock.lock();

        try {
            if (product <= 0) {
                System.out.println("缺货!");

                try {
                    condition.await();
                } catch (InterruptedException e) {
                }
            }

            System.out.println(Thread.currentThread().getName() + " : "
                    + --product);

            condition.signalAll();

        } finally {
            lock.unlock();
        }
    }
}

// 生产者
class Productor implements Runnable {

    private Clerk clerk;

    public Productor(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            clerk.get();
        }
    }
}

// 消费者
class Consumer implements Runnable {

    private Clerk clerk;

    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            clerk.sale();
        }
    }

}