欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java 高并发四:无锁详细介绍

程序员文章站 2024-03-13 17:00:39
在[高并发java 一] 前言中已经提到了无锁的概念,由于在jdk源码中有大量的无锁应用,所以在这里介绍下无锁。 1 无锁类的原理详解 1.1 cas cas算法的过...

在[高并发java 一] 前言中已经提到了无锁的概念,由于在jdk源码中有大量的无锁应用,所以在这里介绍下无锁。

1 无锁类的原理详解

1.1 cas

cas算法的过程是这样:它包含3个参数cas(v,e,n)。v表示要更新的变量,e表示预期值,n表示新值。仅当v
值等于e值时,才会将v的值设为n,如果v值和e值不同,则说明已经有其他线程做了更新,则当前线程什么
都不做。最后,cas返回当前v的真实值。cas操作是抱着乐观的态度进行的,它总是认为自己可以成功完成
操作。当多个线程同时使用cas操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程
不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,cas
操作即时没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。

我们会发现,cas的步骤太多,有没有可能在判断v和e相同后,正要赋值时,切换了线程,更改了值。造成了数据不一致呢?

事实上,这个担心是多余的。cas整一个操作过程是一个原子操作,它是由一条cpu指令完成的。

1.2 cpu指令

cas的cpu指令是cmpxchg

指令代码如下:

 /*
 accumulator = al, ax, or eax, depending on whether
 a byte, word, or doubleword comparison is being performed
 */
 if(accumulator == destination) {
 zf = 1;
 destination = source;
 }
 else {
 zf = 0;
 accumulator = destination;
 }

目标值和寄存器里的值相等的话,就设置一个跳转标志,并且把原始数据设到目标里面去。如果不等的话,就不设置跳转标志了。

java当中提供了很多无锁类,下面来介绍下无锁类。

2 无所类的使用

我们已经知道,无锁比阻塞效率要高得多。我们来看看java是如何实现这些无锁类的。

2.1. atomicinteger

atomicinteger和integer一样,都继承与number类

public class atomicinteger extends number implements java.io.serializable

atomicinteger里面有很多cas操作,典型的有:

public final boolean compareandset(int expect, int update) {
        return unsafe.compareandswapint(this, valueoffset, expect, update);
    }

这里来解释一下unsafe.compareandswapint方法,他的意思是,对于this这个类上的偏移量为valueoffset的变量值如果与期望值expect相同,那么把这个变量的值设为update。
其实偏移量为valueoffset的变量就是value

static {
 try {
 valueoffset = unsafe.objectfieldoffset
  (atomicinteger.class.getdeclaredfield("value"));
 } catch (exception ex) { throw new error(ex); }
}

我们此前说过,cas是有可能会失败的,但是失败的代价是很小的,所以一般的实现都是在一个无限循环体内,直到成功为止。

public final int getandincrement() {
 for (;;) {
  int current = get();
  int next = current + 1;
  if (compareandset(current, next))
  return current;
 }
 }

2.2 unsafe

从类名就可知,unsafe操作是非安全的操作,比如:

根据偏移量设置值(在刚刚介绍的atomicinteger中已经看到了这个功能)
park()(把这个线程停下来,在以后的blog中会提到)
底层的cas操作
非公开api,在不同版本的jdk中,可能有较大差异

2.3. atomicreference

前面已经提到了atomicinteger,当然还有atomicboolean,atomiclong等等,都大同小异。

这里要介绍的是atomicreference。

atomicreference是一种模板类

public class atomicreference<v>  implements java.io.serializable

它可以用来封装任意类型的数据。

比如string

package test;

import java.util.concurrent.atomic.atomicreference;


public class test
{ 
 public final static atomicreference<string> atomicstring = new atomicreference<string>("hosee");
 public static void main(string[] args)
 {
 for (int i = 0; i < 10; i++)
 {
 final int num = i;
 new thread() {
 public void run() {
 try
 {
 thread.sleep(math.abs((int)math.random()*100));
 }
 catch (exception e)
 {
 e.printstacktrace();
 }
 if (atomicstring.compareandset("hosee", "ztk"))
 {
 system.out.println(thread.currentthread().getid() + "change value");
 }else {
 system.out.println(thread.currentthread().getid() + "failed");
 }
 };
 }.start();
 }
 }
}

结果:

10failed
13failed
9change value
11failed
12failed
15failed
17failed
14failed
16failed
18failed

可以看到只有一个线程能够修改值,并且后面的线程都不能再修改。

2.4.atomicstampedreference

我们会发现cas操作还是有一个问题的

比如之前的atomicinteger的incrementandget方法

public final int incrementandget() {
 for (;;) {
  int current = get();
  int next = current + 1;
  if (compareandset(current, next))
  return next;
 }
 }

假设当前value=1当某线程int current = get()执行后,切换到另一个线程,这个线程将1变成了2,然后又一个线程将2又变成了1。此时再切换到最开始的那个线程,由于value仍等于1,所以还是能执行cas操作,当然加法是没有问题的,如果有些情况,对数据的状态敏感时,这样的过程就不被允许了。
此时就需要atomicstampedreference类。

其内部实现一个pair类来封装值和时间戳。

private static class pair<t> {
 final t reference;
 final int stamp;
 private pair(t reference, int stamp) {
  this.reference = reference;
  this.stamp = stamp;
 }
 static <t> pair<t> of(t reference, int stamp) {
  return new pair<t>(reference, stamp);
 }
 }

这个类的主要思想是加入时间戳来标识每一次改变。

//比较设置 参数依次为:期望值 写入新值 期望时间戳 新时间戳

public boolean compareandset(v expectedreference,
     v newreference,
     int expectedstamp,
     int newstamp) {
 pair<v> current = pair;
 return
  expectedreference == current.reference &&
  expectedstamp == current.stamp &&
  ((newreference == current.reference &&
  newstamp == current.stamp) ||
  caspair(current, pair.of(newreference, newstamp)));
 }

当期望值等于当前值,并且期望时间戳等于现在的时间戳时,才写入新值,并且更新新的时间戳。
这里举个用atomicstampedreference的场景,可能不太适合,但是想不到好的场景了。
场景背景是,某公司给余额少的用户免费充值,但是每个用户只能充值一次。

package test;

import java.util.concurrent.atomic.atomicstampedreference;

public class test
{
 static atomicstampedreference<integer> money = new atomicstampedreference<integer>(
 19, 0);

 public static void main(string[] args)
 {
 for (int i = 0; i < 3; i++)
 {
 final int timestamp = money.getstamp();
 new thread()
 {
 public void run()
 {
 while (true)
 {
 while (true)
 {
 integer m = money.getreference();
 if (m < 20)
 {
 if (money.compareandset(m, m + 20, timestamp,
  timestamp + 1))
 {
  system.out.println("充值成功,余额:"
  + money.getreference());
  break;
 }
 }
 else
 {
 break;
 }
 }
 }
 };
 }.start();
 }

 new thread()
 {
 public void run()
 {
 for (int i = 0; i < 100; i++)
 {
 while (true)
 {
 int timestamp = money.getstamp();
 integer m = money.getreference();
 if (m > 10)
 {
 if (money.compareandset(m, m - 10, timestamp,
  timestamp + 1))
 {
 system.out.println("消费10元,余额:"
  + money.getreference());
 break;
 }
 }else {
 break;
 }
 }
 try
 {
 thread.sleep(100);
 }
 catch (exception e)
 {
 // todo: handle exception
 }
 }
 };
 }.start();
 }

}

解释下代码,有3个线程在给用户充值,当用户余额少于20时,就给用户充值20元。有100个线程在消费,每次消费10元。用户初始有9元,当使用atomicstampedreference来实现时,只会给用户充值一次,因为每次操作使得时间戳+1。运行结果:

充值成功,余额:39
消费10元,余额:29
消费10元,余额:19
消费10元,余额:9

如果使用atomicreference<integer>或者 atomic integer来实现就会造成多次充值。

充值成功,余额:39
消费10元,余额:29
消费10元,余额:19
充值成功,余额:39
消费10元,余额:29
消费10元,余额:19
充值成功,余额:39
消费10元,余额:29

2.5. atomicintegerarray

与atomicinteger相比,数组的实现不过是多了一个下标。

public final boolean compareandset(int i, int expect, int update) {
        return compareandsetraw(checkedbyteoffset(i), expect, update);
    }

它的内部只是封装了一个普通的array

private final int[] array;

里面有意思的是运用了二进制数的前导零来算数组中的偏移量。

shift = 31 - integer.numberofleadingzeros(scale);

前导零的意思就是比如8位表示12,00001100,那么前导零就是1前面的0的个数,就是4。

具体偏移量如何计算,这里就不再做介绍了。

2.6. atomicintegerfieldupdater

atomicintegerfieldupdater类的主要作用是让普通变量也享受原子操作。

就比如原本有一个变量是int型,并且很多地方都应用了这个变量,但是在某个场景下,想让int型变成atomicinteger,但是如果直接改类型,就要改其他地方的应用。atomicintegerfieldupdater就是为了解决这样的问题产生的。

package test;

import java.util.concurrent.atomic.atomicinteger;
import java.util.concurrent.atomic.atomicintegerfieldupdater;


public class test
{
 public static class v{
 int id;
 volatile int score;
 public int getscore()
 {
 return score;
 }
 public void setscore(int score)
 {
 this.score = score;
 }
 
 }
 public final static atomicintegerfieldupdater<v> vv = atomicintegerfieldupdater.newupdater(v.class, "score");
 
 public static atomicinteger allscore = new atomicinteger(0);
 
 public static void main(string[] args) throws interruptedexception
 {
 final v stu = new v();
 thread[] t = new thread[10000];
 for (int i = 0; i < 10000; i++)
 {
 t[i] = new thread() {
 @override
 public void run()
 {
 if(math.random()>0.4)
 {
 vv.incrementandget(stu);
 allscore.incrementandget();
 }
 }
 };
 t[i].start();
 }
 for (int i = 0; i < 10000; i++)
 {
 t[i].join();
 }
 system.out.println("score="+stu.getscore());
 system.out.println("allscore="+allscore);
 }
}

上述代码将score使用 atomicintegerfieldupdater变成 atomicinteger。保证了线程安全。

这里使用allscore来验证,如果score和allscore数值相同,则说明是线程安全的。

小说明:

  1. updater只能修改它可见范围内的变量。因为updater使用反射得到这个变量。如果变量不可见,就会出错。比如如果某变量申明为private,就是不可行的。
  2. 为了确保变量被正确的读取,它必须是volatile类型的。如果我们原有代码中未申明这个类型,那么简单得申明一下就行,这不会引起什么问题。
  3. 由于cas操作会通过对象实例中的偏移量直接进行赋值,因此,它不支持static字段(unsafe.objectfieldoffset()不支持静态变量)。