欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

线程通信和线程池

程序员文章站 2022-06-06 07:55:13
...

生产者与消费者设计模式

设计模式原理

它描述的是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者可以从仓库中取走产品,解决生产者/消费者问题,我们需要采用某种机制保护生产者和消费者之间的同步

同步问题核心在于:如何保证同一资源被多个线程并发访问时的完整性,常用的方法就是加锁,保证资源在任意时刻只被一个线程访问

案例一:

你和你朋友公用一张银行卡,你向卡中存钱,你朋友取钱,保证你存一笔,然后取一笔,再存一笔,再取一笔。
实现功能:使用线程通信
在jdk1.5之前有三个方法实现线程通信:

wait(): 等待,线程执行这个方法进入等待队列(和锁有关,一个锁对应一个等待队列), 需要被唤醒

notify(): 通知唤醒,从等待队列中随机唤醒一个线程

notifyAll():全部唤醒,把等待队列中所有的线程都唤醒

代码实现:
银行卡类:

public class BankCard {
    private int money;
    private boolean flag;

    public synchronized  void  Cun(){
        while (flag){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        money+=1000;
        System.out.println(Thread.currentThread().getName()+"存了1000元");
        flag=true;
        this.notifyAll();
    }

    public synchronized void qu(){
        while (!flag){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        money-=1000;
        System.out.println(Thread.currentThread().getName()+"取了1000元");
        flag=false;
        this.notifyAll();
    }
}

存钱线程:

public class AddMoney implements Runnable {
    private BankCard bankCard;

    public AddMoney(BankCard bankCard) {
        this.bankCard = bankCard;
    }

    public BankCard getBankCard() {
        return bankCard;
    }

    public void setBankCard(BankCard bankCard) {
        this.bankCard = bankCard;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            bankCard.Cun();
        }
    }
}

取钱线程:

public class RemoveMoney implements Runnable {
    private BankCard bankCard;

    public RemoveMoney(BankCard bankCard) {
        this.bankCard = bankCard;
    }

    public BankCard getBankCard() {
        return bankCard;
    }

    public void setBankCard(BankCard bankCard) {
        this.bankCard = bankCard;
    }

    @Override
    public void run() {
        for (int i = 0; i <10 ; i++) {
            bankCard.qu();
        }
    }
}

测试类:

public class Demo1 {
    public static void main(String[] args) {
        BankCard bankCard=new BankCard();
        AddMoney add=new AddMoney(bankCard);
        RemoveMoney rm=new RemoveMoney(bankCard);
        Thread t1=new Thread(add);
        Thread t2=new Thread(rm);
        t1.start();
        t2.start();
    }
}

案例二

wait():当缓冲区已满或空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行
·是Object的方法
·调用方式:对象.wait();
·表示释放 对象 这个锁标记,然后在锁外边等待(对比sleep(),sleep是抱着锁休眠的)
·等待,必须放到同步代码段中执行

notify():当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态
·是Object的方法
·调用方式:对象.notify();
·表示唤醒 对象 所标记外边在等待的一个线程

notifyAll():全部唤醒
·是Object的方法
·调用方式:对象.notifyAll()
·表示唤醒 对象 所标记外边等待的所有线程

在麦当劳快餐店,有一种汉堡包在制作出来后会放在一个容器中,假如这个容器最多能装5个汉堡包,顾客购买该汉堡包食用时,容器中就会减少,同时麦当劳店会生产该种汉堡包。
实现代码:
面包类:

public class Bread {
    private String brand;
    private int id;

    public String getBrand() {
        return brand;
    }

    public void setBrand(String brand) {
        this.brand = brand;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "Bread{" +
                "brand='" + brand + '\'' +
                ", id=" + id +
                '}';
    }

    public Bread(String brand, int id) {
        this.brand = brand;
        this.id = id;
    }

    public Bread() {
    }
}

面包容器类

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BreadStore {
    private List<Bread> list = new ArrayList<>();
    private boolean flag;
    int index = 0;
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    Condition condition1 = lock.newCondition();

    public  void pro(Bread bread) {
        lock.lock();
        try {
            while (index > 5) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + "生产了" + bread.getId());
            list.add(bread);
            index++;
            condition1.signal();
        } finally {
            lock.unlock();
        }

    }

    public  void Consume(Bread bread) {
        lock.lock();
        try {
            while (index <1) {
                try {
                    condition1.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + "消费了"+bread.getId());
            list.remove(bread);
            index--;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

生产者类:

public class Producer implements Runnable {
    private BreadStore breadStore;

    public Producer(BreadStore breadStore) {
        this.breadStore = breadStore;
    }

    @Override
    public void run() {
        for(int i=0;i<30;i++){
            breadStore.pro(new Bread("旺仔",i));
        }
    }
}

消费者类:

public class Consummer implements Runnable {
   private BreadStore breadStore;

    public Consummer(BreadStore breadStore) {
        this.breadStore = breadStore;
    }
    @Override
    public void run() {
        for(int i=0;i<30;i++){
            breadStore.Consume(new Bread("旺仔",i));
        }
    }
}

测试类:

public class Test1 {
    public static void main(String[] args) {
        BreadStore breadStore=new BreadStore();
        Consummer cs=new Consummer(breadStore);
        Producer p=new Producer(breadStore);
        Consummer c1=new Consummer(breadStore);
        Thread t1=new Thread(cs);
        Thread t2=new Thread(p);
        Thread t3=new Thread(c1);
        t3.start();
        t1.start();
        t2.start();
    }
}

其实消费者和生产者模式重点在于如何控制线程的等待和唤醒。

扩展:线程池

为什么需要线程池:
例如有非常的多的任务需要多线程来完成,且每个线程执行时间不会太长,这样会频繁的创建和消耗线程。频繁创建和销毁线程会比较耗性能。如果有了线程池就不要创建更多的线程来完成任务,因为线程可以重用。

线程池维护着一个队列,队列中保存着处于等待(空闲)状态的线程。不用每次都创建新的线程。

并发:多个线程同时执行,并发的线程有资源共享一定需要同步。
并行:真正的并发。多核cpu时,每个内核执行一个线程。

接口

1.Executor:线程池的核心接口,负责线程的创建使用和调度的根接口。
2 ExecutorService:Executor的子接口,线程池的主要接口,提供基本的功能。
3 ScheduledExecutorService:ExecutorService的子接口,负责线程调度的子接口。

实现类

1.ThreadPoolExecutor:ExecutorService的实现类,负责线程池的创建使用。
2. ScheduledThreadPoolExcutor:继承ThreadPoolExecutor,并实现ScheduleExecutorService的接口,既有线程池的功能,又具有线程调度功能。
3 .Executors:线程池的工具类,负责线程池的创建。

  • newFixedThreadPool():创建固定大小的线程池。
  • newCachedThreadPool():创建缓冲线程池,线程池的大小没有限制。根据需求自动调整线程数量。
  • newScheduledExecutorService:ExecutorService的子接口,负责线程调度子接口。
  • newSingleThreadExecutor();创建单个线程的线程池,只有一个线程。

案例一:使用线程池实现卖票

public class Ticket implements Runnable{
	private int ticket=100;

	@Override
	public void run() {
		while(true) {
			if(ticket<=0) {
				break;
			}
			System.out.println(Thread.currentThread().getName()+"卖第"+ticket+"张票");
			ticket--;
		}
	}
	
}
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
	public static void main(String[] args) {
		Ticket ticket=new Ticket();
		ExecutorService threadPool = Executors.newFixedThreadPool(4);
		for(int i=0;i<4;i++) {
			threadPool.submit(ticket);
		}
		threadPool.shutdown();
		System.out.println("主线程执行完毕........");
		
	}
}

案例二:线程池计算1-100的和,要求采用Callable接口

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Test {
	public static void main(String[] args) throws Exception {
		ExecutorService threadPool = Executors.newFixedThreadPool(4);
		List<Future<Integer>> list=new ArrayList<>();
		for (int i = 0; i < 10; i++) {
			Future<Integer> future = threadPool.submit(new Callable<Integer>() {

				@Override
				public Integer call() throws Exception {
					int sum = 0;
					for (int i = 0; i <= 100; i++) {
						Thread.sleep(10);
						sum += i;
					}
					System.out.println(Thread.currentThread().getName() + "计算完毕");
					return sum;
				}
			});
			list.add(future);
		}

		threadPool.shutdown();
		System.out.println("主线程结束了。。。。");
		for (Future<Integer> fu : list) {
			int s = fu.get();
			System.out.println(s);
		}
		

	}
}

案例三:延迟执行任务

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Test2 {
	public static void main(String[] args) throws Exception{
		ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
		List<Future<Integer>> list=new ArrayList<>();
		for(int i=0;i<10;i++) {
			Future<Integer> future=threadPool.schedule(new Callable<Integer>() {
	
				@Override
				public Integer call() throws Exception {
					int ran=new Random().nextInt(100);
					System.out.println(Thread.currentThread().getName()+"...."+ran);
					return ran;
				}
			},3,TimeUnit.SECONDS);
			list.add(future);
		}
		threadPool.shutdown();
		System.out.println("主线程结束了...........");
		for (Future<Integer> future2 : list) {
			int n=future2.get();
			System.out.println(n);
		}
		
	}
}