在Java多线程并发的状况下同时对一个变量进行操做会出现线程安全的问题,假如咱们如今使用20个线程对一个变量不停累加1,代码以下:java
1 public class ThreadDemo implements Runnable { 2 private int num = 0; 3 @Override 4 public void run() { 5 try { 6 Thread.sleep(1000); 7 } catch (InterruptedException e) { 8 e.printStackTrace(); 9 } 10 num++; 11 System.out.println(Thread.currentThread().getName() + "处理成功,num = " + num); 12 } 13 14 public static void main(String[] args) { 15 ThreadDemo demo = new ThreadDemo(); 16 for (int i = 0; i < 20; i++) { 17 new Thread(demo,"线程" + i).start(); 18 } 19 } 20 }
理想状况是累加到20,但实际运行的结果以下:算法
1 线程2处理成功,num = 3 2 线程3处理成功,num = 3 3 线程1处理成功,num = 3 4 线程0处理成功,num = 3 5 线程5处理成功,num = 4 6 线程7处理成功,num = 6 7 线程9处理成功,num = 8 8 线程4处理成功,num = 5 9 线程6处理成功,num = 4 10 线程8处理成功,num = 7 11 线程10处理成功,num = 9 12 线程11处理成功,num = 11 13 线程13处理成功,num = 12 14 线程12处理成功,num = 11 15 线程14处理成功,num = 13 16 线程15处理成功,num = 14 17 线程19处理成功,num = 18 18 线程18处理成功,num = 18 19 线程16处理成功,num = 16 20 线程17处理成功,num = 16
实际运行的结果可能有多种状况,由于在Java多线程并发的状况下会有这种安全问题,致使结果不许确,针对这种问题,有如下几种解决方案数据库
一、方案一:synchronized编程
1 public class ThreadDemo implements Runnable { 2 3 private int num = 0; 4 5 private synchronized void increase(){ 6 num++; 7 } 8 9 @Override 10 public void run() { 11 try { 12 Thread.sleep(1000); 13 } catch (InterruptedException e) { 14 e.printStackTrace(); 15 } 16 increase(); 17 System.out.println(Thread.currentThread().getName() + "处理成功,num = " + num); 18 } 19 20 public static void main(String[] args) { 21 ThreadDemo demo = new ThreadDemo(); 22 for (int i = 0; i < 20; i++) { 23 new Thread(demo,"线程" + i).start(); 24 } 25 } 26 }
运行结果以下:数组
1 线程0处理成功,num = 1 2 线程1处理成功,num = 2 3 线程2处理成功,num = 3 4 线程3处理成功,num = 4 5 线程4处理成功,num = 5 6 线程5处理成功,num = 6 7 线程6处理成功,num = 7 8 线程7处理成功,num = 8 9 线程12处理成功,num = 13 10 线程14处理成功,num = 15 11 线程15处理成功,num = 16 12 线程16处理成功,num = 17 13 线程10处理成功,num = 11 14 线程9处理成功,num = 10 15 线程8处理成功,num = 10 16 线程13处理成功,num = 14 17 线程11处理成功,num = 13 18 线程17处理成功,num = 18 19 线程18处理成功,num = 19 20 线程19处理成功,num = 20
这个时候,代码就是线程安全的了,由于咱们加了synchronized,也就是让每一个线程要进入increase()方法以前先得尝试加锁,同一时间只有一个线程能加锁,其余线程须要等待锁。经过这样处理,就能够保证换个data每次都会累加1,不会出现数据错乱的问题。可是,如此简单的data++操做,都要加一个重磅的synchronized锁来解决多线程并发问题,一个接一个的排队,加锁,处理数据,释放锁,下一个再进来,有点大材小用,synchronized是能够解决更加复杂的并发编程场景和问题的。缓存
二、更高效方案:Atomic类安全
对于这种简单的data++类的操做,java并发包下面提供了一系列的Atomic原子类,好比说AtomicInteger,能够保证多线程并发安全的状况下,高性能的并发更新一个数值,代码以下:多线程
1 public class ThreadDemo implements Runnable { 2 3 private AtomicInteger num = new AtomicInteger(0); 4 5 @Override 6 public void run() { 7 try { 8 Thread.sleep(1000); 9 } catch (InterruptedException e) { 10 e.printStackTrace(); 11 } 12 num.incrementAndGet(); 13 System.out.println(Thread.currentThread().getName() + "处理成功,num = " + num); 14 } 15 16 public static void main(String[] args) { 17 ThreadDemo demo = new ThreadDemo(); 18 for (int i = 0; i < 20; i++) { 19 new Thread(demo,"线程" + i).start(); 20 } 21 } 22 }
运行结果以下:并发
线程1处理成功,num = 2 线程0处理成功,num = 2 线程2处理成功,num = 3 线程4处理成功,num = 5 线程3处理成功,num = 4 线程5处理成功,num = 6 线程6处理成功,num = 7 线程7处理成功,num = 8 线程9处理成功,num = 11 线程11处理成功,num = 12 线程13处理成功,num = 14 线程15处理成功,num = 16 线程16处理成功,num = 17 线程17处理成功,num = 18 线程10处理成功,num = 11 线程8处理成功,num = 11 线程14处理成功,num = 15 线程12处理成功,num = 13 线程18处理成功,num = 19 线程19处理成功,num = 20
多个线程能够并发的执行AtomicInteger的incrementAndGet()方法,意思就是data的值累加1,接着返回累加后最新的值,这个代码里就没有看到加锁和释放锁app
1 public final int getAndAddInt(Object var1, long var2, int var4) { 2 int var5; 3 do { 4 var5 = this.getIntVolatile(var1, var2); 5 } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); 6 7 return var5; 8 }
实际上,Atomic原子类底层用的不是传统意义的锁机制,而是无锁化的CAS机制,经过CAS机制保证多线程修改一个数值的安全性,有点乐观锁的意思,先取值,累加的时候会比较原来的值是否改变,若是改变就不会执行,会再次获取最新值(Compare And Swap)
三、Java8对CAS的优化:LongAdder
CAS机制能够轻量级的实现线程安全,但CAS也有一个问题就是在多线程同时更改一个变量的值的时候可能会循环屡次才会变动成功,在高并发的状况下这种状况会更常见,耗费大量系统资源在死循环上面,所以Java8推出了LongAdder
1 /** 2 * Adds the given value. 3 * 4 * @param x the value to add 5 */ 6 public void add(long x) { 7 Cell[] as; long b, v; int m; Cell a; 8 if ((as = cells) != null || !casBase(b = base, b + x)) { 9 boolean uncontended = true; 10 if (as == null || (m = as.length - 1) < 0 || 11 (a = as[getProbe() & m]) == null || 12 !(uncontended = a.cas(v = a.value, v + x))) 13 longAccumulate(x, null, uncontended); 14 } 15 } 16 17 /** 18 * Handles cases of updates involving initialization, resizing, 19 * creating new Cells, and/or contention. See above for 20 * explanation. This method suffers the usual non-modularity 21 * problems of optimistic retry code, relying on rechecked sets of 22 * reads. 23 * 24 * @param x the value 25 * @param fn the update function, or null for add (this convention 26 * avoids the need for an extra field or function in LongAdder). 27 * @param wasUncontended false if CAS failed before call 28 */ 29 final void longAccumulate(long x, LongBinaryOperator fn, 30 boolean wasUncontended) { 31 int h; 32 if ((h = getProbe()) == 0) { 33 ThreadLocalRandom.current(); // force initialization 34 h = getProbe(); 35 wasUncontended = true; 36 } 37 boolean collide = false; // True if last slot nonempty 38 for (;;) { 39 Cell[] as; Cell a; int n; long v; 40 if ((as = cells) != null && (n = as.length) > 0) { 41 if ((a = as[(n - 1) & h]) == null) { 42 if (cellsBusy == 0) { // Try to attach new Cell 43 Cell r = new Cell(x); // Optimistically create 44 if (cellsBusy == 0 && casCellsBusy()) { 45 boolean created = false; 46 try { // Recheck under lock 47 Cell[] rs; int m, j; 48 if ((rs = cells) != null && 49 (m = rs.length) > 0 && 50 rs[j = (m - 1) & h] == null) { 51 rs[j] = r; 52 created = true; 53 } 54 } finally { 55 cellsBusy = 0; 56 } 57 if (created) 58 break; 59 continue; // Slot is now non-empty 60 } 61 } 62 collide = false; 63 } 64 else if (!wasUncontended) // CAS already known to fail 65 wasUncontended = true; // Continue after rehash 66 else if (a.cas(v = a.value, ((fn == null) ? v + x : 67 fn.applyAsLong(v, x)))) 68 break; 69 else if (n >= NCPU || cells != as) 70 collide = false; // At max size or stale 71 else if (!collide) 72 collide = true; 73 else if (cellsBusy == 0 && casCellsBusy()) { 74 try { 75 if (cells == as) { // Expand table unless stale 76 Cell[] rs = new Cell[n << 1]; 77 for (int i = 0; i < n; ++i) 78 rs[i] = as[i]; 79 cells = rs; 80 } 81 } finally { 82 cellsBusy = 0; 83 } 84 collide = false; 85 continue; // Retry with expanded table 86 } 87 h = advanceProbe(h); 88 } 89 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { 90 boolean init = false; 91 try { // Initialize table 92 if (cells == as) { 93 Cell[] rs = new Cell[2]; 94 rs[h & 1] = new Cell(x); 95 cells = rs; 96 init = true; 97 } 98 } finally { 99 cellsBusy = 0; 100 } 101 if (init) 102 break; 103 } 104 else if (casBase(v = base, ((fn == null) ? v + x : 105 fn.applyAsLong(v, x)))) 106 break; // Fall back on using base 107 } 108 }
LongAdder就是尝试使用分段CAS的方式来提高高并发执行CAS操做的性能,当并发更新的线程数量过多,其内部会搞一个Cell数组,每一个数组是一个数值分段,这时,让大量的线程分别去对不一样Cell内部的value值进行CAS累加操做,把CAS计算压力分散到了不一样的Cell分段数值中,这样就能够大幅度的下降多线程并发更新同一个数值时出现的无限循环的问题,并且他内部实现了自动分段迁移的机制,也就是若是某个Cell的value执行CAS失败了,那么就会自动去找另一个Cell分段内的value值进行CAS操做。
1 /** 2 * Returns the current sum. The returned value is <em>NOT</em> an 3 * atomic snapshot; invocation in the absence of concurrent 4 * updates returns an accurate result, but concurrent updates that 5 * occur while the sum is being calculated might not be 6 * incorporated. 7 * 8 * @return the sum 9 */ 10 public long sum() { 11 Cell[] as = cells; Cell a; 12 long sum = base; 13 if (as != null) { 14 for (int i = 0; i < as.length; ++i) { 15 if ((a = as[i]) != null) 16 sum += a.value; 17 } 18 } 19 return sum; 20 }
最后,从LongAdder中获取当前累加的总值,就会把base值和全部Cell分段数值加起来返回
从LongAdder分段加锁的实现逻辑中,咱们也能够对于一些并发量较大,持续时间较长的不适用缓存模式的抢购类项目的乐观锁进行改造,假如商品有1000个库存,那么彻底能够给拆成20个库存段,能够在数据库的表里建20个库存字段,好比stock_01,stock_02,以此类推,总之,就是把你的1000件库存给他拆开,每一个库存段是50件库存,好比stock_01对应50件库存,stock_02对应50件库存。接着,每秒1000个请求过来了,经过简单的随机算法,每一个请求都是随机在20个分段库存里,选择一个进行加锁。这样有最多20个下单请求一块儿执行,每一个下单请求锁了一个库存分段,而后在业务逻辑里面,就对数据库的那个分段库存进行操做便可,包括查库存 -> 判断库存是否充足 -> 扣减库存,经过这种方式提高并发量,提升用户体验。