[高并发Java 四] 无锁

[高并发Java 一] 前言中已经提到了无锁的概念,因为在jdk源码中有大量的无锁应用,因此在这里介绍下无锁。 java

1 无锁类的原理详解

1.1 CAS

CAS算法的过程是这样:它包含3个参数CAS(V,E,N)。V表示要更新的变量,E表示预期值,N表示新值。仅当V
值等于E值时,才会将V的值设为N,若是V值和E值不一样,则说明已经有其余线程作了更新,则当前线程什么
都不作。最后,CAS返回当前V的真实值。CAS操做是抱着乐观的态度进行的,它老是认为本身能够成功完成
操做。当多个线程同时使用CAS操做一个变量时,只有一个会胜出,并成功更新,其他均会失败。失败的线程
不会被挂起,仅是被告知失败,而且容许再次尝试,固然也容许失败的线程放弃操做。基于这样的原理,CAS
操做即时没有锁,也能够发现其余线程对当前线程的干扰,并进行恰当的处理。 算法

咱们会发现,CAS的步骤太多,有没有可能在判断V和E相同后,正要赋值时,切换了线程,更改了值。形成了数据不一致呢? 设计模式

事实上,这个担忧是多余的。CAS整一个操做过程是一个原子操做,它是由一条CPU指令完成的。 数组

1.2 CPU指令

CAS的CPU指令是cmpxchg 安全

指令代码以下: 多线程

        /*
	accumulator = AL, AX, or EAX, depending on whether
	a byte, word, or doubleword comparison is being performed
	*/
	if(accumulator == Destination) {
	ZF = 1;
	Destination = Source;
	}
	else {
	ZF = 0;
	accumulator = Destination;
	}
目标值和寄存器里的值相等的话,就设置一个跳转标志,而且把原始数据设到目标里面去。若是不等的话,就不设置跳转标志了。

Java当中提供了不少无锁类,下面来介绍下无锁类。 并发

2 无所类的使用

咱们已经知道,无锁比阻塞效率要高得多。咱们来看看Java是如何实现这些无锁类的。 dom

2.1. AtomicInteger

AtomicInteger和Integer同样,都继承与Number类 ide

public class AtomicInteger extends Number implements java.io.Serializable

AtomicInteger里面有不少CAS操做,典型的有: 高并发

public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
这里来解释一下unsafe.compareAndSwapInt方法,他的意思是,对于this这个类上的偏移量为valueOffset的变量值若是与指望值expect相同,那么把这个变量的值设为update。

其实偏移量为valueOffset的变量就是value

static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
}
咱们此前说过,CAS是有可能会失败的,可是失败的代价是很小的,因此通常的实现都是在一个无限循环体内,直到成功为止。

public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

2.2 Unsafe

从类名就可知,Unsafe操做是非安全的操做,好比:

  • 根据偏移量设置值(在刚刚介绍的AtomicInteger中已经看到了这个功能
  • park()(把这个线程停下来,在之后的Blog中会提到)
  • 底层的CAS操做
非公开API,在不一样版本的JDK中,可能有较大差别

2.3. AtomicReference

前面已经提到了AtomicInteger,固然还有AtomicBoolean,AtomicLong等等,都大同小异。

这里要介绍的是AtomicReference

AtomicReference是一种模板类

public class AtomicReference<V>  implements java.io.Serializable
它能够用来封装任意类型的数据。

好比String

package test;

import java.util.concurrent.atomic.AtomicReference;


public class Test
{ 
	public final static AtomicReference<String> atomicString = new AtomicReference<String>("hosee");
	public static void main(String[] args)
	{
		for (int i = 0; i < 10; i++)
		{
			final int num = i;
			new Thread() {
				public void run() {
					try
					{
						Thread.sleep(Math.abs((int)Math.random()*100));
					}
					catch (Exception e)
					{
						e.printStackTrace();
					}
					if (atomicString.compareAndSet("hosee", "ztk"))
					{
						System.out.println(Thread.currentThread().getId() + "Change value");
					}else {
						System.out.println(Thread.currentThread().getId() + "Failed");
					}
				};
			}.start();
		}
	}
}
结果:

10Failed
13Failed
9Change value
11Failed
12Failed
15Failed
17Failed
14Failed
16Failed
18Failed
能够看到只有一个线程可以修改值,而且后面的线程都不能再修改。

2.4.AtomicStampedReference

咱们会发现CAS操做仍是有一个问题的

好比以前的AtomicInteger的incrementAndGet方法

public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }
假设当前value=1当某线程int current = get()执行后,切换到另外一个线程,这个线程将1变成了2,而后又一个线程将2又变成了1。此时再切换到最开始的那个线程,因为value仍等于1,因此仍是能执行CAS操做,固然加法是没有问题的,若是有些状况,对数据的状态敏感时,这样的过程就不被容许了。

此时就须要AtomicStampedReference类。

其内部实现一个Pair类来封装值和时间戳。

private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }

这个类的主要思想是加入时间戳来标识每一次改变。

//比较设置 参数依次为:指望值 写入新值 指望时间戳 新时间戳
public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }
当指望值等于当前值,而且指望时间戳等于如今的时间戳时,才写入新值,而且更新新的时间戳。
这里举个用AtomicStampedReference的场景,可能不太适合,可是想不到好的场景了。

场景背景是,某公司给余额少的用户免费充值,可是每一个用户只能充值一次。

package test;

import java.util.concurrent.atomic.AtomicStampedReference;

public class Test
{
	static AtomicStampedReference<Integer> money = new AtomicStampedReference<Integer>(
			19, 0);

	public static void main(String[] args)
	{
		for (int i = 0; i < 3; i++)
		{
			final int timestamp = money.getStamp();
			new Thread()
			{
				public void run()
				{
					while (true)
					{
						while (true)
						{
							Integer m = money.getReference();
							if (m < 20)
							{
								if (money.compareAndSet(m, m + 20, timestamp,
										timestamp + 1))
								{
									System.out.println("充值成功,余额:"
											+ money.getReference());
									break;
								}
							}
							else
							{
								break;
							}
						}
					}
				};
			}.start();
		}

		new Thread()
		{
			public void run()
			{
				for (int i = 0; i < 100; i++)
				{
					while (true)
					{
						int timestamp = money.getStamp();
						Integer m = money.getReference();
						if (m > 10)
						{
							if (money.compareAndSet(m, m - 10, timestamp,
									timestamp + 1))
							{
								System.out.println("消费10元,余额:"
											+ money.getReference());
								break;
							}
						}else {
							break;
						}
					}
					try
					{
						Thread.sleep(100);
					}
					catch (Exception e)
					{
						// TODO: handle exception
					}
				}
			};
		}.start();
	}

}
解释下代码,有3个线程在给用户充值,当用户余额少于20时,就给用户充值20元。有100个线程在消费,每次消费10元。用户初始有9元,当使用AtomicStampedReference来实现时,只会给用户充值一次,由于每次操做使得时间戳+1。运行结果:

充值成功,余额:39
消费10元,余额:29
消费10元,余额:19
消费10元,余额:9
若是使用AtomicReference<Integer>或者 Atomic Integer来实现就会形成屡次充值。

充值成功,余额:39
消费10元,余额:29
消费10元,余额:19
充值成功,余额:39
消费10元,余额:29
消费10元,余额:19
充值成功,余额:39
消费10元,余额:29

2.5. AtomicIntegerArray

与AtomicInteger相比,数组的实现不过是多了一个下标。
public final boolean compareAndSet(int i, int expect, int update) {
        return compareAndSetRaw(checkedByteOffset(i), expect, update);
    }

它的内部只是封装了一个普通的array

private final int[] array;
里面有意思的是运用了二进制数的前导零来算数组中的偏移量。
shift = 31 - Integer.numberOfLeadingZeros(scale);
前导零的意思就是好比8位表示12,00001100,那么前导零就是1前面的0的个数,就是4。

具体偏移量如何计算,这里就再也不作介绍了。

2.6. AtomicIntegerFieldUpdater

AtomicIntegerFieldUpdater类的主要做用是让普通变量也享受原子操做。

就好比本来有一个变量是int型,而且不少地方都应用了这个变量,可是在某个场景下,想让int型变成AtomicInteger,可是若是直接改类型,就要改其余地方的应用。AtomicIntegerFieldUpdater就是为了解决这样的问题产生的。

package test;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;


public class Test
{
	public static class V{
		int id;
		volatile int score;
		public int getScore()
		{
			return score;
		}
		public void setScore(int score)
		{
			this.score = score;
		}
		
	}
	public final static AtomicIntegerFieldUpdater<V> vv = AtomicIntegerFieldUpdater.newUpdater(V.class, "score");
	
	public static AtomicInteger allscore = new AtomicInteger(0);
	
	public static void main(String[] args) throws InterruptedException
	{
		final V stu = new V();
		Thread[] t = new Thread[10000];
		for (int i = 0; i < 10000; i++)
		{
			t[i] = new Thread() {
				@Override
				public void run()
				{
					if(Math.random()>0.4)
					{
						vv.incrementAndGet(stu);
						allscore.incrementAndGet();
					}
				}
			};
			t[i].start();
		}
		for (int i = 0; i < 10000; i++)
		{
			t[i].join();
		}
		System.out.println("score="+stu.getScore());
		System.out.println("allscore="+allscore);
	}
}
上述代码将score使用 AtomicIntegerFieldUpdater变成 AtomicInteger。保证了线程安全。

这里使用allscore来验证,若是score和allscore数值相同,则说明是线程安全的。

小说明:

  • Updater只能修改它可见范围内的变量。由于Updater使用反射获得这个变量。若是变量不可见,就会出错。好比若是某变量申明为private,就是不可行的。
  • 为了确保变量被正确的读取,它必须是volatile类型的。若是咱们原有代码中未申明这个类型,那么简单得申明一下就行,这不会引发什么问题。
  • 因为CAS操做会经过对象实例中的偏移量直接进行赋值,所以,它不支持static字段(Unsafe.objectFieldOffset()不支持静态变量)。






系列:

[高并发Java 一] 前言

[高并发Java 二] 多线程基础

[高并发Java 三] Java内存模型和线程安全

[高并发Java 四] 无锁

[高并发Java 五] JDK并发包1

[高并发Java 六] JDK并发包2

[高并发Java 七] 并发设计模式

[高并发Java 八] NIO和AIO

[高并发Java 九] 锁的优化和注意事项

[高并发Java 十] JDK8对并发的新支持

相关文章
相关标签/搜索