欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ConcurrentHashMap的数据不一致问题

程序员文章站 2022-04-20 14:46:28
...

 

       ConcurrentHashMap被认为是支持高并发、高吞吐量的线程安全一个HashMap实现,因此多线程开发中经常使用到,但是最近在开发中却遇到了数据不一致问题,给自己埋了个大坑,下面描述下问题:

首先是工作场景描述:有一个订单列表,每个订单又包含多种类型的任务,每个线程一次只能处理一种类型的任务(取所有订单的该类型的任务,进行批量处理,任务没有先后关系),某订单处理完毕后,修改订单状态。

代码如下:

 

public class TaskRunner implements Runnable{

  //订单id列表
  private final List<String> orderList;
  //订单与任务类型列表对应关系,key是订单id,value是任务列表
  private final ConcurrentHashMap<String, List<String>> contentMap;  
  //所有的任务类型
  private final ConcurrentLinkedQueue<String> typeQueue;
  public TaskRunner(ConcurrentHashMap<String, List<String>> contentMap, 
      ConcurrentLinkedQueue<String> type, 
      List<String> orderList) {
    this.contentMap = contentMap;
    this.typeQueue = type;
    this.orderList = orderList;
  }

  @Override
  public void run() {
    while(true){
      String type = typeQueue.poll();
      if(type != null){
        try {
          //do something to finish the task...
          Thread.sleep(300);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        //任务处理完毕后,修改对应的订单列表
        for(String order : orderList){
          try{
            if(contentMap.get(order) != null){
              contentMap.get(order).remove(type);
              if(contentMap.get(order)!= null &&
                  contentMap.get(order).size() == 0){
		//订单order处理完毕
                contentMap.remove(order);
              }
            }
          }catch(Exception e){
            e.printStackTrace();
          }
        }
      }
      
      if(contentMap.size() > 0){
        System.out.println(contentMap.size() +", queue size:"+typeQueue.size());
      }else{
        System.out.println("empty, queue size:"+typeQueue.size()); 
      }
    }
  }
}

  

        代码逻辑很简单,就是当任务处理完毕后,从订单列表中将任务移除,最终期望的结果应该是:任务类型队列typeQueue为空,所有的订单与任务映射contentMap为空。

contentMap初始化:任务列表为一个ArrayList

 

ConcurrentHashMap<String, List<String>> contentMap = new ConcurrentHashMap<String, List<String>>();
List<String> types = new ArrayList<String>();
  type.add("b01");
  type.add("b02");
  type.add("b03");
  for(String zqgs : orderList){
     contentMap.put(zqgs, types);
  }

         启动2个线程跑TaskRunner

int tn = 2;
ExecutorService service = Executors.newFixedThreadPool(tn);
for(int i = 0; i < tn; i++){
  service.execute(new TaskRunner(contentMap, typeQueue, orderList));
}

 

        运行3-5次就会出现,任务类型队列typeQueue为空,但订单与任务映射contentMap提示还有若干订单没有完成,这是说不通的(推测是不同步造成的),于是乎对for循环做了个修改,如下(加了synchronized关键字)。

 

for(String order : orderList){
synchronized(this){
  try{
    if(contentMap.get(order) != null){
      contentMap.get(order).remove(type);
      if(contentMap.get(order)!= null &&
	  contentMap.get(order).size() == 0){
	contentMap.remove(order);
      }
    }
  }catch(Exception e){
    e.printStackTrace();
  }
}
}

         再次执行,还是遇到了数据不一致ConcurrentHashMap的数据不一致问题
            
    
    博客分类: 我的Java多线程 多线程ConcurrentHashMapsynchronizedvolatile 

         于是乎上网查了下ConcurrentHashMap的一些实现原理,它利用了分段锁来提高并发性能,它支持完全并发的读以及一定程度并发的写,即写是分段锁,读操作不是加锁的。

ConcurrentHashMap包含若干段(Segment),每个段又有多个HashEntry,HashEntry存放我们put进去的数据,结构如下:

    static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;
    }

         啥?value是volatile的,在并发写的情况下就会出现不一致的情况啊,如果是synchronized的就没问题了。

可参考下面的连接,就能比较好的理解这两个关键字了:

volatile 与 synchronized 区别volatile与synchronized关键字

        于是乎修改对contentMap的初始化方法:

 

ConcurrentHashMap<String, List<String>> contentMap = new ConcurrentHashMap<String, List<String>>();
List<String> types = Collections.synchronizedList(new ArrayList<String>());
type.add("b01");
type.add("b02");
type.add("b03");
for(String zqgs : orderList){
   contentMap.put(zqgs, types);
}

 

         这样,不管运行多少次,当任务类型队列typeQueue为空,订单与任务映射contentMap也变为空了。

         注:至于为什么,for循环我加了synchronized关键字,依然不能保证数据的一致性,为啥呢?原因有点逗比,synchronized是加锁了,但锁住的是本线程的任务对象,和另一个线程没有关系啊!!!(如果锁对了,其实这个方法也是可行的)

还有改了之后,目前可能出现空指针异常。