无锁CAS整理

全部的锁都是悲观的,他们老是假设每一次的临界区操做会产生冲突,若是有多个线程同时须要访问临界区资源,就宁肯牺牲性能让线程进行等待,因此说锁会阻塞线程执行.而无锁是一种乐观的策略,它会假设对资源的访问是没有冲突的,全部的线程均可以在不停顿的状态下继续执行.无锁的策略是使用一种叫作比较交换的技术(CAS)来鉴别线程冲突,一旦检测到冲突产生,就重试当前操做,直到没有冲突为止.算法

CAS算法包含三个参数(V,E,N),V表示要更新的变量,E表示预期值,N表示新值.仅当V值等于E值时(意思是说其余线程没有更新V值),才会将V值设为N.若是V值和E值不一样,则说明已经有其余线程作了更新(V值),则当前线程什么都不作.最后,CAS返回当前V的真实值.数组

最简单的无锁安全整数:AtomicInteger安全

public class AtomicIntegerDemo {
    static AtomicInteger i = new AtomicInteger();
    public static class AddThread implements Runnable {
        @Override
        public void run() {
            for (int k = 0;k < 10000;k++) {
                i.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[10];
        for (int k = 0;k < 10;k++) {
            ts[k] = new Thread(new AddThread());
            ts[k].start();
            ts[k].join();
        }
        System.out.println(i);
    }
}

运行结果:多线程

100000dom

由于jdk 8的incrementAndGet()已经牵涉到底层Unsafe类,它有大量的native标识,跟C语言挂钩的,这个咱们先不说.咱们本身来用无锁的对象引用AtomicReference来模拟实现一个这个过程.ide

public class AtomicReferenceInteger {
    //i即V值(V,E,N)
    static AtomicReference<Integer> i = new AtomicReference<>(0);
    public static class AddThread implements Runnable {
        @Override
        public void run() {
            for (int k = 0;k < 10000;k++) {
                while (true) {
                    //m即E值
                    Integer m = i.get();
                    //++m即N值,比较m跟i的值是否相等,若是相等,就把++m写入i,若是i值被其余线程修改,则继续循环
                    if (i.compareAndSet(m,++m)) {
                        break;
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[10];
        for (int k = 0;k < 10;k++) {
            ts[k] = new Thread(new AddThread());
            ts[k].start();
            ts[k].join();
        }
        System.out.println(i);
    }
}

运行结果:性能

100000this

以上能够看出spa

while (true) {
    //m即E值
    Integer m = i.get();
    //++m即N值,比较m跟i的值是否相等,若是相等,就把++m写入i,若是i值被其余线程修改,则继续循环
    if (i.compareAndSet(m,++m)) {
        break;
    }
}

即模拟实现了incrementAndGet();线程

带有时间戳的对象引用:AtomicStampedReference

使用带时间戳的对象引用时,对象值和时间戳都必须知足指望值,写入才会成功.所以,即便对象值被反复读写,写回原值,只要时间戳发生变化,就能防止不恰当的写入.

public class AtomicReferenceAcount {
    public static void main(String[] args) {
//        AtomicReference<Integer> money = new AtomicReference<>();
        AtomicStampedReference<Integer> money = new AtomicStampedReference<>(19,0);
//        money.set(19);
        for (int i = 0;i < 10000;i++) {
            final int timestamp = money.getStamp();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
//                    Integer m = money.get();
                        Integer m = money.getReference();
                        if (m < 20) {
                            //比较money跟m的值相等,才更新money为m+20,若是不等则从新来一遍,这里money的值可能会被其余线程修改
                            //当有其余线程改变了时间戳timestamp的时候,总体没法写入
                            if (money.compareAndSet(m, m + 20, timestamp, timestamp )) {
                                System.out.println("余额小于20元,充值成功,余额:" + money.getReference() + "元");
                                break;
                            }
                        } else {
                            break;
                        }
                    }
                }
            }).start();
        }
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                for (int i = 0;i < 10000;i++) {
                    while (true) {
                        int timestamp = money.getStamp();
                        Integer m = money.getReference();
                        if (m > 10) {
                            System.out.println("大于10元");
                            if (money.compareAndSet(m,m - 10,timestamp,timestamp)) {
                                System.out.println("成功消费10元,余额:" + money.getReference());
                                break;
                            }
                        }else {
                            System.out.println("没有足够金额");
                            break;
                        }
                    }
//                    try {
//                        Thread.sleep(100);
//                    }catch (InterruptedException e) {
//
//                    }
                }
            }
        };
        new Thread(r2).start();
    }
}

咱们先让时间戳永远不变,运行结果(部分选取)

余额小于20元,充值成功,余额:39元
大于10元
成功消费10元,余额:29
大于10元
成功消费10元,余额:19
大于10元
成功消费10元,余额:9
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
大于10元
成功消费10元,余额:19
大于10元
成功消费10元,余额:9
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
.

.
没有足够金额
没有足够金额
大于10元
成功消费10元,余额:19
大于10元
成功消费10元,余额:29
大于10元
成功消费10元,余额:19
大于10元
成功消费10元,余额:9
没有足够金额
没有足够金额
没有足够金额
没有足够金额
大于10元
成功消费10元,余额:19
余额小于20元,充值成功,余额:39元
余额小于20元,充值成功,余额:29元
大于10元
余额小于20元,充值成功,余额:29元
余额小于20元,充值成功,余额:39元
余额小于20元,充值成功,余额:29元
成功消费10元,余额:29
大于10元
成功消费10元,余额:19
大于10元
成功消费10元,余额:9

根据结果咱们会看到,两边的线程会不断的读取,写入.

如今咱们把时间戳改变+1

public class AtomicReferenceAcount {
    public static void main(String[] args) {
//        AtomicReference<Integer> money = new AtomicReference<>();
        AtomicStampedReference<Integer> money = new AtomicStampedReference<>(19,0);
//        money.set(19);
        for (int i = 0;i < 10000;i++) {
            final int timestamp = money.getStamp();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
//                    Integer m = money.get();
                        Integer m = money.getReference();
                        if (m < 20) {
                            //比较money跟m的值相等,才更新money为m+20,若是不等则从新来一遍,这里money的值可能会被其余线程修改
                            //当有其余线程改变了时间戳timestamp的时候,总体没法写入
                            if (money.compareAndSet(m, m + 20, timestamp, timestamp + 1 )) {
                                System.out.println("余额小于20元,充值成功,余额:" + money.getReference() + "元");
                                break;
                            }
                        } else {
                            break;
                        }
                    }
                }
            }).start();
        }
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                for (int i = 0;i < 10000;i++) {
                    while (true) {
                        int timestamp = money.getStamp();
                        Integer m = money.getReference();
                        if (m > 10) {
                            System.out.println("大于10元");
                            if (money.compareAndSet(m,m - 10,timestamp,timestamp + 1)) {
                                System.out.println("成功消费10元,余额:" + money.getReference());
                                break;
                            }
                        }else {
                            System.out.println("没有足够金额");
                            break;
                        }
                    }
//                    try {
//                        Thread.sleep(100);
//                    }catch (InterruptedException e) {
//
//                    }
                }
            }
        };
        new Thread(r2).start();
    }
}

运行结果:

余额小于20元,充值成功,余额:39元
大于10元
成功消费10元,余额:29
大于10元
成功消费10元,余额:19
大于10元
成功消费10元,余额:9
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额
没有足够金额

.

.

时间戳+1后,不管运行多少次都不会出现重复充值的现象了。

数组的无锁:AtomicIntegerArray

public class AtomicIntegerArrayDemo {
    static AtomicIntegerArray arr = new AtomicIntegerArray(10);
    public static class AddThread implements Runnable {
        @Override
        public void run() {
            //数组内的全部元素各加1000次1
            for (int k = 0;k < 10000;k++) {
                arr.getAndIncrement(k % arr.length());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[10];
        //10个线程来执行
        for (int k = 0;k < 10;k++) {
            ts[k] = new Thread(new AddThread());
            ts[k].start();
            ts[k].join();
        }
        System.out.println(arr);
    }
}

运行结果:

[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

这里arr.getAndIncrement(int i)就是对第i个下标的元素加1

普通变量享受原子操做:AtomicIntegerFieldUpdater

public class AtomicIntegerFieldUpdateDemo {
    public static class Candidate {
        int id;
        volatile int score;
    }
    public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score");
    public static AtomicInteger allScore = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        final Candidate stu = new Candidate();
        Thread[] t = new Thread[10000];
        for (int i = 0;i < 10000;i++) {
            t[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (Math.random() > 0.4) {
                        scoreUpdater.incrementAndGet(stu);
                        allScore.incrementAndGet();
                    }
                }
            });
            t[i].start();
            t[i].join();
        }
        System.out.println("score=" + stu.score);
        System.out.println("allScore=" + allScore);
    }
}

运行结果:

score=5952
allScore=5952

不管运行多少次,咱们均可以看到score跟allScore相等,说明普通变量int score进行了原子操做(注意int score必须声明为volatile,多线程可见,且不能为私有类型)。

无锁Vector实现

模仿Vector机制来完成一个无锁线程安全的List集合(源码来自amino)

public class LockFreeVector<E> extends AbstractList<E> {
   private static final boolean debug = false;
   /**
    * Size of the first bucket. sizeof(bucket[i+1])=2*sizeof(bucket[i])
    * 第一个数组大小
    */
   private static final int FIRST_BUCKET_SIZE = 8;

   /**
    * number of buckets. 30 will allow 8*(2^30-1) elements
    * 全部数组的个数
    */
   private static final int N_BUCKET = 30;

   /**
    * We will have at most N_BUCKET number of buckets. And we have
    * sizeof(buckets.get(i))=FIRST_BUCKET_SIZE**(i+1)
    * 存放数据的二维数组,方便动态扩展
    */
   private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;

   /**
    * @author ganzhi
    * 写入类
    * @param <E>
    */
   static class WriteDescriptor<E> {
      //指望值,即E(V,E,N)
      public E oldV;
      //写入的新值,即N
      public E newV;
      //要修改的原子数组,即V
      public AtomicReferenceArray<E> addr;
      //要修改的数组的索引位置
      public int addr_ind;

      /**
       * Creating a new descriptor.
       * 
       * @param addr Operation address  要写入的数组
       * @param addr_ind Index of address  要写入的数组索引
       * @param oldV old operand  预期值
       * @param newV new operand  新值
       */
      public WriteDescriptor(AtomicReferenceArray<E> addr, int addr_ind,
            E oldV, E newV) {
         this.addr = addr;
         this.addr_ind = addr_ind;
         this.oldV = oldV;
         this.newV = newV;
      }

      /**
       * set newV.
       * 给原子数组进行原子操做赋值
       */
      public void doIt() {
         addr.compareAndSet(addr_ind, oldV, newV);
      }
   }

   /**
    * @author ganzhi
    * 为了更有序的读写数组,使用CAS操做写入新数据
    * 写入器
    * @param <E>
    */
   static class Descriptor<E> {
      //整个Vector长度(非数组长度,是几个数组加起来的长度)
      public int size;
      //写入对象,对全部线程可见
      volatile WriteDescriptor<E> writeop;

      /**
       * Create a new descriptor.
       * 
       * @param size Size of the vector
       * @param writeop Executor write operation
       */
      public Descriptor(int size, WriteDescriptor<E> writeop) {
         this.size = size;
         this.writeop = writeop;
      }

      /**
       * 完成写入,doIt()方法才是真正对原子数组的写入
       */
      public void completeWrite() {
         WriteDescriptor<E> tmpOp = writeop;
         if (tmpOp != null) {
            tmpOp.doIt();
            writeop = null; // this is safe since all write to writeop use
            // null as r_value.
         }
      }
   }

   /**
    * 当前线程写入器的原子引用
    */
   private AtomicReference<Descriptor<E>> descriptor;
   private static final int zeroNumFirst = Integer
         .numberOfLeadingZeros(FIRST_BUCKET_SIZE);;

   /**
    * Constructor.
    */
   public LockFreeVector() {
      //初始化一个能够存放30个原子数组的数组,即二维数组
      buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET);
      //给第0位初始化一个8位长的原子数组
      buckets.set(0, new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE));
      //初始化一个原子引用的Descriptor对象,无长度,无内容
      descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0,
            null));
   }

   /**
    * add e at the end of vector.
    * 最核心功能,将元素压入Vector最后一个位置
    * @param e
    *            element added
    */
   public void push_back(E e) {
      //预期写入器
      Descriptor<E> desc;
      //新值写入器
      Descriptor<E> newd;
      do {
         //获取当前线程写入器给预期写入器
         desc = descriptor.get();
         //若是有其余线程在循环跳出后修改了当前线程写入器,则完成一次写入,预防措施
         desc.completeWrite();
         //判断将数据插入到Vector的哪个数组中,Vector总共有30个数组
         //数组的长度,第一个是8,第二个是16,第三个是32。。。
         int pos = desc.size + FIRST_BUCKET_SIZE;
         int zeroNumPos = Integer.numberOfLeadingZeros(pos);
         //取得第几个数组
         int bucketInd = zeroNumFirst - zeroNumPos;
         //若是这个数组为空
         if (buckets.get(bucketInd) == null) {
            //取得上一个数组的长度*2
            int newLen = 2 * buckets.get(bucketInd - 1).length();
            if (debug)
               System.out.println("New Length is:" + newLen);
            //原子性增长新数组,若是这个数组为空,则建立一个新长度的数组,长度是上一个数组的2倍
            //若是不为空则等待
            buckets.compareAndSet(bucketInd, null,
                  new AtomicReferenceArray<E>(newLen));
         }
         //取得元素在目标数组中的索引位
         int idx = (0x80000000>>>zeroNumPos) ^ pos;
         //建立一个新的写入对象,包含目标数组buckets.get(bucketInd),待插入索引位idx,目标是否为空,插入对象e
         newd = new Descriptor<E>(desc.size + 1, new WriteDescriptor<E>(
               buckets.get(bucketInd), idx, null, e));
         //若是当前线程写入器与预期写入器不等,则从新循环,若是相等则将新值写入器赋给当前线程写入器
      } while (!descriptor.compareAndSet(desc, newd));
      //获取到新写入器的当前线程写入器完成写入
      descriptor.get().completeWrite();
   }

   /**
    * Remove the last element in the vector.
    *
    * @return element removed
    */
   public E pop_back() {
      Descriptor<E> desc;
      Descriptor<E> newd;
      E elem;
      do {
         desc = descriptor.get();
         desc.completeWrite();

         int pos = desc.size + FIRST_BUCKET_SIZE - 1;
         int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
               - Integer.numberOfLeadingZeros(pos);
         int idx = Integer.highestOneBit(pos) ^ pos;
         elem = buckets.get(bucketInd).get(idx);
         newd = new Descriptor<E>(desc.size - 1, null);
      } while (!descriptor.compareAndSet(desc, newd));

      return elem;
   }

   /**
    * Get element with the index.
    *
    * @param index
    *            index
    * @return element with the index
    */
   @Override
   public E get(int index) {
      int pos = index + FIRST_BUCKET_SIZE;
      int zeroNumPos = Integer.numberOfLeadingZeros(pos);
      //获取第几个数组
      int bucketInd = zeroNumFirst - zeroNumPos;
      //获取该数组的索引位
      int idx = (0x80000000>>>zeroNumPos) ^ pos;
      return buckets.get(bucketInd).get(idx);
   }

   /**
    * Set the element with index to e.
    *
    * @param index
    *            index of element to be reset
    * @param e
    *            element to set
    */
   /**
     * {@inheritDoc}
     */
   public E set(int index, E e) {
      int pos = index + FIRST_BUCKET_SIZE;
      int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
            - Integer.numberOfLeadingZeros(pos);
      int idx = Integer.highestOneBit(pos) ^ pos;
      AtomicReferenceArray<E> bucket = buckets.get(bucketInd);
      while (true) {
         E oldV = bucket.get(idx);
         if (bucket.compareAndSet(idx, oldV, e))
            return oldV;
      }
   }

   /**
    * reserve more space.
    *
    * @param newSize
    *            new size be reserved
    */
   public void reserve(int newSize) {
      int size = descriptor.get().size;
      int pos = size + FIRST_BUCKET_SIZE - 1;
      int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
            - Integer.numberOfLeadingZeros(pos);
      if (i < 1)
         i = 1;

      int initialSize = buckets.get(i - 1).length();
      while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
            - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)) {
         i++;
         initialSize *= FIRST_BUCKET_SIZE;
         buckets.compareAndSet(i, null, new AtomicReferenceArray<E>(
               initialSize));
      }
   }

   /**
    * size of vector.
    *
    * @return size of vector
    */
   public int size() {
      return descriptor.get().size;
   }

   /**
     * {@inheritDoc}
     */
   @Override
   public boolean add(E object) {
      push_back(object);
      return true;
   }
}

这里咱们重点对push_back(E e)方法(将对象添加到Vector的最末尾),get(int index)方法(取出第几个)进行了中文标注,其间Vector是一个二维数组,当第一个数组存满后,扩展到第二个数组,每一个数组的长度都是乘2扩展的。

相关文章
相关标签/搜索