欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

线程(二)

程序员文章站 2022-05-02 13:10:00
...
1.线程同步

​ synchronized关键字,标上这个关键字的方法或者代码块(同步块)说明需要同步,当一个线程进入里面后,就会给这个方法或者代码块加锁,只有这一个线程可以访问,其他线程无法访问,只有当这个线程执行完后释放锁,其他线程再获得锁,才可以访问,这样的任务(类)是线程安全的。

可重入锁ReentrantLock,利用ReentrantLock加锁,也可以实现同步,和synchronized关键字一样,而且加锁还有其他的功能,如线程间通信

public class ExecutorTest {
    public static void main(String[] args) {
        //创建任务
        Runnable task = new MyThread();
        //定义一个执行器
        ExecutorService executor = Executors.newFixedThreadPool(3);
        //执行任务
        executor.execute(task);
        executor.execute(task);
        executor.execute(task);
        //关闭执行器
        executor.shutdown();
    }
    static class MyThread implements Runnable {
	    private static int ticket = 100;
	    //新建一个锁
	    private static Lock lock = new ReentrantLock();
	
	    @Override
	    public void run() {
	        while (ticket > 0) {
	            lock.lock();//加锁
	            try {
	                if (ticket > 0) {
	                    System.out.print(Thread.currentThread().getName() + "   ");
	                    System.out.print("卖出第" + (100 - ticket + 1));
	                    System.out.println("张票,剩下票数:" + (ticket - 1));
	                    ticket--;
	                }
	                Thread.sleep(10);
	            } catch (InterruptedException e) {
	            } finally {
	                //在finally释放锁,确保锁能够被释放
	                lock.unlock();//释放锁
	            }
	        }
	    }
	}
}
2.线程间通信

利用可重入锁ReentrantLock创建一个Condition对象来实现线程间通信,Condition具有的方法有
上面说的创建一个Condition对象可以实现线程间通信,这个接口的一些方法如下:

//java.util.concurrent.Condition 接口
void await() 让当前线等待,当别的线程执行signal之后才会继续执行
void signal() 唤醒一个等待的线程继续执行
Condition signalAll() 唤醒所有等待的线程

典型实现:生产者消费者问题

public class ProducerConsumer {
    private static Factory factory = new Factory();

    public static void main(String[] args) {
        //线程池执行器
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(new ProducerTask());
        executor.execute(new ConsumerTask());
        executor.shutdown();
    }

    //生产
    private static class ProducerTask implements Runnable {

        @Override
        public void run() {
            try {
                int i = 1;
                while (true) {
                    factory.write(i++);
                    Thread.sleep((int) (Math.random() * 2000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    //消费
    private static class ConsumerTask implements Runnable {

        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println("          消费产品序号:" + factory.read());
                    Thread.sleep((int) (Math.random() * 4000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    //工厂
    private static class Factory {
        private static final int CAPACITY = 2;//工厂容量
        private LinkedList<Integer> list = new LinkedList<>();
        private static Lock lock = new ReentrantLock();

        //线程间协作,notFull为真才能继续生产,notEmpty为真才能继续消费
        private static Condition notFull = lock.newCondition();
        private static Condition notEmpty = lock.newCondition();

        //生产
        public void write(int value) {
            lock.lock();
            try {
                while (list.size() == CAPACITY) {
                    System.out.println("工厂容量已满,等待消费");
                    notFull.await();
                }
                list.offer(value);//跟add()一样添加一个元素,容量满不会抛出异常,会返回false
                System.out.println("产品序号 " + value + " 生产完毕");
                notEmpty.signal();//唤醒消费者消费
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        //消费
        public int read() {
            int value = 0;
            lock.lock();
            try {
                while (list.isEmpty()) {
                    System.out.println("          工厂容量为空,等待生产");
                    notEmpty.await();
                }
                value = list.remove();
                notFull.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                return value;
            }
        }
    }
}
3.阻塞队列

阻塞队列BlockingQueue(接口),继承了Queue,如果满队列时添加或者空队列时删除都导致线程阻塞,其接口的实现有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue。

阻塞队列的方法

方法 功能
put():void 添加一个元素,如果队列满,则阻塞
take():Object 移除并返问队列头部的元素,如果队列空,则阻塞

普通队列

方法 功能
add():boolean 添加,如果满,则抛出异常
remove():Object 删除并返回,如果空,则抛出异常
element():Object 返回,如果空,则抛出异常
offer():boolean 添加,如果满,则返回null
poll():Object 删除并返回,如果空,则返回null
peek():Object 返回,如果空,则返回null

不需要自己写生产消费方法,ArrayBlockingQueue已经实现了。

public class ProducerConsumerBlockingQueue {
    private static ArrayBlockingQueue<Integer> factory =
            new ArrayBlockingQueue<>(2);//队列容量2

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(new ProducerTask());
        executor.execute(new ConsumerTask());
        executor.shutdown();
    }

    //生产
    private static class ProducerTask implements Runnable {

        @Override
        public void run() {
            try {
                int i = 1;
                while (true) {
                    factory.put(i++);
                    System.out.println("产品序号 " + (i-1) + " 生产完毕");
                    Thread.sleep((int) (Math.random() * 2000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    //消费
    private static class ConsumerTask implements Runnable {

        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println("          消费产品序号:" + factory.take());
                    Thread.sleep((int) (Math.random() * 3000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}