https://github.com/Wasabi1234...java
同时拥有两个或者多个线程,若是程序在单核处理器上运行多个线程将交替地换入或者换出内存,这些线程是同时“存在"的,每一个线程都处于执行过程当中的某个状态,若是运行在多核处理器上,此时,程序中的每一个线程都将分配到一个处理器核上,所以能够同时运行.git
互联网分布式系统架构设计中必须考虑的因素之一,一般是指,经过设计保证系统可以同时并行处理不少请求.github
CPU的频率太快了,快到主存跟不上
如此,在处理器时钟周期内,CPU经常须要等待主存,浪费资源。因此cache的出现,是为了缓解CPU和内存之间速度的不匹配问题(结构:cpu-> cache-> memory ).缓存
若是某个数据被访问,那么与它相邻的数据很快也可能被访问安全
用于保证多个 CPU cache 之间缓存共享数据的一致数据结构
该缓存行只被缓存在该 CPU 的缓存中,而且是被修改过的,与主存中数据是不一致的,需在将来某个时间点写回主存,该时间是容许在其余CPU 读取主存中相应的内存以前,当这里的值被写入主存以后,该缓存行状态变为 E多线程
缓存行只被缓存在该 CPU 的缓存中,未被修改过,与主存中数据一致
可在任什么时候刻当被其余 CPU读取该内存时变成 S 态,被修改时变为 M态架构
该缓存行可被多个 CPU 缓存,与主存中数据一致并发
处理器为提升运算速度而作出违背代码原有顺序的优化app
以上两者一般和线程池搭配
下面开始作并发模拟
package com.mmall.concurrency; import com.mmall.concurrency.annoations.NotThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author shishusheng * @date 18/4/1 */ @Slf4j @NotThreadSafe public class ConcurrencyTest { /** * 请求总数 */ public static int clientTotal = 5000; /** * 同时并发执行的线程数 */ public static int threadTotal = 200; public static int count = 0; public static void main(String[] args) throws Exception { //定义线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定义信号量,给出容许并发的线程数目 final Semaphore semaphore = new Semaphore(threadTotal); //统计计数结果 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); //将请求放入线程池 for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { //信号量的获取 semaphore.acquire(); add(); //释放 semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); //关闭线程池 executorService.shutdown(); log.info("count:{}", count); } /** * 统计方法 */ private static void add() { count++; } }
运行发现结果随机,因此非线程安全
当多个线程访问某个类时,无论运行时环境采用何种调度方式
或者这些进程将如何交替执行,而且在主调代码中不须要任何额外的同步或协同
,这个类都能表现出正确的行为
,那么就称这个类是线程安全的
提供了互斥访问,同一时刻只能有一个线程来对它进行操做
package com.mmall.concurrency.example.atomic; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; /** * @author shishusheng */ @Slf4j @ThreadSafe public class AtomicExample2 { /** * 请求总数 */ public static int clientTotal = 5000; /** * 同时并发执行的线程数 */ public static int threadTotal = 200; /** * 工做内存 */ public static AtomicLong count = new AtomicLong(0); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { System.out.println(); semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); //主内存 log.info("count:{}", count.get()); } private static void add() { count.incrementAndGet(); // count.getAndIncrement(); } }
package com.mmall.concurrency.example.atomic; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicReference; /** * @author shishusheng * @date 18/4/3 */ @Slf4j @ThreadSafe public class AtomicExample4 { private static AtomicReference<Integer> count = new AtomicReference<>(0); public static void main(String[] args) { // 2 count.compareAndSet(0, 2); // no count.compareAndSet(0, 1); // no count.compareAndSet(1, 3); // 4 count.compareAndSet(2, 4); // no count.compareAndSet(3, 5); log.info("count:{}", count.get()); } }
synchronized:依赖 JVM
package com.mmall.concurrency.example.count; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author shishusheng */ @Slf4j @ThreadSafe public class CountExample3 { /** * 请求总数 */ public static int clientTotal = 5000; /** * 同时并发执行的线程数 */ public static int threadTotal = 200; public static int count = 0; public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private synchronized static void add() { count++; } }
synchronized 修正计数类方法
子类继承父类的被 synchronized 修饰方法时,是没有 synchronized 修饰的!!!
Lock: 依赖特殊的 CPU 指令,代码实现
个值
一个线程对主内存的修改能够及时的被其余线程观察到
JMM关于synchronized的规定
用共享变量时须要从主内存中从新读取最新的值(加锁与解锁是同一把锁
)
经过加入内存屏障和禁止重排序优化来实现
屏障指令,将本地内存中的共享变量值刷新到主内存
屏障指令,从主内存中读取共享变量
volatile boolean inited = false; //线程1: context = loadContext(); inited= true; // 线程2: while( !inited ){ sleep(); } doSomethingWithConfig(context)
一个线程观察其余线程中的指令执行顺序,因为指令重排序的存在,该观察结果通常杂乱无序
JMM容许编译器和处理器对指令进行重排序,可是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性
package com.mmall.concurrency.example.singleton; import com.mmall.concurrency.annoations.NotThreadSafe; /** * 懒汉模式 -》 双重同步锁单例模式 * 单例实例在第一次使用时进行建立 * @author shishusheng */ @NotThreadSafe public class SingletonExample4 { /** * 私有构造函数 */ private SingletonExample4() { } // 一、memory = allocate() 分配对象的内存空间 // 二、ctorInstance() 初始化对象 // 三、instance = memory 设置instance指向刚分配的内存 // JVM和cpu优化,发生了指令重排 // 一、memory = allocate() 分配对象的内存空间 // 三、instance = memory 设置instance指向刚分配的内存 // 二、ctorInstance() 初始化对象 /** * 单例对象 */ private static SingletonExample4 instance = null; /** * 静态的工厂方法 * * @return */ public static SingletonExample4 getInstance() { // 双重检测机制 // B if (instance == null) { // 同步锁 synchronized (SingletonExample4.class) { if (instance == null) { // A - 3 instance = new SingletonExample4(); } } } return instance; } }
同步组件
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author shishusheng */ @Slf4j public class CountDownLatchExample1 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 指定时间内处理任务 * * @author shishusheng * */ @Slf4j public class CountDownLatchExample2 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(10, TimeUnit.MILLISECONDS); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author shishusheng */ @Slf4j public class CyclicBarrierExample1 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @author shishusheng */ @Slf4j public class CyclicBarrierExample2 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); try { barrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { log.warn("BarrierException", e); } log.info("{} continue", threadNum); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author shishusheng */ @Slf4j public class SemaphoreExample3 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { // 尝试获取一个许可 if (semaphore.tryAcquire()) { test(threadNum); // 释放一个许可 semaphore.release(); } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
看出是顺序执行的