欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java高并发编程案例

程序员文章站 2022-05-04 08:41:10
...

synchronized关键字

对象加锁

对某个对象加锁,互斥锁(一个线程拿到其它就拿不到)

public class T {
    private int count = 10;
    private Object o = new Object();

    public void m() {
        synchronized(o) { //任何线程要执行下面的代码,必须先拿到o的锁
            count--;
            System.out.println(Thread.currentThread().getName() + " count = " + count);
        }
    }
}

若每次都要建一个Object来当锁太麻烦,直接拿自身对象当锁即可

public class T {

    private int count = 10;

    public void m() {
        synchronized(this) { //任何线程要执行下面的代码,必须先拿到this的锁
            count--;
            System.out.println(Thread.currentThread().getName() + " count = " + count);
        }
    }
}

修饰方法

直接用synchronized修饰方法等同于锁定自身对象

public class T {

    private int count = 10;

    public synchronized void m() { //等同于在方法的代码执行时要synchronized(this)
        count--;
        System.out.println(Thread.currentThread().getName() + " count = " + count);
    }
}

锁定静态方法

public class T {

    private static int count = 10;

    public synchronized static void m() { //这里等同于synchronized(top.tjtulong.T.class)
        count--;
        System.out.println(Thread.currentThread().getName() + " count = " + count);
    }

    public static void mm() {
        synchronized(T.class) { //考虑一下这里写synchronized(this)是否可以?
            count --;
        }
    }
}

一个synchronized代码块的代码是一个原子操作

同步和非同步方法同时调用

public class T {

    public synchronized void m1() {
        System.out.println(Thread.currentThread().getName() + " m1 start...");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " m1 end");
    }

    public void m2() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " m2 ");
    }

    public static void main(String[] args) {
        T t = new T();

		/*new Thread(()->t.m1(), "t1").start();
		new Thread(()->t.m2(), "t2").start();*/

        new Thread(t::m1, "t1").start();
        new Thread(t::m2, "t2").start();

		/*
		new Thread(new Runnable() {

			@Override
			public void run() {
				t.m1();
			}

		});
		*/
    }
}

输出结果:

t1 m1 start...
t2 m2 
t1 m1 end

可见同步和非同步方法可以同时调用

脏读

只对写加锁而不对读加锁会造成脏读!

public class Account {
    String name;
    double balance;

    public synchronized void set(String name, double balance) {
        this.name = name;

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


        this.balance = balance;
    }

    public /*synchronized*/ double getBalance(String name) {
        return this.balance;
    }

    public void setBalance(double balance) {
        this.balance = balance;
    }


    public static void main(String[] args) {
        Account a = new Account();
        new Thread(() -> a.set("zhangsan", 100.0)).start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        a.setBalance(50.0);
        System.out.println(a.getBalance("zhangsan"));

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(a.getBalance("zhangsan"));
    }
}

可重入锁

public class T {
    synchronized void m1() {
        System.out.println("m1 start");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        m2();
    }

    synchronized void m2() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("m2");
    }
}

m2()方法可以执行

一个同步方法可以调用另外一个同步方法,一个线程已经拥有某个对象的锁,再次申请的时候仍然会得到该对象的锁,也就是说synchronized获得的锁是可重入的

异常释放锁

public class T {
    int count = 0;

    synchronized void m() {
        System.out.println(Thread.currentThread().getName() + " start");
        while (true) {
            count++;
            System.out.println(Thread.currentThread().getName() + " count = " + count);
            try {
                TimeUnit.SECONDS.sleep(1);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            if (count == 5) {
                int i = 1 / 0; //此处抛出异常,锁将被释放,要想不被释放,可以在这里进行catch,然后让循环继续
                System.out.println(i);
            }
        }
    }

    public static void main(String[] args) {
        T t = new T();
        Runnable r = new Runnable() {

            @Override
            public void run() {
                t.m();
            }
        };
        new Thread(r, "t1").start();

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(r, "t2").start();
    }
}

输出结果:

t1 start
t1 count = 1
t1 count = 2
t1 count = 3
t1 count = 4
t1 count = 5
t2 start
Exception in thread "t1" java.lang.ArithmeticException: / by zero
t2 count = 6
	at top.tjtulong.demo8.T.m(T.java:21)
	at top.tjtulong.demo8.T$1.run(T.java:33)
	at java.lang.Thread.run(Thread.java:748)
t2 count = 7
t2 count = 8
t2 count = 9
t2 count = 10
t2 count = 11

程序在执行过程中,如果出现异常,默认情况锁会被释放,所以在并发处理的过程中,有异常要多加小心,不然可能会发生不一致的情况。比如,在一个web app处理过程中,多个servlet线程共同访问同一个资源,这时如果异常处理不合适,在第一个线程中抛出异常,其他线程就会进入同步代码区,有可能会访问到异常产生时的数据,因此要非常小心的处理同步业务逻辑中的异常。

同步监视器变化

锁定某对象o,如果o的属性发生改变,不影响锁的使用;但是如果o变成另外一个对象,则锁定的对象发生改变,应该避免将锁定对象的引用变成另外的对象。

public class T {

    Object o = new Object();

    void m() {
        synchronized(o) {
            while(true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            }
        }
    }

    public static void main(String[] args) {
        T t = new T();
        //启动第一个线程
        new Thread(t::m, "t1").start();

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //创建第二个线程
        Thread t2 = new Thread(t::m, "t2");

        t.o = new Object(); //锁对象发生改变,所以t2线程得以执行,如果注释掉这句话,线程2将永远得不到执行机会

        t2.start();
    }
}


注:不要用字符串常量作为锁定对象,因为这样其实锁定的是同一个对象。这种情况还会发生比较诡异的现象,比如你用到了一个类库,在该类库中代码锁定了字符串“Hello”,但是你读不到源码,所以你在自己的代码中也锁定了"Hello",这时候就有可能发生非常诡异的死锁阻塞,因为你的程序和你用到的类库不经意间使用了同一把锁。

volatile

线程之间的可见性

public class T {
    //对比一下有无volatile的情况下,整个程序运行结果的区别
    /*volatile*/ boolean running = true; 

    void m() {
        System.out.println("m start");
        while (running) {

        }
        System.out.println("m end!");
    }

    public static void main(String[] args) {
        T t = new T();

        new Thread(t::m, "t1").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        t.running = false;
    }
}

如果running变量不用volatile修饰的话,程序将不会停止。

volatile 关键字,使一个变量在多个线程间可见。A、B线程都用到一个变量,java默认是A线程中保留一份copy,这样如果B线程修改了该变量,则A线程未必知道。但使用volatile关键字,会让所有线程都会读到变量的修改值。

在上面的代码中,running是存在于堆内存的t对象中,当线程t1开始运行的时候,会把running值从内存中读到t1线程的工作区,在运行过程中直接使用这个copy,并不会每次都去读取堆内存,这样当主线程修改running的值之后,t1线程感知不到,所以不会停止运行;而使用volatile,将会强制所有线程都去堆内存中读取running的值。

volatile并不能保证多个线程共同修改running变量时所带来的不一致问题,也就是说volatile不能替代synchronized

对比synchronized

public class T {
    volatile int count = 0;
    void m() {
        for(int i=0; i<10000; i++) count++;
    }

    public static void main(String[] args) {
        T t = new T();

        List<Thread> threads = new ArrayList<Thread>();

        for(int i=0; i<10; i++) {
            threads.add(new Thread(t::m, "thread-"+i));
        }

        threads.forEach((o)->o.start());

        threads.forEach((o)->{
            try {
                o.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        System.out.println(t.count);
    }
}

输出结果必定小于100000.

这是由于volatile并不能保证多个线程共同修改running变量时所带来的不一致问题(++不是原子性操作),也就是说volatile不能替代synchronized。

Java高并发编程案例

synchronized void m() { 
    for (int i = 0; i < 10000; i++)
        count++;
}

对比上一个程序,可以用synchronized解决,synchronized可以保证可见性和原子性,volatile只能保证可见性。

AtomXXX

解决同样的问题的更高效的方法,使用AtomXXX类:

AtomicInteger count = new AtomicInteger(0); 

void m() { 
    for (int i = 0; i < 10000; i++)
        //if count.get() < 1000
        count.incrementAndGet(); //等同于count++,但是是原子的
}

AtomXXX类本身方法都是原子性的,但不能保证多个方法连续调用是原子性的。

淘宝面试题

实现一个容器,提供两个方法,add,size,写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束。

方法一:使用wait和notify

public class MyContainer4 {

    //添加volatile,使t2能够得到通知
    volatile List lists = new ArrayList();

    public void add(Object o) {
        lists.add(o);
    }

    public int size() {
        return lists.size();
    }

    public static void main(String[] args) {
        MyContainer4 c = new MyContainer4();

        final Object lock = new Object();

        new Thread(() -> {
            synchronized(lock) {
                System.out.println("t2启动");
                if(c.size() != 5) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("t2 结束");
                //通知t1继续执行
                lock.notify();
            }

        }, "t2").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        new Thread(() -> {
            System.out.println("t1启动");
            synchronized(lock) {
                for(int i=0; i<10; i++) {
                    c.add(new Object());
                    System.out.println("add " + i);

                    if(c.size() == 5) {
                        lock.notify();
                        //释放锁,让t2得以执行
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "t1").start();
    }
}

sleep() 不释放锁
wait() 释放锁
notify() 不释放锁

方法二:使用Latch(门闩) 替代wait和notify来进行通知。好处是通信方式简单,同时也可以指定等待时间。使用await和countdown方法替代wait和notify,CountDownLatch不涉及锁定,当count的值为零时当前线程继续运行。

当不涉及同步,只是涉及线程通信的时候,用synchronized + wait/notify就显得太重了,这时应该考虑countdownlatch/cyclicbarrier/semaphore。

public class MyContainer5 {

    // 添加volatile,使t2能够得到通知
    volatile List lists = new ArrayList();

    public void add(Object o) {
        lists.add(o);
    }

    public int size() {
        return lists.size();
    }

    public static void main(String[] args) {
        MyContainer5 c = new MyContainer5();

        CountDownLatch latch = new CountDownLatch(1);

        new Thread(() -> {
            System.out.println("t2启动");
            if (c.size() != 5) {
                try {
                    latch.await();

                    TimeUnit.SECONDS.sleep(5);
                    //也可以指定等待时间
                    //latch.await(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("t2 结束");

        }, "t2").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        new Thread(() -> {
            System.out.println("t1启动");
            for (int i = 0; i < 10; i++) {
                c.add(new Object());
                System.out.println("add " + i);

                if (c.size() == 5) {
                    // 打开门闩,让t2得以执行
                    latch.countDown();
                }

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }, "t1").start();
    }
}

Reentrantlock

使用Reentrantlock可以替代synchronize完成同样的功能。需要注意的是,必须要必须要必须要手动释放锁。使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放。

class X{
    //定义锁对象
    private final ReentrantLock lock=new ReentrantLock();
    //定义需要保证线程安全的方法
    public void m(){
        //加锁
        lock.lock();
        try{
            //...method body
        }
        //使用finally块来保证释放锁
        finally{
            lock.unlock();
        }
    }
}

使用Reentrantlock可以进行尝试锁定tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待。

public class ReentrantLock3 {
    Lock lock = new ReentrantLock();

    void m1() {
        try {
            lock.lock();
            for (int i = 0; i < 10; i++) {
                TimeUnit.SECONDS.sleep(1);

                System.out.println(i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行
     * 可以根据tryLock的返回值来判定是否锁定
     * 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中
     */
    void m2() {
		/*
		boolean locked = lock.tryLock();
		System.out.println("m2 ..." + locked);
		if(locked) lock.unlock();
		*/

        boolean locked = false;

        try {
            locked = lock.tryLock(5, TimeUnit.SECONDS);
            System.out.println("m2 ..." + locked);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(locked) lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReentrantLock3 rl = new ReentrantLock3();
        new Thread(rl::m1).start();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(rl::m2).start();
    }
}

使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应,在一个线程等待锁的过程中,可以被打断。

ReentrantLock还可以指定为公平锁,即谁等的时间长,谁得到锁,synchronized为非公平锁,不在乎一个线程已经等待多长时间。

public class ReentrantLock5 extends Thread {

    private static ReentrantLock lock=new ReentrantLock(true); //参数为true表示为公平锁,请对比输出结果
    public void run() {
        for(int i=0; i<100; i++) {
            lock.lock();
            try{
                System.out.println(Thread.currentThread().getName()+"获得锁");
            }finally{
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) {
        ReentrantLock5 rl=new ReentrantLock5();
        Thread th1=new Thread(rl);
        Thread th2=new Thread(rl);
        th1.start();
        th2.start();
    }
}

输出结果为:

Thread-1获得锁
Thread-2获得锁
Thread-1获得锁
Thread-2获得锁
Thread-1获得锁
Thread-2获得锁
Thread-1获得锁
Thread-2获得锁

生产者–消费者

写一个固定容量同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用。

使用wait+notify

public class MyContainer1<T> {
    final private LinkedList<T> lists = new LinkedList<>();
    final private int MAX = 10; //最多10个元素
    private int count = 0;


    public synchronized void put(T t) {
        while (lists.size() == MAX) { //想想为什么用while而不是用if?
            try {
                this.wait(); //effective java
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        lists.add(t);
        ++count;
        this.notifyAll(); //通知消费者线程进行消费
    }

    public synchronized T get() {
        T t = null;
        while (lists.size() == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        t = lists.removeFirst();
        count--;
        this.notifyAll(); //通知生产者进行生产
        return t;
    }

    public static void main(String[] args) {
        MyContainer1<String> c = new MyContainer1<>();
        //启动消费者线程
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 5; j++) System.out.println(c.get());
            }, "c" + i).start();
        }

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //启动生产者线程
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int j = 0; j < 25; j++) c.put(Thread.currentThread().getName() + " " + j);
            }, "p" + i).start();
        }
    }
}

关键点:

  1. 用while而不是用if;
  2. notifyAll而不是notify。

使用Lock+Condition

Condition的方式可以更加精确的指定哪些线程被唤醒

public class MyContainer2<T> {
    final private LinkedList<T> lists = new LinkedList<>();
    final private int MAX = 10; //最多10个元素
    private int count = 0;

    private Lock lock = new ReentrantLock();
    private Condition producer = lock.newCondition();
    private Condition consumer = lock.newCondition();

    public void put(T t) {
        try {
            lock.lock();
            while(lists.size() == MAX) { //想想为什么用while而不是用if?
                producer.await();
            }

            lists.add(t);
            ++count;
            consumer.signalAll(); //通知消费者线程进行消费
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public T get() {
        T t = null;
        try {
            lock.lock();
            while(lists.size() == 0) {
                consumer.await();
            }
            t = lists.removeFirst();
            count --;
            producer.signalAll(); //通知生产者进行生产
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return t;
    }

    public static void main(String[] args) {
        MyContainer2<String> c = new MyContainer2<>();
        //启动消费者线程
        for(int i=0; i<10; i++) {
            new Thread(()->{
                for(int j=0; j<5; j++) System.out.println(c.get());
            }, "c" + i).start();
        }

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //启动生产者线程
        for(int i=0; i<2; i++) {
            new Thread(()->{
                for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
            }, "p" + i).start();
        }
    }
}

ThreadLocal

ThreadLocal是线程局部变量

ThreadLocal是使用空间换时间,synchronized是使用时间换空间

public class ThreadLocal2 {
    //volatile static Person p = new Person();
    static ThreadLocal<Person> tl = new ThreadLocal<>();

    public static void main(String[] args) {

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(tl.get());
        }).start();

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            tl.set(new Person());
        }).start();
    }

    static class Person {
        String name = "zhangsan";
    }
}

同步容器

ConcurrentMap

多线程情况下用ConcurrentHashMap代替HashMap;
ConcurrentHashMap效率要比Hashtable高;
ConcurrentSkipListMap支持高并发且排序;
Collections.sychronizedXXX可以将非线程安全的容器变为线程安全的容器。

public class T01_ConcurrentMap {
    public static void main(String[] args) {
        //Map<String, String> map = new ConcurrentHashMap<>();
        //Map<String, String> map = new ConcurrentSkipListMap<>(); //高并发并且排序

        //Map<String, String> map = new Hashtable<>();
        Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX
        //TreeMap
        Random r = new Random();
        Thread[] ths = new Thread[100];
        CountDownLatch latch = new CountDownLatch(ths.length);
        long start = System.currentTimeMillis();
        for(int i=0; i<ths.length; i++) {
            ths[i] = new Thread(()->{
                for(int j=0; j<10000; j++) map.put("a" + r.nextInt(100000), "a" + r.nextInt(100000));
                latch.countDown();
            });
        }

        Arrays.asList(ths).forEach(t->t.start());
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long end = System.currentTimeMillis();
        System.out.println(end - start);
    }
}

CopyOnWriteArrayList

写的速度很慢但读的速度很快。

ConcurrentLinkedQueue

内部加锁的队列

public class T04_ConcurrentQueue {
    public static void main(String[] args) {
        Queue<String> strs = new ConcurrentLinkedQueue<>();

        for (int i = 0; i < 10; i++) {
            strs.offer("a" + i);  //add
        }

        System.out.println(strs);

        System.out.println(strs.size());

        System.out.println(strs.poll());//取出并删除
        System.out.println(strs.size());

        System.out.println(strs.peek());//仅取出不删除
        System.out.println(strs.size());

        //双端队列Deque
    }
}

BlockingQueue

阻塞式队列:LinkedBlockingQueue(*)和ArrayBlockingQueue(有界)

LinkedBlockingQueue

LinkedBlockingQueue自动实现了阻塞式队列

public class T05_LinkedBlockingQueue {

    static BlockingQueue<String> strs = new LinkedBlockingQueue<>();

    static Random r = new Random();

    public static void main(String[] args) {
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    strs.put("a" + i); //如果满了,就会等待
                    TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "p1").start();

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "c" + i).start();

        }
    }
}

ArrayBlockingQueue

public class T06_ArrayBlockingQueue {

    static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);

    static Random r = new Random();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            strs.put("a" + i);
        }

        strs.put("aaa"); //满了就会等待,程序阻塞
        strs.add("aaa");//添加不进去报错
        strs.offer("aaa");//添加不进去返回false
        strs.offer("aaa", 1, TimeUnit.SECONDS);//1s添加不进去返回false

        System.out.println(strs);
    }
}

DelayQueue

数组中每个元素记录着自己还有多长时间可以被消费者拿走,可用作定时执行任务,放入DelayQueue中的元素必须实现Delayed接口。

public class T07_DelayQueue {

    static BlockingQueue<MyTask> tasks = new DelayQueue<>();

    static Random r = new Random();

    static class MyTask implements Delayed {
        long runningTime;

        MyTask(long rt) {
            this.runningTime = rt;
        }

        @Override
        public int compareTo(Delayed o) {
            if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                return -1;
            else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
                return 1;
            else
                return 0;
        }

        @Override
        public long getDelay(TimeUnit unit) {

            return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }


        @Override
        public String toString() {
            return "" + runningTime;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long now = System.currentTimeMillis();
        MyTask t1 = new MyTask(now + 1000);
        MyTask t2 = new MyTask(now + 2000);
        MyTask t3 = new MyTask(now + 1500);
        MyTask t4 = new MyTask(now + 2500);
        MyTask t5 = new MyTask(now + 500);

        tasks.put(t1);
        tasks.put(t2);
        tasks.put(t3);
        tasks.put(t4);
        tasks.put(t5);

        System.out.println(tasks);

        for(int i=0; i<5; i++) {
            System.out.println(tasks.take());
        }
    }
}

线程池

Executor

最顶层接口

public class T01_MyExecutor implements Executor{

    public static void main(String[] args) {
        new T01_MyExecutor().execute(()->System.out.println("hello executor"));
    }

    @Override
    public void execute(Runnable command) {
        //new Thread(command).run();
        command.run();
    }
}

Callable

Callable接口提供了一个call()方法可以作为线程执行体,但call()方法比run()方法功能更强大:call()方法可以有返回值且call()方法可以声明抛出异常。

Executors

操作Exector的工具类。

ThreadPool

线程池,维护着一个任务队列和一个完成队列。

public class T05_ThreadPool {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(5); //execute submit
        for (int i = 0; i < 6; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
                //aaa@qq.com[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
            });
        }
        System.out.println(service);

        service.shutdown();
        System.out.println(service.isTerminated());//是否执行完false
        System.out.println(service.isShutdown());//是否关闭true
        System.out.println(service);
        //aaa@qq.com[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]

        TimeUnit.SECONDS.sleep(5);
        System.out.println(service.isTerminated());//true
        System.out.println(service.isShutdown());//true
        System.out.println(service);
        //aaa@qq.com[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
    }
}

Future

相当于Callable方法未来要返回的值

public class T06_Future {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        FutureTask<Integer> task = new FutureTask<>(()->{
            TimeUnit.MILLISECONDS.sleep(500);
            return 1000;
        }); //new Callable () { Integer call();}

        new Thread(task).start();

        System.out.println(task.get()); //阻塞返回1000

        //*******************************
        ExecutorService service = Executors.newFixedThreadPool(5);
        Future<Integer> f = service.submit(()->{
            TimeUnit.MILLISECONDS.sleep(500);
            return 1;
        });
        System.out.println(f.get());//1
        System.out.println(f.isDone());//true
    }
}

并行计算

计算1-200000之间有多少个质数。

一般起线程数 > CPU核数

public class T07_ParallelComputing {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        getPrime(1, 200000);
        long end = System.currentTimeMillis();
        System.out.println(end - start);

        final int cpuCoreNum = 4;

        ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);

        MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
        MyTask t2 = new MyTask(80001, 130000);
        MyTask t3 = new MyTask(130001, 170000);
        MyTask t4 = new MyTask(170001, 200000);

        Future<List<Integer>> f1 = service.submit(t1);
        Future<List<Integer>> f2 = service.submit(t2);
        Future<List<Integer>> f3 = service.submit(t3);
        Future<List<Integer>> f4 = service.submit(t4);

        start = System.currentTimeMillis();
        f1.get();
        f2.get();
        f3.get();
        f4.get();
        end = System.currentTimeMillis();
        System.out.println(end - start);
    }

    static class MyTask implements Callable<List<Integer>> {
        int startPos, endPos;

        MyTask(int s, int e) {
            this.startPos = s;
            this.endPos = e;
        }

        @Override
        public List<Integer> call() throws Exception {
            List<Integer> r = getPrime(startPos, endPos);
            return r;
        }

    }

    static boolean isPrime(int num) {
        for(int i=2; i<=num/2; i++) {
            if(num % i == 0) return false;
        }
        return true;
    }

    static List<Integer> getPrime(int start, int end) {
        List<Integer> results = new ArrayList<>();
        for(int i=start; i<=end; i++) {
            if(isPrime(i)) results.add(i);
        }

        return results;
    }
}

从计算结果可以看出,利用线程池并行计算明显比单线程计算快。

CachedPool

开始没有线程,需要起线程时便启动线程,当线程空闲60s(可以自己设置)时自动关闭。

public class T08_CachedPool {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        System.out.println(service);
        //aaa@qq.com[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
        for (int i = 0; i < 2; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(service);
        //aaa@qq.com[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]

        TimeUnit.SECONDS.sleep(80);

        System.out.println(service);
        //aaa@qq.com[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
    }
}

SingleThreadPool

线程池中只有一个线程,保证任务顺序执行。

public class T09_SingleThreadPool {
    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        for(int i=0; i<5; i++) {
            final int j = i;
            service.execute(()->{

                System.out.println(j + " " + Thread.currentThread().getName());
            });
        }
    }
}

ScheduledPool

以固定的频率执行任务

public class T10_ScheduledPool {
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        service.scheduleAtFixedRate(()->{
            try {
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }, 0, 500, TimeUnit.MILLISECONDS);
        // 500 表示每隔0.5秒执行一次
    }
}

WorkStealingPool

线程池中每隔线程都维护一个任务列表,当自身的任务列表执行完成后,会执行别的线程的任务列表。本质是ForkJoinPool线程池,所有的线程都是精灵线程。

public class T11_WorkStealingPool {
    public static void main(String[] args) throws IOException {
        ExecutorService service = Executors.newWorkStealingPool();
        System.out.println(Runtime.getRuntime().availableProcessors());

        service.execute(new R(1000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000)); //daemon
        service.execute(new R(2000));

        //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
        System.in.read();
    }

    static class R implements Runnable {
        int time;

        R(int t) {
            this.time = t;
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(time  + " " + Thread.currentThread().getName());
        }
    }
}

ForkJoinPool

Fork/Join框架就是在必要的情况下,将一个大任务进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行join汇总
Java高并发编程案例

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class T12_ForkJoinPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();

    static {
        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }

        System.out.println(Arrays.stream(nums).sum()); //stream api
    }

	/*
	static class AddTask extends RecursiveAction {

		int start, end;

		AddTask(int s, int e) {
			start = s;
			end = e;
		}

		@Override
		protected void compute() {

			if(end-start <= MAX_NUM) {
				long sum = 0L;
				for(int i=start; i<end; i++) sum += nums[i];
				System.out.println("from:" + start + " to:" + end + " = " + sum);
			} else {

				int middle = start + (end-start)/2;

				AddTask subTask1 = new AddTask(start, middle);
				AddTask subTask2 = new AddTask(middle, end);
				subTask1.fork();
				subTask2.fork();
			}
		}
	}
	*/

    static class AddTask extends RecursiveTask<Long> {

        private static final long serialVersionUID = 1L;
        int start, end;

        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected Long compute() {

            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                return sum;
            }

            int middle = start + (end-start)/2;

            AddTask subTask1 = new AddTask(start, middle);
            AddTask subTask2 = new AddTask(middle, end);
            subTask1.fork();
            subTask2.fork();

            return subTask1.join() + subTask2.join();
        }

    }

    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        fjp.execute(task);
        long result = task.join();
        System.out.println(result);

        //System.in.read();
    }
}

ps:多线程归并排序

各种线程池归根到底都是ThreadPoolExecutor类,指定起始线程、最大线程、存活时间及使用的队列。

ParallelStream

利用多线程访问数据流

public class T14_ParallelStreamAPI {
    public static void main(String[] args) {
        List<Integer> nums = new ArrayList<>();
        Random r = new Random();
        for(int i=0; i<10000; i++) nums.add(1000000 + r.nextInt(1000000));

        //System.out.println(nums);

        long start = System.currentTimeMillis();
        nums.forEach(v->isPrime(v));
        long end = System.currentTimeMillis();
        System.out.println(end - start);

        //使用parallel stream api

        start = System.currentTimeMillis();
        nums.parallelStream().forEach(T14_ParallelStreamAPI::isPrime);
        end = System.currentTimeMillis();

        System.out.println(end - start);
    }

    static boolean isPrime(int num) {
        for(int i=2; i<=num/2; i++) {
            if(num % i == 0) return false;
        }
        return true;
    }
}
相关标签: 高并发