ConcurrentHashMap的数据不一致问题
ConcurrentHashMap被认为是支持高并发、高吞吐量的线程安全一个HashMap实现,因此多线程开发中经常使用到,但是最近在开发中却遇到了数据不一致问题,给自己埋了个大坑,下面描述下问题:
首先是工作场景描述:有一个订单列表,每个订单又包含多种类型的任务,每个线程一次只能处理一种类型的任务(取所有订单的该类型的任务,进行批量处理,任务没有先后关系),某订单处理完毕后,修改订单状态。
代码如下:
public class TaskRunner implements Runnable{ //订单id列表 private final List<String> orderList; //订单与任务类型列表对应关系,key是订单id,value是任务列表 private final ConcurrentHashMap<String, List<String>> contentMap; //所有的任务类型 private final ConcurrentLinkedQueue<String> typeQueue; public TaskRunner(ConcurrentHashMap<String, List<String>> contentMap, ConcurrentLinkedQueue<String> type, List<String> orderList) { this.contentMap = contentMap; this.typeQueue = type; this.orderList = orderList; } @Override public void run() { while(true){ String type = typeQueue.poll(); if(type != null){ try { //do something to finish the task... Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } //任务处理完毕后,修改对应的订单列表 for(String order : orderList){ try{ if(contentMap.get(order) != null){ contentMap.get(order).remove(type); if(contentMap.get(order)!= null && contentMap.get(order).size() == 0){ //订单order处理完毕 contentMap.remove(order); } } }catch(Exception e){ e.printStackTrace(); } } } if(contentMap.size() > 0){ System.out.println(contentMap.size() +", queue size:"+typeQueue.size()); }else{ System.out.println("empty, queue size:"+typeQueue.size()); } } } }
代码逻辑很简单,就是当任务处理完毕后,从订单列表中将任务移除,最终期望的结果应该是:任务类型队列typeQueue为空,所有的订单与任务映射contentMap为空。
contentMap初始化:任务列表为一个ArrayList
ConcurrentHashMap<String, List<String>> contentMap = new ConcurrentHashMap<String, List<String>>(); List<String> types = new ArrayList<String>(); type.add("b01"); type.add("b02"); type.add("b03"); for(String zqgs : orderList){ contentMap.put(zqgs, types); }
启动2个线程跑TaskRunner
int tn = 2; ExecutorService service = Executors.newFixedThreadPool(tn); for(int i = 0; i < tn; i++){ service.execute(new TaskRunner(contentMap, typeQueue, orderList)); }
运行3-5次就会出现,任务类型队列typeQueue为空,但订单与任务映射contentMap提示还有若干订单没有完成,这是说不通的(推测是不同步造成的),于是乎对for循环做了个修改,如下(加了synchronized关键字)。
for(String order : orderList){ synchronized(this){ try{ if(contentMap.get(order) != null){ contentMap.get(order).remove(type); if(contentMap.get(order)!= null && contentMap.get(order).size() == 0){ contentMap.remove(order); } } }catch(Exception e){ e.printStackTrace(); } } }
再次执行,还是遇到了数据不一致。
于是乎上网查了下ConcurrentHashMap的一些实现原理,它利用了分段锁来提高并发性能,它支持完全并发的读以及一定程度并发的写,即写是分段锁,读操作不是加锁的。
ConcurrentHashMap包含若干段(Segment),每个段又有多个HashEntry,HashEntry存放我们put进去的数据,结构如下:
static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; }
啥?value是volatile的,在并发写的情况下就会出现不一致的情况啊,如果是synchronized的就没问题了。
可参考下面的连接,就能比较好的理解这两个关键字了:
volatile 与 synchronized 区别 和 volatile与synchronized关键字
于是乎修改对contentMap的初始化方法:
ConcurrentHashMap<String, List<String>> contentMap = new ConcurrentHashMap<String, List<String>>(); List<String> types = Collections.synchronizedList(new ArrayList<String>()); type.add("b01"); type.add("b02"); type.add("b03"); for(String zqgs : orderList){ contentMap.put(zqgs, types); }
这样,不管运行多少次,当任务类型队列typeQueue为空,订单与任务映射contentMap也变为空了。
注:至于为什么,for循环我加了synchronized关键字,依然不能保证数据的一致性,为啥呢?原因有点逗比,synchronized是加锁了,但锁住的是本线程的任务对象,和另一个线程没有关系啊!!!(如果锁对了,其实这个方法也是可行的)
还有改了之后,目前可能出现空指针异常。
推荐阅读
-
Linq to SQL 插入数据时的一个问题
-
2个自定义的PHP in_array 函数,解决大量数据判断in_array的效率问题
-
使用数据引用公式解决Excel文件的自动更新数据问题
-
Orcale 数据库客户端PL/SQL 中文乱码的问题解决方法
-
Python中循环后使用list.append()数据被覆盖问题的解决
-
解决Flask读取mysql数据库的中文乱码问题
-
python爬虫的数据库连接问题【推荐】
-
vue awesome swiper异步加载数据出现的bug问题
-
oracle数据库下五种解决ORA-04031错误问题的方法
-
PowerDesigner生成数据库时的列中文注释乱码问题的设置方法