欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

第二章 Basic Thread Synchronization (基础线程同步) 【下】

程序员文章站 2022-06-01 11:17:41
...
涉及的内容
  • 同步一个方法
  • 同步类中分配一个独立属性
  • 在同步代码中使用条件
  • 使用Lock锁定代码块
  • 同步数据的读写锁
  • 修改Lock公平模式
  • 在Lock中使用多条件

1、同步数据的读写锁

ReadWriteLock接口 和 ReentrantReadWriteLock类

例子:使用ReadWriteLock接口获取一个存储两个产品的价格

package com.jack;

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class PricesInfo {

	private double price1;
	private double price2;
	private ReadWriteLock lock;
	public PricesInfo() {
		super();
		this.price1 = 1.0;
		this.price2 = 2.0;
		lock = new ReentrantReadWriteLock();
	}
	
	public double getPrice1(){
		lock.readLock().lock();
		double value = price1;
		lock.readLock().unlock();
		return value;
	}
	
	public double getPrice2(){
		lock.readLock().lock();
		double value = price2;
		lock.readLock().unlock();
		return value;
	}
	
	public void setPrices(double price1, double price2){
		lock.writeLock().lock();
		this.price1 = price1;
		this.price2= price2;
		lock.writeLock().unlock();
	}
}

package com.jack;

public class Reader implements Runnable {

	private PricesInfo pricesInfo;
	
	public Reader(PricesInfo pricesInfo) {
		super();
		this.pricesInfo = pricesInfo;
	}

	@Override
	public void run() {
		for (int i=0; i< 10; i++){
			System.out.printf("%s:价格1 : %f\n", Thread.currentThread().getName()
					,pricesInfo.getPrice1());
			System.out.printf("%s:价格2:%f\n", Thread.currentThread().getName(),
					pricesInfo.getPrice2());
		}
	}

}

package com.jack;

public class Writer implements Runnable{

	
	private PricesInfo pricesInfo;
	
	public Writer(PricesInfo pricesInfo) {
		super();
		this.pricesInfo = pricesInfo;
	}

	@Override
	public void run() {
		for(int i=0; i<3; i++){
			System.out.printf("写:尝试修改价格.\n");
			pricesInfo.setPrices(Math.random()*10, Math.random()*8);
			System.out.printf("写:价格已经修改了.\n");
			try{
				Thread.sleep(2);
			} catch(InterruptedException e){
				e.printStackTrace();
			}
			
		}
			
	}

}

package com.jack;

public class Main {
	public static void main(String[] args){
		PricesInfo pricesInfo = new PricesInfo();
		Reader readers[] = new Reader[5];
		Thread threadsReader[] = new Thread[5];
		for (int i=0; i<5; i++){
			readers[i] = new Reader(pricesInfo);
			threadsReader[i] = new Thread(readers[i]);
		}
		
		Writer writer = new Writer(pricesInfo);
		Thread threadWriter = new Thread(writer);
		for(int i=0; i<5; i++){
			threadsReader[i].start();
		}
		threadWriter.start();
	}
}


日志:

第二章 Basic Thread Synchronization (基础线程同步) 【下】
总结:

  • 1、创建5个读线程和一个写线程,
  • 2、创建一个new ReentrantReadWriteLock() 读写锁。
  • 3、对于需要锁定部分进行锁住,这时候只有一个线程可以执行。最后释放锁
  • 4、写锁并不会限制读,所以出现脏读。

2、修改Lock公平模式

non-fair mode(非公平模式) : false  (选择执行线程没有规则)

fair mode(公平模式) :true  (选择规则等待时间最长)

例子:了解公平模式和非公平模式

package com.jack;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PrintQueue {
	private final Lock queueLock = new ReentrantLock(true);
	
	public void printJob(Object document){
		queueLock.lock();
		try {
			Long duration = (long) (Math.random()*10000);
			System.out.printf(Thread.currentThread().getName()
					+ ":打印队列:打印一个工作持续时间 %s ",(duration/1000)
					+ " seconds \n");
			Thread.sleep(duration);
		} catch (InterruptedException e){
			e.printStackTrace();
		}finally{
			queueLock.unlock();
		}
		queueLock.lock();
		try {
			Long duration = (long) (Math.random()*10000);
			System.out.printf(Thread.currentThread().getName()
					+ ":打印队列:打印一个工作持续时间 %s ",(duration/1000)
					+ " seconds \n");
			Thread.sleep(duration);
		} catch (InterruptedException e){
			e.printStackTrace();
		}finally{
			queueLock.unlock();
		}
	}
}

package com.jack;

public class Main {
	public static void main(String[] args){
		PrintQueue printQueue = new PrintQueue();  
        Thread thread[] = new Thread[10];  
        for (int i=0; i<10; i++){  
            thread[i]= new Thread(new Job(printQueue), "线程  " + i);  
        }  
          
        for(int i=0; i<10; i++){  
            thread[i].start();  
            try{
            	Thread.sleep(100);
            } catch (InterruptedException e){
            	e.printStackTrace();
            }
        }  
	}
}

日志:

线程  0:去打印一个文档
线程  0:打印队列:打印一个工作持续时间 7 seconds 
 线程  1:去打印一个文档
线程  2:去打印一个文档
线程  3:去打印一个文档
线程  4:去打印一个文档
线程  5:去打印一个文档
线程  6:去打印一个文档
线程  7:去打印一个文档
线程  8:去打印一个文档
线程  9:去打印一个文档
线程  1:打印队列:打印一个工作持续时间 9 seconds 
 线程  2:打印队列:打印一个工作持续时间 4 seconds 
 线程  3:打印队列:打印一个工作持续时间 9 seconds 
 线程  4:打印队列:打印一个工作持续时间 8 seconds 
 线程  5:打印队列:打印一个工作持续时间 2 seconds 
 线程  6:打印队列:打印一个工作持续时间 9 seconds 
 线程  7:打印队列:打印一个工作持续时间 4 seconds 
 线程  8:打印队列:打印一个工作持续时间 5 seconds 
 线程  9:打印队列:打印一个工作持续时间 3 seconds 
 线程  0:打印队列:打印一个工作持续时间 4 seconds 
 线程  0: 这个文档已经打印了
线程  1:打印队列:打印一个工作持续时间 6 seconds 
 线程  1: 这个文档已经打印了
线程  2:打印队列:打印一个工作持续时间 3 seconds 
 线程  2: 这个文档已经打印了
线程  3:打印队列:打印一个工作持续时间 6 seconds 
 线程  3: 这个文档已经打印了
线程  4:打印队列:打印一个工作持续时间 5 seconds 
 线程  4: 这个文档已经打印了
线程  5:打印队列:打印一个工作持续时间 7 seconds 
 线程  5: 这个文档已经打印了
线程  6:打印队列:打印一个工作持续时间 4 seconds 
 线程  6: 这个文档已经打印了
线程  7:打印队列:打印一个工作持续时间 3 seconds 
 线程  7: 这个文档已经打印了
线程  8:打印队列:打印一个工作持续时间 4 seconds 
 线程  8: 这个文档已经打印了
线程  9:打印队列:打印一个工作持续时间 7 seconds 
 线程  9: 这个文档已经打印了


总结:

  • 1、公平模式下,当线程0放弃一个锁的时候,线程1获取线程0放弃的锁,线程0继续等待(这个类似,打牌一样,轮着出牌,线程0要等到下一轮)
  • 2、非公平模式就是全凭CPU大人指派了。
  • 3、简单来说,公平:*国家,非公平:*国家


3、在Lock中使用多条件(Condtion接口)

唤醒线程的boolean值

生产者-消费者

package com.jack;

public class FileMock {

	private String content[];
	private int index;
	
	public FileMock(int size, int length){
		content = new String[size];
		for(int i=0; i<size; i++){
			StringBuilder buffer = new StringBuilder(length);
			for(int j=0; j<length; j++){
				int indice = (int)Math.random()*255;
				buffer.append((char)indice);
			}
			content[i] = buffer.toString();
		}
		index = 0;
	}
	
	public boolean hashMoreLines(){
		return index < content.length;
	}
	
	public String getLine(){
		if(this.hashMoreLines()) {
			System.out.printf("Mock: ", (content.length-index));
			return content[index++];
		}
		return null;
	}
}
package com.jack;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Buffer {

	private LinkedList<String> buffer;
	private int maxSize;
	private ReentrantLock lock;
	private Condition lines;
	private Condition space;
	//判断当前行是否在buffer中
	private boolean pendingLines;
	
	public Buffer(int maxSize) {
		this.maxSize = maxSize;
		buffer = new LinkedList<>();
		lock = new ReentrantLock();
		lines = lock.newCondition();
		space = lock.newCondition();
		pendingLines = true;
	}
	
	/**
	 * 插入一行
	 * @param line
	 */
	public void insert(String line){
		lock.lock();
		try{
			while(buffer.size() == maxSize){
				space.await();
			}
			buffer.offer(line);
			System.out.printf("%s: 插入行: %d\n", Thread.currentThread().getName(), buffer.size());
			//唤醒所有线程
			lines.signalAll();
		} catch (InterruptedException e){
			e.printStackTrace();
		}finally {
			lock.unlock();
		}
	}
	
	/**
	 * 获取一行
	 * @return
	 */
	public String get(){
		String line = null;
				lock.lock();
		try{
			while ((buffer.size()==0) && (hasPendingLines())){
				lines.await();
			}
			if(hasPendingLines()){
				line = buffer.poll();
				System.out.printf("%s :行数已经读取:%d\n", Thread.currentThread().getName(), buffer.size());
				space.signalAll();
			}
		}catch (InterruptedException e){
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
		return line;
	}
	
	public void setPendingLines(boolean pendingLines){
		this.pendingLines = pendingLines;
	}
	
	public boolean hasPendingLines(){
		return pendingLines||buffer.size() >0;
	}
}

package com.jack;

public class Producer implements Runnable{

	private  FileMock mock;
	private Buffer buffer;
	
	public Producer(FileMock mock, Buffer buffer) {
		super();
		this.mock = mock;
		this.buffer = buffer;
	}

	@Override
	public void run() {
		buffer.setPendingLines(true);
		while (mock.hashMoreLines()){
			String line = mock.getLine();
			buffer.insert(line);
		}
		buffer.setPendingLines(false);
	}

}

package com.jack;

import java.util.Random;

public class Consumer implements Runnable{
	private Buffer buffer;

	public Consumer(Buffer buffer) {
		super();
		this.buffer = buffer;
	}
	@Override
	public void run() {
		while (buffer.hasPendingLines()){
			String line = buffer.get();
			processLine(line);
		}
	}
	private void processLine(String line) {
		try{
			Random random = new Random();
			Thread.sleep(random.nextInt(100));
		} catch (InterruptedException e){
			e.printStackTrace();
		}
		
	}

}
package com.jack;

public class Main {
	public static void main(String[] args){
		FileMock mock = new FileMock(100, 10);
		Buffer buffer = new Buffer(20);
		Producer producer = new Producer(mock, buffer);
		Thread threadProducer = new Thread(producer, "Producer");
		
		Consumer consumers[] = new Consumer[3];
		Thread threadConsumers[] = new Thread[3];
		
		for (int i=0; i<3; i++){
			consumers[i] = new Consumer(buffer);
			threadConsumers[i] = new Thread(consumers[i], "Consumer "+i);
		}
		threadProducer.start();
		for(int i=0; i<3; i++){
			threadConsumers[i].start();
		}
	}
}

日志:部分日志

MOck: Producer: 插入行: 1
MOck: Producer: 插入行: 2
MOck: Consumer 0 :行数已经读取:1
Consumer 1 :行数已经读取:0
Producer: 插入行: 1
MOck: Producer: 插入行: 2
MOck: Consumer 2 :行数已经读取:1
Producer: 插入行: 2
MOck: Producer: 插入行: 3
MOck: Producer: 插入行: 4
MOck: Producer: 插入行: 5
MOck: Producer: 插入行: 6
MOck: Producer: 插入行: 7
MOck: Producer: 插入行: 8
MOck: Producer: 插入行: 9
MOck: Producer: 插入行: 10
MOck: Producer: 插入行: 11
MOck: Producer: 插入行: 12
MOck: Producer: 插入行: 13
MOck: Producer: 插入行: 14
MOck: Producer: 插入行: 15
MOck: Producer: 插入行: 16
MOck: Producer: 插入行: 17
MOck: Producer: 插入行: 18
MOck: Producer: 插入行: 19
MOck: Consumer 1 :行数已经读取:18
Producer: 插入行: 19
MOck: Producer: 插入行: 20
MOck: Consumer 1 :行数已经读取:19
Producer: 插入行: 20
MOck: Consumer 1 :行数已经读取:19
Producer: 插入行: 20
MOck: Consumer 0 :行数已经读取:19
Producer: 插入行: 20
MOck: Consumer 0 :行数已经读取:19


总结:

  • 1、采用space和lines都是两个条件来影响生产还是消费。
  • 2、这里用了Condition的sign()和signAll()方法来唤醒对方
  • 3、FileMock模拟生产文件。
  • 4、pendingLines判断buffer还有未读取的行。true表示有未读行。(表示还在生产)

第二章完。。。。。。。