线程通信和线程池
生产者与消费者设计模式
设计模式原理
它描述的是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者可以从仓库中取走产品,解决生产者/消费者问题,我们需要采用某种机制保护生产者和消费者之间的同步
同步问题核心在于:如何保证同一资源被多个线程并发访问时的完整性,常用的方法就是加锁,保证资源在任意时刻只被一个线程访问
案例一:
你和你朋友公用一张银行卡,你向卡中存钱,你朋友取钱,保证你存一笔,然后取一笔,再存一笔,再取一笔。
实现功能:使用线程通信
在jdk1.5之前有三个方法实现线程通信:
wait(): 等待,线程执行这个方法进入等待队列(和锁有关,一个锁对应一个等待队列), 需要被唤醒
notify(): 通知唤醒,从等待队列中随机唤醒一个线程
notifyAll():全部唤醒,把等待队列中所有的线程都唤醒
代码实现:
银行卡类:
public class BankCard {
private int money;
private boolean flag;
public synchronized void Cun(){
while (flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
money+=1000;
System.out.println(Thread.currentThread().getName()+"存了1000元");
flag=true;
this.notifyAll();
}
public synchronized void qu(){
while (!flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
money-=1000;
System.out.println(Thread.currentThread().getName()+"取了1000元");
flag=false;
this.notifyAll();
}
}
存钱线程:
public class AddMoney implements Runnable {
private BankCard bankCard;
public AddMoney(BankCard bankCard) {
this.bankCard = bankCard;
}
public BankCard getBankCard() {
return bankCard;
}
public void setBankCard(BankCard bankCard) {
this.bankCard = bankCard;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
bankCard.Cun();
}
}
}
取钱线程:
public class RemoveMoney implements Runnable {
private BankCard bankCard;
public RemoveMoney(BankCard bankCard) {
this.bankCard = bankCard;
}
public BankCard getBankCard() {
return bankCard;
}
public void setBankCard(BankCard bankCard) {
this.bankCard = bankCard;
}
@Override
public void run() {
for (int i = 0; i <10 ; i++) {
bankCard.qu();
}
}
}
测试类:
public class Demo1 {
public static void main(String[] args) {
BankCard bankCard=new BankCard();
AddMoney add=new AddMoney(bankCard);
RemoveMoney rm=new RemoveMoney(bankCard);
Thread t1=new Thread(add);
Thread t2=new Thread(rm);
t1.start();
t2.start();
}
}
案例二
wait():当缓冲区已满或空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行
·是Object的方法
·调用方式:对象.wait();
·表示释放 对象 这个锁标记,然后在锁外边等待(对比sleep(),sleep是抱着锁休眠的)
·等待,必须放到同步代码段中执行
notify():当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态
·是Object的方法
·调用方式:对象.notify();
·表示唤醒 对象 所标记外边在等待的一个线程
notifyAll():全部唤醒
·是Object的方法
·调用方式:对象.notifyAll()
·表示唤醒 对象 所标记外边等待的所有线程
在麦当劳快餐店,有一种汉堡包在制作出来后会放在一个容器中,假如这个容器最多能装5个汉堡包,顾客购买该汉堡包食用时,容器中就会减少,同时麦当劳店会生产该种汉堡包。
实现代码:
面包类:
public class Bread {
private String brand;
private int id;
public String getBrand() {
return brand;
}
public void setBrand(String brand) {
this.brand = brand;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
@Override
public String toString() {
return "Bread{" +
"brand='" + brand + '\'' +
", id=" + id +
'}';
}
public Bread(String brand, int id) {
this.brand = brand;
this.id = id;
}
public Bread() {
}
}
面包容器类
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BreadStore {
private List<Bread> list = new ArrayList<>();
private boolean flag;
int index = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Condition condition1 = lock.newCondition();
public void pro(Bread bread) {
lock.lock();
try {
while (index > 5) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "生产了" + bread.getId());
list.add(bread);
index++;
condition1.signal();
} finally {
lock.unlock();
}
}
public void Consume(Bread bread) {
lock.lock();
try {
while (index <1) {
try {
condition1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "消费了"+bread.getId());
list.remove(bread);
index--;
condition.signal();
} finally {
lock.unlock();
}
}
}
生产者类:
public class Producer implements Runnable {
private BreadStore breadStore;
public Producer(BreadStore breadStore) {
this.breadStore = breadStore;
}
@Override
public void run() {
for(int i=0;i<30;i++){
breadStore.pro(new Bread("旺仔",i));
}
}
}
消费者类:
public class Consummer implements Runnable {
private BreadStore breadStore;
public Consummer(BreadStore breadStore) {
this.breadStore = breadStore;
}
@Override
public void run() {
for(int i=0;i<30;i++){
breadStore.Consume(new Bread("旺仔",i));
}
}
}
测试类:
public class Test1 {
public static void main(String[] args) {
BreadStore breadStore=new BreadStore();
Consummer cs=new Consummer(breadStore);
Producer p=new Producer(breadStore);
Consummer c1=new Consummer(breadStore);
Thread t1=new Thread(cs);
Thread t2=new Thread(p);
Thread t3=new Thread(c1);
t3.start();
t1.start();
t2.start();
}
}
其实消费者和生产者模式重点在于如何控制线程的等待和唤醒。
扩展:线程池
为什么需要线程池:
例如有非常的多的任务需要多线程来完成,且每个线程执行时间不会太长,这样会频繁的创建和消耗线程。频繁创建和销毁线程会比较耗性能。如果有了线程池就不要创建更多的线程来完成任务,因为线程可以重用。
线程池维护着一个队列,队列中保存着处于等待(空闲)状态的线程。不用每次都创建新的线程。
并发:多个线程同时执行,并发的线程有资源共享一定需要同步。
并行:真正的并发。多核cpu时,每个内核执行一个线程。
接口
1.Executor:线程池的核心接口,负责线程的创建使用和调度的根接口。
2 ExecutorService:Executor的子接口,线程池的主要接口,提供基本的功能。
3 ScheduledExecutorService:ExecutorService的子接口,负责线程调度的子接口。
实现类
1.ThreadPoolExecutor:ExecutorService的实现类,负责线程池的创建使用。
2. ScheduledThreadPoolExcutor:继承ThreadPoolExecutor,并实现ScheduleExecutorService的接口,既有线程池的功能,又具有线程调度功能。
3 .Executors:线程池的工具类,负责线程池的创建。
- newFixedThreadPool():创建固定大小的线程池。
- newCachedThreadPool():创建缓冲线程池,线程池的大小没有限制。根据需求自动调整线程数量。
- newScheduledExecutorService:ExecutorService的子接口,负责线程调度子接口。
- newSingleThreadExecutor();创建单个线程的线程池,只有一个线程。
案例一:使用线程池实现卖票
public class Ticket implements Runnable{
private int ticket=100;
@Override
public void run() {
while(true) {
if(ticket<=0) {
break;
}
System.out.println(Thread.currentThread().getName()+"卖第"+ticket+"张票");
ticket--;
}
}
}
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
Ticket ticket=new Ticket();
ExecutorService threadPool = Executors.newFixedThreadPool(4);
for(int i=0;i<4;i++) {
threadPool.submit(ticket);
}
threadPool.shutdown();
System.out.println("主线程执行完毕........");
}
}
案例二:线程池计算1-100的和,要求采用Callable接口
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Test {
public static void main(String[] args) throws Exception {
ExecutorService threadPool = Executors.newFixedThreadPool(4);
List<Future<Integer>> list=new ArrayList<>();
for (int i = 0; i < 10; i++) {
Future<Integer> future = threadPool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100; i++) {
Thread.sleep(10);
sum += i;
}
System.out.println(Thread.currentThread().getName() + "计算完毕");
return sum;
}
});
list.add(future);
}
threadPool.shutdown();
System.out.println("主线程结束了。。。。");
for (Future<Integer> fu : list) {
int s = fu.get();
System.out.println(s);
}
}
}
案例三:延迟执行任务
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) throws Exception{
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
List<Future<Integer>> list=new ArrayList<>();
for(int i=0;i<10;i++) {
Future<Integer> future=threadPool.schedule(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int ran=new Random().nextInt(100);
System.out.println(Thread.currentThread().getName()+"...."+ran);
return ran;
}
},3,TimeUnit.SECONDS);
list.add(future);
}
threadPool.shutdown();
System.out.println("主线程结束了...........");
for (Future<Integer> future2 : list) {
int n=future2.get();
System.out.println(n);
}
}
}
上一篇: TCP通信
下一篇: 经典笑话20则(暴笑)
推荐阅读
-
简单易懂-线程池
-
Android的线程通信:消息机制原理(Message,Handler,MessageQueue,Looper),异步任务AsyncTask,使用JSON
-
Android 入门第十讲02-广播(广播概述,使用方法(系统广播,自定义广播,两个activity之间的交互和传值),EventBus使用方法,数据传递,线程切换,Android的系统广播大全)
-
关于Java中线程池与Executor原理详解
-
java 线程安全和不可变性
-
浅谈Spring @Async异步线程池用法总结
-
Java 线程池_动力节点Java学院整理
-
浅谈线程通信wait,notify作用
-
java线程池:获取运行线程数并控制线程启动速度的方法
-
C#线程 BeginInvoke和EndInvoke使用方法