Java多线程编程中的两种常用并发容器讲解
concurrenthashmap并发容器
concurrenthashmap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个concurrenthashmap加锁。
concurrenthashmap的内部结构
concurrenthashmap为了提高本身的并发能力,在内部采用了一个叫做segment的结构,一个segment其实就是一个类hash table的结构,segment内部维护了一个链表数组,我们用下面这一幅图来看下concurrenthashmap的内部结构:
从上面的结构我们可以了解到,concurrenthashmap定位一个元素的过程需要进行两次hash操作,第一次hash定位到segment,第二次hash定位到元素所在的链表的头部,因此,这一种结构的带来的副作用是hash的过程要比普通的hashmap要长,但是带来的好处是写操作的时候可以只对元素所在的segment进行加锁即可,不会影响到其他的segment,这样,在最理想的情况下,concurrenthashmap可以最高同时支持segment数量大小的写操作(刚好这些写操作都非常平均地分布在所有的segment上),所以,通过这一种结构,concurrenthashmap的并发能力可以大大的提高。
segment
我们再来具体了解一下segment的数据结构:
static final class segment<k,v> extends reentrantlock implements serializable { transient volatile int count; transient int modcount; transient int threshold; transient volatile hashentry<k,v>[] table; final float loadfactor; }
详细解释一下segment里面的成员变量的意义:
- count:segment中元素的数量
- modcount:对table的大小造成影响的操作的数量(比如put或者remove操作)
- threshold:阈值,segment里面元素的数量超过这个值依旧就会对segment进行扩容
- table:链表数组,数组中的每一个元素代表了一个链表的头部
- loadfactor:负载因子,用于确定threshold
hashentry
segment中的元素是以hashentry的形式存放在链表数组中的,看一下hashentry的结构:
static final class hashentry<k,v> { final k key; final int hash; volatile v value; final hashentry<k,v> next; }
可以看到hashentry的一个特点,除了value以外,其他的几个变量都是final的,这样做是为了防止链表结构被破坏,出现concurrentmodification的情况。
concurrenthashmap的初始化
下面我们来结合源代码来具体分析一下concurrenthashmap的实现,先看下初始化方法:
public concurrenthashmap(int initialcapacity, float loadfactor, int concurrencylevel) { if (!(loadfactor > 0) || initialcapacity < 0 || concurrencylevel <= 0) throw new illegalargumentexception(); if (concurrencylevel > max_segments) concurrencylevel = max_segments; // find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencylevel) { ++sshift; ssize <<= 1; } segmentshift = 32 - sshift; segmentmask = ssize - 1; this.segments = segment.newarray(ssize); if (initialcapacity > maximum_capacity) initialcapacity = maximum_capacity; int c = initialcapacity / ssize; if (c * ssize < initialcapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new segment<k,v>(cap, loadfactor); }
currenthashmap的初始化一共有三个参数,一个initialcapacity,表示初始的容量,一个loadfactor,表示负载参数,最后一个是concurrentlevel,代表concurrenthashmap内部的segment的数量,concurrentlevel一经指定,不可改变,后续如果concurrenthashmap的元素数量增加导致conrruenthashmap需要扩容,concurrenthashmap不会增加segment的数量,而只会增加segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个concurrenthashmap做rehash,而只需要对segment里面的元素做一次rehash就可以了。
整个concurrenthashmap的初始化方法还是非常简单的,先是根据concurrentlevel来new出segment,这里segment的数量是不大于concurrentlevel的最大的2的指数,就是说segment的数量永远是2的指数个,这样的好处是方便采用移位操作来进行hash,加快hash的过程。接下来就是根据intialcapacity确定segment的容量的大小,每一个segment的容量大小也是2的指数,同样使为了加快hash的过程。
这边需要特别注意一下两个变量,分别是segmentshift和segmentmask,这两个变量在后面将会起到很大的作用,假设构造函数确定了segment的数量是2的n次方,那么segmentshift就等于32减去n,而segmentmask就等于2的n次方减一。
concurrenthashmap的get操作
前面提到过concurrenthashmap的get操作是不用加锁的,我们这里看一下其实现:
public v get(object key) { int hash = hash(key.hashcode()); return segmentfor(hash).get(key, hash); }
看第三行,segmentfor这个函数用于确定操作应该在哪一个segment中进行,几乎对concurrenthashmap的所有操作都需要用到这个函数,我们看下这个函数的实现:
final segment<k,v> segmentfor(int hash) { return segments[(hash >>> segmentshift) & segmentmask]; }
这个函数用了位操作来确定segment,根据传入的hash值向右无符号右移segmentshift位,然后和segmentmask进行与操作,结合我们之前说的segmentshift和segmentmask的值,就可以得出以下结论:假设segment的数量是2的n次方,根据元素的hash值的高n位就可以确定元素到底在哪一个segment中。
在确定了需要在哪一个segment中进行操作以后,接下来的事情就是调用对应的segment的get方法:
v get(object key, int hash) { if (count != 0) { // read-volatile hashentry<k,v> e = getfirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { v v = e.value; if (v != null) return v; return readvalueunderlock(e); // recheck } e = e.next; } } return null; }
先看第二行代码,这里对count进行了一次判断,其中count表示segment中元素的数量,我们可以来看一下count的定义:
transient volatile int count;
可以看到count是volatile的,实际上这里里面利用了volatile的语义:
对volatile字段的写入操作happens-before于每一个后续的同一个字段的读操作。
因为实际上put、remove等操作也会更新count的值,所以当竞争发生的时候,volatile的语义可以保证写操作在读操作之前,也就保证了写操作对后续的读操作都是可见的,这样后面get的后续操作就可以拿到完整的元素内容。
然后,在第三行,调用了getfirst()来取得链表的头部:
hashentry<k,v> getfirst(int hash) { hashentry<k,v>[] tab = table; return tab[hash & (tab.length - 1)]; }
同样,这里也是用位操作来确定链表的头部,hash值和hashtable的长度减一做与操作,最后的结果就是hash值的低n位,其中n是hashtable的长度以2为底的结果。
在确定了链表的头部以后,就可以对整个链表进行遍历,看第4行,取出key对应的value的值,如果拿出的value的值是null,则可能这个key,value对正在put的过程中,如果出现这种情况,那么就加锁来保证取出的value是完整的,如果不是null,则直接返回value。
concurrenthashmap的put操作
看完了get操作,再看下put操作,put操作的前面也是确定segment的过程,这里不再赘述,直接看关键的segment的put方法:
v put(k key, int hash, v value, boolean onlyifabsent) { lock(); try { int c = count; if (c++ > threshold) // ensure capacity rehash(); hashentry<k,v>[] tab = table; int index = hash & (tab.length - 1); hashentry<k,v> first = tab[index]; hashentry<k,v> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; v oldvalue; if (e != null) { oldvalue = e.value; if (!onlyifabsent) e.value = value; } else { oldvalue = null; ++modcount; tab[index] = new hashentry<k,v>(key, hash, first, value); count = c; // write-volatile } return oldvalue; } finally { unlock(); } }
首先对segment的put操作是加锁完成的,然后在第五行,如果segment中元素的数量超过了阈值(由构造函数中的loadfactor算出)这需要进行对segment扩容,并且要进行rehash,关于rehash的过程大家可以自己去了解,这里不详细讲了。
第8和第9行的操作就是getfirst的过程,确定链表头部的位置。
第11行这里的这个while循环是在链表中寻找和要put的元素相同key的元素,如果找到,就直接更新更新key的value,如果没有找到,则进入21行这里,生成一个新的hashentry并且把它加到整个segment的头部,然后再更新count的值。
concurrenthashmap的remove操作
remove操作的前面一部分和前面的get和put操作一样,都是定位segment的过程,然后再调用segment的remove方法:
v remove(object key, int hash, object value) { lock(); try { int c = count - 1; hashentry<k,v>[] tab = table; int index = hash & (tab.length - 1); hashentry<k,v> first = tab[index]; hashentry<k,v> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; v oldvalue = null; if (e != null) { v v = e.value; if (value == null || value.equals(v)) { oldvalue = v; // all entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modcount; hashentry<k,v> newfirst = e.next; for (hashentry<k,v> p = first; p != e; p = p.next) newfirst = new hashentry<k,v>(p.key, p.hash, newfirst, p.value); tab[index] = newfirst; count = c; // write-volatile } } return oldvalue; } finally { unlock(); } }
首先remove操作也是确定需要删除的元素的位置,不过这里删除元素的方法不是简单地把待删除元素的前面的一个元素的next指向后面一个就完事了,我们之前已经说过hashentry中的next是final的,一经赋值以后就不可修改,在定位到待删除元素的位置以后,程序就将待删除元素前面的那一些元素全部复制一遍,然后再一个一个重新接到链表上去,看一下下面这一幅图来了解这个过程:
假设链表中原来的元素如上图所示,现在要删除元素3,那么删除元素3以后的链表就如下图所示:
copyonwritearraylist并发容器
copy-on-write简称cow,是一种用于程序设计中的优化策略。其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容copy出去形成一个新的内容然后再改,这是一种延时懒惰策略。从jdk1.5开始java并发包里提供了两个使用copyonwrite机制实现的并发容器,它们是copyonwritearraylist和copyonwritearrayset。copyonwrite容器非常有用,可以在非常多的并发场景中使用到。
什么是copyonwrite容器
copyonwrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对copyonwrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以copyonwrite容器也是一种读写分离的思想,读和写不同的容器。
copyonwritearraylist的实现原理
在使用copyonwritearraylist之前,我们先阅读其源码了解下它是如何实现的。以下代码是向copyonwritearraylist中add方法的实现(向copyonwritearraylist里添加元素),可以发现在添加的时候是需要加锁的,否则多线程写的时候会copy出n个副本出来。
/** * appends the specified element to the end of this list. * * @param e element to be appended to this list * @return <tt>true</tt> (as specified by {@link collection#add}) */ public boolean add(e e) { final reentrantlock lock = this.lock; lock.lock(); try { object[] elements = getarray(); int len = elements.length; object[] newelements = arrays.copyof(elements, len + 1); newelements[len] = e; setarray(newelements); return true; } finally { lock.unlock(); } }
读的时候不需要加锁,如果读的时候有多个线程正在向copyonwritearraylist添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的copyonwritearraylist。
public e get(int index) { return get(getarray(), index); }
jdk中并没有提供copyonwritemap,我们可以参考copyonwritearraylist来实现一个,基本代码如下:
import java.util.collection; import java.util.map; import java.util.set; public class copyonwritemap<k, v> implements map<k, v>, cloneable { private volatile map<k, v> internalmap; public copyonwritemap() { internalmap = new hashmap<k, v>(); } public v put(k key, v value) { synchronized (this) { map<k, v> newmap = new hashmap<k, v>(internalmap); v val = newmap.put(key, value); internalmap = newmap; return val; } } public v get(object key) { return internalmap.get(key); } public void putall(map<? extends k, ? extends v> newdata) { synchronized (this) { map<k, v> newmap = new hashmap<k, v>(internalmap); newmap.putall(newdata); internalmap = newmap; } } }
实现很简单,只要了解了copyonwrite机制,我们可以实现各种copyonwrite容器,并且在不同的应用场景中使用。
copyonwrite的应用场景
copyonwrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景,假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单每天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,如果在,则提示不能搜索。实现代码如下:
package com.ifeve.book; import java.util.map; import com.ifeve.book.forkjoin.copyonwritemap; /** * 黑名单服务 * * @author fangtengfei * */ public class blacklistserviceimpl { private static copyonwritemap<string, boolean> blacklistmap = new copyonwritemap<string, boolean>( 1000); public static boolean isblacklist(string id) { return blacklistmap.get(id) == null ? false : true; } public static void addblacklist(string id) { blacklistmap.put(id, boolean.true); } /** * 批量添加黑名单 * * @param ids */ public static void addblacklist(map<string,boolean> ids) { blacklistmap.putall(ids); } }
代码很简单,但是使用copyonwritemap需要注意两件事情:
1. 减少扩容开销。根据实际需要,初始化copyonwritemap的大小,避免写时copyonwritemap扩容的开销。
2. 使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。如使用上面代码里的addblacklist方法。
copyonwrite的缺点
copyonwrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。
内存占用问题。因为copyonwrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200m左右,那么再写入100m数据进去,内存就会占用300m,那么这个时候很有可能造成频繁的yong gc和full gc。之前我们系统中使用了一个服务由于每晚使用copyonwrite机制更新大对象,造成了每晚15秒的full gc,应用响应时间也随之变长。
针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用copyonwrite容器,而使用其他的并发容器,如concurrenthashmap。
数据一致性问题。copyonwrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用copyonwrite容器。