Java 多线程-07 JUC-03 ReadWriteLock和BlockingQueue、CopyOnWriteArrayList等JUC集合
上接: Java 多线程-06 JUC-02 CountDownLatch、CyclicBarrier、Semaphore、Callable的使用
一、ReadWriteLock
如果我们希望在对数据操作的时候,写入操作仅供一个线程独占,而读取操作可以被多个线程共享的时候,可以使用读写锁ReadWriteLock。
ReadWriteLock为一个接口,仅提供获取读锁和写锁两个方法:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
常用实现类ReentrantReadWriteLock
ReentrantReadWriteLock只允许多个读线程同时访问,不允许读和写,写和写同时访问。
ReentrantReadWriteLock支持公平锁和非公平锁,默认非公平
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
ReentrantReadWriteLock为可重入锁,即持锁状态下可再次获取锁,或是说外层方法获取锁后,内层方法也可获取锁。
代码例子
现对一个缓存TestCache进行并发操作,写入要求为线程独占,读入可多个读线程共享。
class TestCache {
private volatile Map<String, String> map = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
//写入
public void add(String key, String value) {
try {
lock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + " 准备写");
map.put(key,value);
System.out.println(Thread.currentThread().getName() + " 写入完成");
} catch (Exception ignored) {}
finally {
lock.writeLock().unlock();
}
}
//读取
public void get(String key) {
try {
lock.readLock().lock();
System.out.println(Thread.currentThread().getName() + " 准备读");
map.get(key);
System.out.println(Thread.currentThread().getName() + " 读取完成");
} catch (Exception ignored) {}
finally {
lock.readLock().unlock();
}
}
}
Main
public static void main(String[] args) {
TestCache testCache = new TestCache();
//5个线程对该对象写入
for (int i = 0; i < 5; i++) {
final int temp = i;
new Thread(()->{
testCache.add(String.valueOf(temp), UUID.randomUUID().toString());
}, String.valueOf(i)).start();
}
//5个线程对该对象读取
for (int i = 0; i < 5; i++) {
final int temp = i;
new Thread(()->{
testCache.get(String.valueOf(temp));
},String.valueOf(i)).start();
}
}
输出
0 准备读
4 准备读
4 读取完成
2 准备读
2 读取完成
3 准备读
1 准备读
3 读取完成
0 读取完成
1 读取完成
# 可以看出,读取操作允许其他线程插入,而写入操作是线程独占的
1 准备写
1 写入完成
4 准备写
4 写入完成
0 准备写
0 写入完成
3 准备写
3 写入完成
2 准备写
2 写入完成
Process finished with exit code 0
二、JUC中常见线程安全集合
由于传统List、Set、Map集合部分线程不安全,JUC提供了一系列线程安全集合使用。
这里粗略介绍:
CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentHashMap、BlockingQueue、SynchronousQueue…
1. CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentHashMap
这三个集合类对应ArrayList、HashSet、HashMap,使用方式相同:
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
list.add(1);
list.get(0);
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("abc");
set.remove("abc");
set.contains("abc");
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("key", "value");
map.get("key");
map.remove("key");
对于CopyOnWriteArrayList、CopyOnWriteArraySet中CopyOnWrite意为在写入时复制这种扩容方式,如:
//CopyOnWriteArrayList<E>->add(E e)
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); //加锁,可重入、排他锁
try {
Object[] elements = getArray(); //获取原数组元素
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); //复制,扩容
newElements[len] = e; //设置新元素
setArray(newElements);
return true;
} finally {
lock.unlock(); //解锁
}
}
为何要在扩容时复制一遍呢?
这一般需要很大的开销,但是当遍历操作的数量大大超过可变操作的数量时,这种方法可能比其他替代方法更 有效。在不能或不想进行同步遍历,但又需要从并发线程中排除冲突时,它也很有用。
而CopyOnWriteArraySet中不同于HashSet,其维护了一个CopyOnWriteArrayList
public class CopyOnWriteArraySet<E> extends AbstractSet<E>
implements java.io.Serializable {
...
private final CopyOnWriteArrayList<E> al;
...
}
add方法
//其add方法本质上调用了CopyOnWriteArrayList<E>->addIfAbsent(E e)
//只有在不存在时才添加元素,否则返回false
public boolean add(E e) {
return al.addIfAbsent(e);
}
不同于以上两个,ConcurrentHashMap使用分段锁维持线程安全。
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁。
.
那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争。
.
先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
.
有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。 (不按顺序可能出现死锁)
2. BlockingQueue、SynchronousQueue
BlockingQueue意为阻塞队列,其维护了一个一定大小的队列,当队列满时新元素想要入队可以阻塞等待,同理当队列为空,希望元素出队也可阻塞等待。
以其实现类ArrayBlockingQueue来说
入队方法
阻塞队列的入队方法共有三个:
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
queue.add(1); //若队列满则抛出异常IllegalStateException("Queue full")
queue.offer(2); //队列满则返回false
queue.offer(3, 2, TimeUnit.SECONDS); //队列满则等待2s,超时返回false
queue.put(4); //队列满则阻塞,受外界中断抛出InterruptedException
}
出队方法
阻塞队列出队方法同样有三个:
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
queue.remove(); //为空抛出异常NoSuchElementException();
queue.poll(); //为空返回null
queue.poll(2000, TimeUnit.MILLISECONDS); //为空等待2000ms, 超时返回null
queue.take(); //为空阻塞等待, 意外中断抛出InterruptedException
}
查看首元素
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
queue.element(); //为空抛出NoSuchElementException();
queue.peek(); //为空返回null
}
BlockingQueue API 总结
方法 | 抛出异常 | 有返回值 | 超时等待 | 阻塞等待 |
---|---|---|---|---|
添加 | add() | offer() | offer(…) | put() |
移除 | remove() | poll() | poll(…) | take() |
首元素 | element() | peek() | \ | \ |
BlockingQueue在线程池中被用于超出核心线程数的等待队列
线程数超过corePoolSize就会放入该阻塞队列等待,如果超出阻塞队列,线程池则会扩容至maxiumPoolSize。详见 线程池
SynchronousQueue同步队列,需使用 put() 和 take() 方法对其写入和读取。
SynchronousQueue内没有数据缓冲空间,在写入一个元素之后,之后的写入会阻塞,直到之前写入的元素被取走。
代码例子
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
long start = System.currentTimeMillis(); //统计运行时间
//开启线程,对该队列写入三次
new Thread(()->{
for (int i = 0; i < 3; i++) {
try {
System.out.println(i + " 入队");
queue.put(i);
} catch (InterruptedException ignored) {}
}
}).start();
//每隔2s,出队一次,执行三次
for (int i = 0; i < 3; i++) {
TimeUnit.SECONDS.sleep(2);
int res = queue.take();
System.out.println(res + " 出队 "
+ "运行时间: " + String.valueOf(System.currentTimeMillis() - start));
System.out.println("----------------");
}
}
输出
0 入队
0 出队 运行时间: 2043
----------------
1 入队
1 出队 运行时间: 4043
----------------
2 入队
2 出队 运行时间: 6044
----------------
Process finished with exit code 0
如有错误,恳请纠正
下接:
上一篇: 程序员和青蛙公主的故事
下一篇: 基于java的SHA1加密算法