欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java 多线程-07 JUC-03 ReadWriteLock和BlockingQueue、CopyOnWriteArrayList等JUC集合

程序员文章站 2022-05-04 20:49:42
...

上接: Java 多线程-06 JUC-02 CountDownLatch、CyclicBarrier、Semaphore、Callable的使用


一、ReadWriteLock

如果我们希望在对数据操作的时候,写入操作仅供一个线程独占,而读取操作可以被多个线程共享的时候,可以使用读写锁ReadWriteLock

ReadWriteLock为一个接口,仅提供获取读锁和写锁两个方法:

public interface ReadWriteLock {
	Lock readLock();
	Lock writeLock();
}

常用实现类ReentrantReadWriteLock

ReentrantReadWriteLock只允许多个读线程同时访问,不允许读和写,写和写同时访问。

ReentrantReadWriteLock支持公平锁和非公平锁,默认非公平

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

ReentrantReadWriteLock为可重入锁,即持锁状态下可再次获取锁,或是说外层方法获取锁后,内层方法也可获取锁。

代码例子
现对一个缓存TestCache进行并发操作,写入要求为线程独占,读入可多个读线程共享。

class TestCache {
    private volatile Map<String, String> map = new HashMap<>();

    private ReadWriteLock lock = new ReentrantReadWriteLock();
	//写入
    public void add(String key, String value) {
        try {
            lock.writeLock().lock();
            System.out.println(Thread.currentThread().getName() + " 准备写");
            map.put(key,value);
            System.out.println(Thread.currentThread().getName() + " 写入完成");
        } catch (Exception ignored) {}
        finally {
            lock.writeLock().unlock();
        }
    }
	//读取
    public void get(String key) {	
        try {
            lock.readLock().lock();
            System.out.println(Thread.currentThread().getName() + " 准备读");
            map.get(key);
            System.out.println(Thread.currentThread().getName() + " 读取完成");
        } catch (Exception ignored) {}
        finally {
            lock.readLock().unlock();
        }
    }
}

Main

public static void main(String[] args) {
    TestCache testCache = new TestCache();
	//5个线程对该对象写入
    for (int i = 0; i < 5; i++) {
        final int temp = i;
        new Thread(()->{
            testCache.add(String.valueOf(temp), UUID.randomUUID().toString());
        }, String.valueOf(i)).start();
    }
	//5个线程对该对象读取
    for (int i = 0; i < 5; i++) {
        final int temp = i;
        new Thread(()->{
            testCache.get(String.valueOf(temp));
        },String.valueOf(i)).start();
    }
}

输出

0 准备读
4 准备读
4 读取完成
2 准备读
2 读取完成
3 准备读
1 准备读
3 读取完成
0 读取完成
1 读取完成
# 可以看出,读取操作允许其他线程插入,而写入操作是线程独占的
1 准备写
1 写入完成
4 准备写
4 写入完成
0 准备写
0 写入完成
3 准备写
3 写入完成
2 准备写
2 写入完成

Process finished with exit code 0

二、JUC中常见线程安全集合

由于传统List、Set、Map集合部分线程不安全,JUC提供了一系列线程安全集合使用。
这里粗略介绍:
CopyOnWriteArrayListCopyOnWriteArraySetConcurrentHashMapBlockingQueueSynchronousQueue


1. CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentHashMap
这三个集合类对应ArrayList、HashSet、HashMap,使用方式相同:

CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
list.add(1);
list.get(0);
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("abc");
set.remove("abc");
set.contains("abc");
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("key", "value");
map.get("key");
map.remove("key");

对于CopyOnWriteArrayList、CopyOnWriteArraySet中CopyOnWrite意为在写入时复制这种扩容方式,如:

//CopyOnWriteArrayList<E>->add(E e)
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock(); //加锁,可重入、排他锁
    try {
        Object[] elements = getArray(); //获取原数组元素
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1); //复制,扩容
        newElements[len] = e; //设置新元素
        setArray(newElements);
        return true;
    } finally {
        lock.unlock(); //解锁
    }
}

为何要在扩容时复制一遍呢?

这一般需要很大的开销,但是当遍历操作的数量大大超过可变操作的数量时,这种方法可能比其他替代方法更 有效。在不能或不想进行同步遍历,但又需要从并发线程中排除冲突时,它也很有用。

而CopyOnWriteArraySet中不同于HashSet,其维护了一个CopyOnWriteArrayList

public class CopyOnWriteArraySet<E> extends AbstractSet<E>
        implements java.io.Serializable {
        ...
		private final CopyOnWriteArrayList<E> al;
		...
}

add方法

//其add方法本质上调用了CopyOnWriteArrayList<E>->addIfAbsent(E e)
//只有在不存在时才添加元素,否则返回false
public boolean add(E e) {
    return al.addIfAbsent(e);
}

不同于以上两个,ConcurrentHashMap使用分段锁维持线程安全。

HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁。
.
那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争。
.
先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
.
有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。 (不按顺序可能出现死锁)


2. BlockingQueue、SynchronousQueue
BlockingQueue意为阻塞队列,其维护了一个一定大小的队列,当队列满时新元素想要入队可以阻塞等待,同理当队列为空,希望元素出队也可阻塞等待。

以其实现类ArrayBlockingQueue来说
入队方法
阻塞队列的入队方法共有三个:

public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
    queue.add(1); //若队列满则抛出异常IllegalStateException("Queue full")
    queue.offer(2); //队列满则返回false
    queue.offer(3, 2, TimeUnit.SECONDS); //队列满则等待2s,超时返回false
    queue.put(4); //队列满则阻塞,受外界中断抛出InterruptedException
}

出队方法
阻塞队列出队方法同样有三个:

public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
    queue.remove(); //为空抛出异常NoSuchElementException();
    queue.poll(); //为空返回null
    queue.poll(2000, TimeUnit.MILLISECONDS); //为空等待2000ms, 超时返回null
    queue.take(); //为空阻塞等待, 意外中断抛出InterruptedException
}

查看首元素

public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
    queue.element(); //为空抛出NoSuchElementException();
    queue.peek(); //为空返回null
}

BlockingQueue API 总结

方法 抛出异常 有返回值 超时等待 阻塞等待
添加 add() offer() offer(…) put()
移除 remove() poll() poll(…) take()
首元素 element() peek() \ \

BlockingQueue在线程池中被用于超出核心线程数的等待队列
Java 多线程-07 JUC-03 ReadWriteLock和BlockingQueue、CopyOnWriteArrayList等JUC集合
线程数超过corePoolSize就会放入该阻塞队列等待,如果超出阻塞队列,线程池则会扩容至maxiumPoolSize。详见 线程池


SynchronousQueue同步队列,需使用 put()take() 方法对其写入和读取。
SynchronousQueue内没有数据缓冲空间,在写入一个元素之后,之后的写入会阻塞,直到之前写入的元素被取走。
代码例子

public static void main(String[] args) throws InterruptedException {

    SynchronousQueue<Integer> queue = new SynchronousQueue<>();
    long start = System.currentTimeMillis(); //统计运行时间

	//开启线程,对该队列写入三次
    new Thread(()->{
        for (int i = 0; i < 3; i++) {
            try {
                System.out.println(i + " 入队");
                queue.put(i);
            } catch (InterruptedException ignored) {}
        }
    }).start();
    
	//每隔2s,出队一次,执行三次
    for (int i = 0; i < 3; i++) {
        TimeUnit.SECONDS.sleep(2);
        int res = queue.take();
        System.out.println(res + " 出队  "
                + "运行时间: " + String.valueOf(System.currentTimeMillis() - start));
        System.out.println("----------------");
    }
}

输出

0 入队
0 出队  运行时间: 2043
----------------
1 入队
1 出队  运行时间: 4043
----------------
2 入队
2 出队  运行时间: 6044
----------------

Process finished with exit code 0

如有错误,恳请纠正
下接: