1、概念html
1.定义:当多个线程访问某个类时,无论运行时环境采用何种调度方式或者这些进程将如何交替执行,而且在主调代码中不须要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。java
2.线程安全性:算法
原子性:提供了互斥访问,同一时刻只能有一个线程来对它进行操做。 安全
可见性:一个线程对主内存的修改能够及时的被其余线程观察到。 数据结构
有序性:一个线程观察其余线程中的指令执行顺序,因为指令重排排序的存在,该观察结果通常杂乱无序。多线程
2、原子性-Atomic并发
1.原子性--Atomic包ide
原子英文单词为:atomic,刚恰好Java下定义了这样的类,好比:AtomicXXX:CAS;AtomicLong、LongAdder。高并发
2.为何要使用这个呢?或者说在什么场景下使用呢?工具
在并发场景中,当多线程须要对同一份资源作操做时,就会产生线程安全问题。以最简单的int i++为例,i++并非原子操做,编译出来后分为三步:1,获取值;2,修改值;3,设置值。若是有多线程执行i++,则一般不会获得正确的结果。举例以下:
/*** @author 繁荣Aaron*/public class ActiomTest {static Logger logger = LoggerFactory.getLogger(ActiomTest.class);private static int n = 0;public static void main(String[] args) throws Exception {Thread t1 = new Thread() {@Overridepublic void run() {for (int i = 0; i < 1000; i++) {n++;try { Thread.currentThread().sleep(10); } catch (InterruptedException e) { }}}};Thread t2 = new Thread() {@Overridepublic void run() {for (int i = 0; i < 1000; i++) {n++;try { Thread.currentThread().sleep(10); } catch (InterruptedException e) { }}}};t1.start();t2.start();t1.join();t2.join();logger.info("n = {}", n);}}
结果以下:
并非咱们所须要的结果:2000。因此必须用方法进行解决。
解决方式,以下:
1.使用synchronized关键字,具体使用参考先前的文章
(https://my.oschina.net/u/2380961/blog/1594040)。
2.JDK并发包里提供了不少线程安全的类。如:int对应线程安全的AtomicInteger。相似的还有:AtomicBoolean,AtomicLong,AtomicReference。
3.如何使用?
举例,以下:
public class AtomicExample1 {// 请求总数 public static int clientTotal = 5000;// 同时并发执行的线程数 public static int threadTotal = 200; public static AtomicInteger count = new AtomicInteger(0);public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(threadTotal);final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);for (int i = 0; i < clientTotal ; i++) {executorService.execute(() -> {try {//semaphore.acquire();//add();count.incrementAndGet();//semaphore.release();} catch (Exception e) {e.printStackTrace();}//countDownLatch.countDown();});}//countDownLatch.await();executorService.shutdown();System.out.println(count.get()); }private static void add() {int i = count.incrementAndGet();// count.getAndIncrement();//System.out.println(i);}}
一开始,去掉Semaphore 和CountDownLatch 两个工具类。总共执行5000次,那么输出的结果也应该是5000。可是真实结果却不是,若是多执行几回机会出现以下的错误,结果倒是4997:
因此上面若是缺乏CountDownLatch 这个工具类,是没法达到线程安全的,就算是AtomicInteger类。具体缘由,我没有弄清楚,就算是加上volatile关键字也不行的:
只要打开了CountDownLatch 关键字才能够,下面的程序是线程安全的:
@ThreadSafepublic class AtomicExample1 {// 请求总数public static int clientTotal = 5000;// 同时并发执行的线程数public static int threadTotal = 200;public static AtomicInteger count = new AtomicInteger(0);public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(threadTotal);final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);for (int i = 0; i < clientTotal ; i++) {executorService.execute(() -> {try {//semaphore.acquire();//add();count.incrementAndGet();//semaphore.release();} catch (Exception e) {e.printStackTrace();}countDownLatch.countDown();});}countDownLatch.await();executorService.shutdown();System.out.println(count.get());}private static void add() {int i = count.incrementAndGet();// count.getAndIncrement();//System.out.println(i);}}
#API
public final int get() //获取当前的值public final int getAndSet(int newValue)//获取当前的值,并设置新的值public final int getAndIncrement()//获取当前的值,并自增public final int getAndDecrement() //获取当前的值,并自减public final int getAndAdd(int delta) //获取当前的值,并加上预期的值integer.incrementAndGet(); //先+1,而后在返回值,至关于++iinteger.decrementAndGet();//先-1,而后在返回值,至关于--iinteger.addAndGet(1);//先+n,而后在返回值,
总结:
1.使用的是线程池技术,特别须要注意的是就算是AtomicInteger类若是是单独的使用,也是线程不安全的。关于上面的缘由 为何会致使线程不安全后面讲了线程池在叙说。
2.CountDownLatch 关键字只是保证了线程的执行,并不线程的原子性,那么究竟是什么缘由使AtomicInteger保持原子性呢?
4.atomic原理之CAS
CAS,Compare and Swap即比较并交换,设计并发算法时经常使用到的一种技术,java.util.concurrent包彻底创建在CAS之上,没有CAS也就没有此包,可见CAS的重要性。
当前的处理器基本都支持CAS,只不过不一样的厂家的实现不同罢了。CAS有三个操做数:内存值V、旧的预期值A、要修改的值B,当且仅当预期值A和内存值V相同时,将内存值修改成B并返回true,不然什么都不作并返回false。固然更加底层的,就是Unsafe实现的,看下Unsafe下的三个方法:
public final native boolean compareAndSwapObject(Object paramObject1, long paramLong, Object paramObject2, Object paramObject3);#该方法为本地方法,有四个参数,分别表明:对象、对象的地址、预期值、修改值public final native boolean compareAndSwapInt(Object paramObject, long paramLong, int paramInt1, int paramInt2);public final native boolean compareAndSwapLong(Object paramObject, long paramLong1, long paramLong2, long paramLong3);
Java内部原理代码,以下:
private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long valueOffset;static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }}private volatile int value;
这里, unsafe是java提供的得到对对象内存地址访问的类,注释已经清楚的写出了,它的做用就是在更新操做时提供“比较并替换”的做用。实际上就是AtomicInteger中的一个工具。
valueOffset是用来记录value自己在内存的便宜地址的,这个记录,也主要是为了在更新操做在内存中找到value的位置,方便比较。
注意:value是用来存储整数的时间变量,这里被声明为volatile,就是为了保证在更新操做时,当前线程能够拿到value最新的值(并发环境下,value可能已经被其余线程更新了)。
下面找一个方法getAndIncrement来研究一下AtomicInteger是如何实现的,好比咱们经常使用的addAndGet方法:
public final int addAndGet(int delta) {for (;;) {int current = get();int next = current + delta;if (compareAndSet(current, next))return next;}}
这段代码如何在不加锁的状况下经过CAS实现线程安全,咱们不妨考虑一下方法的执行:
一、AtomicInteger里面的value原始值为3,即主内存中AtomicInteger的value为3,根据Java内存模型,线程1和线程2各自持有一份value的副本,值为3
二、线程1运行到第三行获取到当前的value为3,线程切换
三、线程2开始运行,获取到value为3,利用CAS对比内存中的值也为3,比较成功,修改内存,此时内存中的value改变比方说是4,线程切换
四、线程1恢复运行,利用CAS比较发现本身的value为3,内存中的value为4,获得一个重要的结论-->此时value正在被另一个线程修改,因此我不能去修改它
五、线程1的compareAndSet失败,循环判断,由于value是volatile修饰的,因此它具有可见性的特性,线程2对于value的改变能被线程1看到,只要线程1发现当前获取的value是4,内存中的value也是4,说明线程2对于value的修改已经完毕而且线程1能够尝试去修改它
六、最后说一点,好比说此时线程3也准备修改value了,不要紧,由于比较-交换是一个原子操做不可被打断,线程3修改了value,线程1进行compareAndSet的时候必然返回的false,这样线程1会继续循环去获取最新的value并进行compareAndSet,直至获取的value和内存中的value一致为止
整个过程当中,利用CAS机制保证了对于value的修改的线程安全性。
CAS的缺陷
CAS虽然高效地解决了原子操做,可是仍是存在一些缺陷的,主要表如今三个方法:循环时间太长、只能保证一个共享变量原子操做、ABA问题。
循环时间太长
若是CAS一直不成功呢?这种状况绝对有可能发生,若是自旋CAS长时间地不成功,则会给CPU带来很是大的开销。在JUC中有些地方就限制了CAS自旋的次数,例如BlockingQueue的SynchronousQueue。
只能保证一个共享变量原子操做
看了CAS的实现就知道这只能针对一个共享变量,若是是多个共享变量就只能使用锁了,固然若是你有办法把多个变量整成一个变量,利用CAS也不错。例如读写锁中state的高地位
ABA问题
CAS须要检查操做值有没有发生改变,若是没有发生改变则更新。可是存在这样一种状况:若是一个值原来是A,变成了B,而后又变成了A,那么在CAS检查的时候会发现没有改变,可是实质上它已经发生了改变,这就是所谓的ABA问题。对于ABA问题其解决方案是加上版本号,即在每一个变量都加上一个版本号,每次改变时加1,即A —> B —> A,变成1A —> 2B —> 3A。
缺陷的解方式:CAS的ABA隐患问题,解决方案则是版本号,Java提供了AtomicStampedReference来解决。AtomicStampedReference经过包装[E,Integer]的元组来对对象标记版本戳stamp,从而避免ABA问题。对于上面的案例应该线程1会失败。
#四个参数,分别表示:预期引用、更新后的引用、预期标志、更新后的标志public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) {Pair<V> current = pair;returnexpectedReference == current.reference &&expectedStamp == current.stamp &&((newReference == current.reference &&newStamp == current.stamp) ||casPair(current, Pair.of(newReference, newStamp)));}
代码案例:
Thread tsf1 = new Thread(new Runnable() {@Overridepublic void run() {try {//让 tsf2先获取stamp,致使预期时间戳不一致TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}// 预期引用:100,更新后的引用:110,预期标识getStamp() 更新后的标识getStamp() + 1atomicStampedReference.compareAndSet(100,110,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);atomicStampedReference.compareAndSet(110,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);}});Thread tsf2 = new Thread(new Runnable() {@Overridepublic void run() {int stamp = atomicStampedReference.getStamp();try {TimeUnit.SECONDS.sleep(2); //线程tsf1执行完} catch (InterruptedException e) {e.printStackTrace();}System.out.println("AtomicStampedReference:" +atomicStampedReference.compareAndSet(100,120,stamp,stamp + 1));}});tsf1.start();tsf2.start();
3、原子性-锁
锁,主要讲两个关键字:synchronized(依赖JVM);Lock(依赖特殊的CPU指令,代码实现 ,ReentrantLock)。
1.synchronized
synchronized能够保证方法或者代码块在运行时,同一时刻只有一个方法能够进入到临界区,同时它还能够保证共享变量的内存可见性。具体的使用参考博客地址(https://my.oschina.net/u/2380961/blog/1594040)。
使用须要主要的地方:
修饰代码块:大括号括起来的代码,做用于调用的对象。
修饰方法:整个方法,做用于调用的对象。
修饰静态方法:整个静态方法,做用于全部对象。
修饰类:括号括起来的部分,做用于全部对象。
当一个线程访问同步代码块时,它首先是须要获得锁才能执行同步代码,当退出或者抛出异常时必需要释放锁,那么它是如何来实现这个机制的呢?咱们先看一段简单的代码:
public class SynchronizedTest {public synchronized void test1(){}public void test2(){synchronized (this){}}}
利用javap工具查看生成的class文件信息来分析Synchronize的实现:
从上面能够看出,同步代码块是使用monitorenter和monitorexit指令实现的,同步方法(在这看不出来须要看JVM底层实现)依靠的是方法修饰符上的ACC_SYNCHRONIZED实现。具体体现,以下:
进入,获取锁:
每一个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的全部权,过程以下:
一、若是monitor的进入数为0,则该线程进入monitor,而后将进入数设置为1,该线程即为monitor的全部者。
二、若是线程已经占有该monitor,只是从新进入,则进入monitor的进入数加1.
3.若是其余线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再从新尝试获取monitor的全部权。
释放锁:
执行monitorexit的线程必须是objectref所对应的monitor的全部者。
指令执行时,monitor的进入数减1,若是减1后进入数为0,那线程退出monitor,再也不是这个monitor的全部者。其余被这个monitor阻塞的线程能够尝试去获取这个 monitor 的全部权。
经过这两段描述,咱们应该能很清楚的看出Synchronized的实现原理,Synchronized的语义底层是经过一个monitor的对象来完成,其实wait/notify等方法也依赖于monitor对象,这就是为何只有在同步的块或者方法中才能调用wait/notify等方法,不然会抛出java.lang.IllegalMonitorStateException的异常的缘由。
同步代码块:monitorenter指令插入到同步代码块的开始位置,monitorexit指令插入到同步代码块的结束位置,JVM须要保证每个monitorenter都有一个monitorexit与之相对应。任何对象都有一个monitor与之相关联,当且一个monitor被持有以后,他将处于锁定状态。线程执行到monitorenter指令时,将会尝试获取对象所对应的monitor全部权,即尝试获取对象的锁;
同步方法:synchronized方法则会被翻译成普通的方法调用和返回指令如:invokevirtual、areturn指令,在VM字节码层面并无任何特别的指令来实现被synchronized修饰的方法,而是在Class文件的方法表中将该方法的access_flags字段中的synchronized标志位置1,表示该方法是同步方法并使用调用该方法的对象或该方法所属的Class在JVM的内部对象表示Klass作为锁对象。
(摘自:http://www.cnblogs.com/javaminer/p/3889023.html)
2.Lock
首先,这篇将不介绍Lock的使用,具体的APi使用参考这边博客地址:Java多线程知识点整理(Lock锁):
https://my.oschina.net/u/2380961/blog/1595357
@Slf4j@ThreadSafepublic class LockExample2 {// 请求总数 public static int clientTotal = 5000;// 同时并发执行的线程数 public static int threadTotal = 200; public static int count = 0; private final static Lock lock = new ReentrantLock();public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(threadTotal);final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);for (int i = 0; i < clientTotal ; i++) {executorService.execute(() -> {try {semaphore.acquire();add();semaphore.release();} catch (Exception e) {//log.error("exception", e);}countDownLatch.countDown();});}countDownLatch.await();executorService.shutdown();//log.info("count:{}", count);}private static void add() {lock.lock();try {count++;} finally {lock.unlock();}}}
2.1 为何要使用Lock锁?
Java的内置锁一直都是备受争议的,在JDK 1.6以前,synchronized这个重量级锁其性能一直都是较为低下,虽然在1.6后,进行大量的锁优化策略,可是与Lock相比synchronized仍是存在一些缺陷的:虽然synchronized提供了便捷性的隐式获取锁释放锁机制(基于JVM机制),可是它却缺乏了获取锁与释放锁的可操做性,可中断、超时获取锁,且它为独占式在高并发场景下性能大打折扣。
AbstractQueuedSynchronizer,简称AQS,是java.util.concurrent的核心,CountDownLatch、FutureTask、Semaphore、ReentrantLock等都有一个内部类是这个抽象类的子类。因为AQS是基于FIFO队列的实现,所以必然存在一个个节点,Node就是一个节点,Node里面有不少方法。
整个AQS是典型的模板模式的应用,设计得十分精巧,对于FIFO队列的各类操做在AQS中已经实现了,AQS的子类通常只须要重写tryAcquire(int arg)和tryRelease(int arg)两个方法便可。
AQS的主要使用方式是继承,子类经过继承同步器并实现它的抽象方法来管理同步状态。
AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操做,固然AQS能够确保对state的操做是安全的。
AQS经过内置的FIFO同步队列来完成资源获取线程的排队工做,若是当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构形成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
2.二、AbstactQueuedSynchronizer的基本数据结构
1.AbstractQueuedSynchronizer的等待队列是CLH队列的变种,CLH队列一般用于自旋锁,AbstractQueuedSynchronizer的等待队列用于阻塞同步器。
2.每一个节点中持有一个名为"status"的字段用因而否一条线程应当阻塞的追踪,可是status字段并不保证加锁。
3.一条线程若是它处于队列头的下一个节点,那么它会尝试去acquire,可是acquire并不保证成功,它只是有权利去竞争。
4.要进入队列,你只须要自动将它拼接在队列尾部便可;要从队列中移除,你只须要设置header字段。
2.三、AbstractQueuedSynchronizer供子类实现的方法
AbstractQueuedSynchzonizer是基于模板模式的实现,不过它的模板模式写法有点特别,整个类中没有任何一个abstract的抽象方法,取而代之的是,须要子类去实现的那些方法经过一个方法体抛出UnsupportedOperationException(集合的使用也会抛出这个异常)异常来让子类知道。
这个类的acquire很差翻译,因此就直接原词放上来了,由于acquire是一个动词,后面并无带宾语,所以不知道具体acquire的是什么。按照我我的理解,acquire的意思应当是根据状态字段state去获取一个执行当前动做的资格。
好比ReentrantLock的lock()方法最终会调用acquire方法,那么:
1.线程1去lock(),执行acquire,发现state=0,所以有资格执行lock()的动做,将state设置为1,返回true。
2.线程2去lock(),执行acquire,发现state=1,所以没有资格执行lock()的动做,返回false。
2.四、独占模式acquire实现流程
看一下AbstractQuueuedSynchronizer的acquire方法实现流程,acquire方法是用于独占模式下进行操做的:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
tryAcquire方法前面说过了,是子类实现的一个方法,若是tryAcquire返回的是true(成功),即代表当前线程得到了一个执行当前动做的资格,天然也就不须要构建数据结构进行阻塞等待。
若是tryAcquire方法返回的是false,那么当前线程没有得到执行当前动做的资格,接着执行"acquireQueued(addWaiter(Node.EXCLUSIVE), arg))"这句代码,这句话很明显,它是由两步构成的:
addWaiter,添加一个等待者。
acquireQueued,尝试从等待队列中去获取执行一次acquire动做。
利用LockSupport(这个使用到了sun.misc.Unsafe UNSAFE;)的park方法让当前线程阻塞。
总结:这个方法的主要目的是为了构建成一个数据结构,同时获取锁的状态。
2.五、独占模式release流程
上面整理了独占模式的acquire流程,看到了等待的Node是如何构建成一个数据结构的,下面看一下释放的时候作了什么,release方法的实现为:
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
tryRelease一样是子类去实现的,表示当前动做我执行完了,要释放我执行当前动做的资格,讲这个资格让给其它线程,而后tryRelease释放成功,获取到head节点,若是head节点的waitStatus不为0的话,执行unparkSuccessor方法,顾名思义unparkSuccessor意为unpark头结点的继承者。
流程:
1.头节点的waitStatus<0,将头节点的waitStatus设置为0 。
2.拿到头节点的下一个节点s,若是s==null或者s的waitStatus>0(被取消了),那么从队列尾巴开始向前寻找一个waitStatus<=0的节点做为后继要唤醒的节点。
最后,若是拿到了一个不等于null的节点s,就利用LockSupport的unpark方法让它取消阻塞。
总结
1、对比
synchronized:不可中断锁,适合竞争不激烈,可读性好。
Lock:可中断锁,多样化同步,竞争激烈时能维持常态。
Atomic:竞争激烈时能维持常态,比Lock性能好,只能同步一个值。