勿止于结论;持续探索与求证。html
为何要使用并发 ? 有三点足够信服的理由:java
不过,并发使用姿式不当,很容易出错,致使难以估量的损失。可谓是一把双刃剑。面试
最近,团队有同窗踩了并发的坑。我想,要不梳理下并发的一些陷阱及相关原理和解决方案吧,以备后用。
算法
并非在全部状况下都须要使用并发。Java 多线程模型,在并发执行的时候会有线程建立、切换、阻塞、调度的开销、内存同步的开销等。编程
在单核 CPU 上,运行 CPU 密集型计算,并不须要使用并发,由于 CPU 自己就是饱和的,使用并发只能带来没必要要的线程切换和同步开销。 在多核 CPU 处理器上,运行 CPU 密集型计算,线程数应该与 CPU 个数同样,以便于将工做合理分配到每一个 CPU 上,很少很多。 若是线程数大于 CPU 核数,那么就会有没必要要的线程切换;若是线程数小于 CPU 核数,就没法充分利用全部的核。缓存
实际应用中,经常是 RPC 调用和纯计算业务逻辑处理的交替执行,也就是 IO 密集型,或者 IO 和 CPU 密集型交叉的任务,则须要线程数远远大于 CPU 核数,来避免 线程等待 IO 操做完成以前的无所事事。安全
要讨论并发问题,首先要理解,何为线程安全的 ? 详解可参阅《Java并发编程实战》的第二章。多线程
线程安全,是指在多线程执行环境下,并发执行的结果与串行执行的结果始终一致。 这句话有两层意思: 1. 不会由于线程执行顺序不肯定,致使不肯定的结果;2. 多线程并发执行的结果,应该与多线程串行执行的结果一致。它们的差异仅仅体如今速度上,而不是结果上。并发
在具体措施上,表现为多线程对“含有共享可变状态的对象”的访问控制与同步。这里有两个前提: 1. 多线程。 单线程执行环境是线程安全的; 2. 共享可变。 不可变的对象是线程安全的;没有任何写操做的共享可变对象是线程安全的; 3. 无状态的对象是线程安全的。异步
多线程环境下,保证并发安全的若干理念:
从一开始设计成线程安全的类,比在之后将类修改为线程安全的类,要更容易和安全得多。由于线程不安全的类,在实际业务系统中可能已经在各类场景下使用到了,修改为线程安全的类,会致使性能降低,产生不可预知的后果。
程序状态的封装性越好,就越容易实现线程安全的访问,也更容易维护。
优先考虑使用现有的线程安全的类和同步工具类。
使用不可变量和无状态对象(一般是应用中的全局无状态组件)。
不共享变量,好比尽可能使用方法内的局部变量,或者声明组件为原型模式。
规定哪些操做组合必须符合原子性,并借助同步和锁来实现组合操做的原子性。
以下代码一所示。先起一个线程,将 isReady 设置为 true ,而后再进入循环,判断 isReady 是否为 true ,为 true 则退出。
这段代码是线程安全的吗? 如何判断 ? 不妨假设这两个线程按照代码顺序串行执行。那么,打印 ready! 以后,不该该有 not ready 的打印。
代码一:
public class NoVisibility { private boolean isReady = false; public void ready() { isReady = true; } public boolean isReady() { return isReady; } static class NoVisibilityTester { public static void main(String[]args) { NoVisibility noVisibility = new NoVisibility(); new Thread(() -> { noVisibility.ready(); System.out.println(System.nanoTime() + ": ready!"); }).start(); while(true) { if (noVisibility.isReady()) { System.out.println(System.nanoTime() + ": main thread now is ready"); break; } System.out.println(System.nanoTime() + ": not ready"); } System.out.println(System.nanoTime() + ": now exit"); } } }
运行屡次,获得以下结果。打印 ready! 以后,还有 not ready 的输出。这是为何呢?
167774951548450: ready! 167774951549923: not ready 167774951822008: main thread now is ready 167774951857190: now exit
这里涉及到并发读写的最最基本的陷阱:可见性。根据 “Jvm内存模型深度理解”,当线程更新 isReady 以后,只是写到本身的线程缓存里,并无当即刷新到主内存中。那么主线程须要等待一段时间,才能检测到主内存的 isReady 已经变化了。
若是不打算使用锁的话,能够加上可见性修饰符:volatile 。 volatile 会当即将更新的线程缓存值刷新到主内存中,使得全部访问该共享变量的线程都能当即感知到新的值。volatile 是一个极轻量级的同步机制,经常使用于判断标志位是否更新。可是 volatile 并不适合作同步锁(基于 volatile 的同步是很脆弱的)。
一个含有未受保护的实例变量的对象,在多线程环境中访问是不安全的。这大概是关于并发陷阱的最经典的栗子。
如代码二所示:一个 UnSafeObject 含有一个实例变量 i 。在 main 中,建立了 3 个线程,分别会设置 i,而后休眠 200ms ,再获取 i 。
这是线程安全的吗 ? 按照上述定义来看,若是并发与串行执行结果一致,那么应该是:每一个线程都会拿到与本身线程号对应的值。至少不会拿到其余的线程号。
代码二:
@Setter @Getter public class UnSafeObject { private int i = 0; public static void main(String[] args) { UnSafeObject unSafeObject = new UnSafeObject(); ThreadStarter.startMultiThreads( (ti) -> { unSafeObject.setI(ti); try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { } System.out.println("Thread" + ti + ":" + unSafeObject.getI()); } ); } }
打印结果以下:很明显,每一个线程拿到的值并不必定是它本身设置的。由于在多线程环境下,i 可能被任何一个线程所修改。
Thread2:2 Thread1:2 Thread0:2 Thread2:0 Thread1:0
致使对象 UnSafeObject 的实例变量 i 在多线程环境下访问不安全的缘由是:JVM 并发机制是基于共享内存模型的。可阅:“Jvm内存模型深度理解”。 这篇文章讲得详细。
怎样才能将 UnSafeObject 变成线程安全的呢 ? 最简便的方式是,将 i 声明为原子的 AutoInteger。 当只须要一个单变量的原子操做时,使用原子类。
原子类采用的是基于硬件能力提供的 CAS ,能够安全替代 volatile 的使用。在竞争适度(如何衡量?)的状况下, CAS 可以提供更好的性能和可伸缩性。 CAS 有个“ABA”的问题,能够经过增长一个版本号来解决。 CAS 是非阻塞算法,是乐观锁的实现方式。在实际系统中也常常会用到,好比 DB 的乐观锁,ES 版本控制等。经过问题转换,将并发修改的范围映射到原子变量的修改上,能够拓展非阻塞并发的使用范围。
详情可阅:《并发编程实战》的第十五章。
对于这样一个简单例子,你们耳熟能详。不过,换到真实环境里,还能看出来么? 以下代码三所示。这段代码有什么问题呢? LightTcOrderFormat 是一个含有实例变量 tcOrder 的 Component。 咋一看,确实没啥问题。可是,若是放在多线程环境里跑一跑,tcOrder 就会被随意篡改。
代码三:
@Slf4j @Component public class LightTcOrderFormat extends LightTcOrderDTO implements LightFormat { private TcOrder tcOrder; @Override public LightResultDTO format(WholeOrderSet wholeOrderSet, LightResultDTO lightResultDTO) { // code ... this.tcOrder = wholeOrderSet.getTcOrder(); LightTcOrderDTO lightTcOrderDTO = new LightTcOrderDTO(); BeanUtils.copyProperties(wholeOrderSet.getTcOrder(), this); BeanUtils.copyProperties(this, lightTcOrderDTO); lightResultDTO.setOrder(lightTcOrderDTO); return lightResultDTO; } }
避免措施: LightTcOrderFormat 声明为原型模式:@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) ,成为不共享的对象。
以下代码四所示,使用了 ConcurrentHashMap 对 map 中的 [key,value] 进行保护。
输出 final 不必定等于 300000。为何会这样呢? 虽然 get 与 put 是原子操做,可是组合成一个 add 方法, add 方法是非原子化的。两个线程彻底可能,同时执行 get("key") = 5 ; 而后前后 put("key", 6) ,使得最终值为 6, 而不是 7。
代码四:
import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class UnatomicOperation { private Map<String, Integer> map = new ConcurrentHashMap<>(); public void add(String key) { Integer value = map.get(key); if(value == null) { map.put(key, 1); } else { map.put(key, value + 1); } } public void nonSafeAdd(String key) { map.put(key, map.putIfAbsent(key, 1)+1); } public Integer get(String key) { return map.get(key); } public static void main(String[] args) { UnatomicOperation unatomicOperation = new UnatomicOperation(); ThreadStarter.startMultiThreads( (ti) -> { unatomicOperation.nonSafeAdd("key"); System.out.println(ti + ":" + unatomicOperation.get("key")); } ); System.out.println("final: " + unatomicOperation.get("key")); } }
有小伙伴问:那么写成这样能否 ? 看看 putIfAbsent 的实现,就知道也是不能够的。由于后者只是比前者表达更加简洁,但效果是一致的。
public void nonSafeAdd(String key) { map.put(key, map.putIfAbsent(key, 1)+1); }
一系列原子操做组合后的复合操做,若是不具备原子化,也会有线程不安全的问题。
一种解决方案是,对对象的全部须要并发访问的方法使用 synchronized 关键字修饰。若是方法里的操做耗时都比较平均,不存在耗时很大的操做,这种方法最经济。详可阅:“深刻理解 Synchronized”
同步锁,本质是封闭思想的一种体现。将对共享可变量的访问,限制在指定的同步方法或由锁构建的临界区中。
代码五:
public class SychronizedOperation { private Map<String, Integer> map = new HashMap<>(); public synchronized void add(String key) { Integer value = map.get(key); if(value == null) { map.put(key, 1); } else { map.put(key, value + 1); } } public synchronized Integer get(String key) { return map.get(key); } public static void main(String[] args) { SychronizedOperation sychronizedOperation = new SychronizedOperation(); ThreadStarter.startMultiThreads( (ti) -> { sychronizedOperation.add("key"); System.out.println(ti + ":" + sychronizedOperation.get("key")); } ); System.out.println("final: " + sychronizedOperation.get("key")); } }
问题:读操做也要加 synchronized 吗? 为何 ?
虽然使用 synchronized 解决了问题,可是稍有不当,会带来性能问题。 这里加在方法上,实际上就是对整个 map 加锁,而 ConcurrentHashMap 是有分段锁优化的,这样就将分段锁优化的优点给去掉了。 那么,如何在保存 ConcurrentHashMap 的优点基础上,安全地访问 key 呢?
这里实际上设计两层锁:1. 给 key 加锁,是分段的; 2. 给计数加锁。 这里能够将初始化的部分抽离出来单独加锁。以下代码六所示。使用 ConcurrentHashMap + AtomicLong 强强联合,来解决这个问题。ConcurrentHashMap 给 key 加分段锁,AtomicLong 给访问同一个 key 的 value 加锁;还有一个给 value 为空时的初始化加锁。
代码六:
public class ConcurrentCombinedOperation { private Map<String, AtomicLong> map = new ConcurrentHashMap<>(); private Lock lock = new ReentrantLock(); public long addAndGet(String key) { if(lock.tryLock()) { try { map.putIfAbsent(key, new AtomicLong()); } finally { lock.unlock(); } } return map.get(key).incrementAndGet(); } public long get(String key) { return map.get(key).get(); } }
如上代码所示,虽然将 get-put 中的 put 解放出来了,可是依然有两个不足:
所以,还须要进一步进行优化。仔细思考可知,实际上只须要初始化(key 对应的 value 为空)的时候加锁便可。如代码七所示,使用 DCL 来安全初始化 key 对应的 AtomicLong 对象。
代码七:
public long addAndGetEffective(String key) { init(key); return map.get(key).incrementAndGet(); } private void init(String key) { if (map.get(key) == null) { synchronized (key) { if (map.get(key) == null) { map.put(key, new AtomicLong()); } } } }
问题:以上代码涉及到 key 的监控对象锁和分段锁,是否会出现死锁问题 ?
上述同步代码中,分别使用了 synchronized 和 ReentrantLock 。那么,它们有什么异同?如何在二者之间进行选择呢? 如下是一些建议:
原子操做组合的非原子化,还表如今一种经常使用状况:关联不一致性。也就是说,两个变量的变化,必须符合某种一致性规约。好比正方形的边长与面积,就是同步变化的。以下代码八所示。 最终输出状况,square 与 area 不必定会知足 area = square * square 的关系。
代码八:
public class RelatedInconsistency { private int square; private int area; public RelatedInconsistency(int square) { this.square = square; computeArea(); } public void set(int square) { this.square = square; computeArea(); System.out.println("square: " + square + " area:" + area); } private void computeArea() { this.area = this.square * this.square; } static class RelatedInconsistencyTest { public static void main(String[]args) throws InterruptedException { RelatedInconsistency relatedInconsistency = new RelatedInconsistency(1); ThreadStarter.startMultiThreads(3000, 10, (th) -> relatedInconsistency.set(th) ); } } }
以下代码九所示。你能看出问题所在吗 ? add 也加了 synchronized 关键字。 看上去貌似是没有问题的。
代码九:
public class EscapedObject { private List<Integer> nums = new ArrayList<>(); private synchronized void add(Integer num) { if (num != null) { nums.add(num); } } public List<Integer> getNums() { return nums; } static class EscapedObjectTester { public static void main(String[] args) throws InterruptedException { EscapedObject escapedObject = new EscapedObject(); escapedObject.add(5); List<Integer> escaped = escapedObject.getNums(); ThreadStarter.startMultiThreads(3, 3, (ti) -> { escaped.add(ti*ti); System.out.println(ti + ":" + escaped); } ); } } }
输出结果以下。将线程数调大,抛出异常 ConcurrentModificationException 的几率会更大。
1:[5, 0, 1] 0:[5, 0, 1] 1:[5, 0, 1, 1] 0:[5, 0, 1, 1, 0] 1:[5, 0, 1, 1, 0, 4, 1] 0:[5, 0, 1, 1, 0, 4, 1, 0] Exception in thread "Thread-2" java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
为何还会抛并发修改的异常呢 ? getNums 闯了祸。这个方法将不安全的 nums 暴露出去了。 换个角度说, nums 经过 getNums 这个方法逃逸出去了。 这样 nums 就可能被多个线程同时更改了。
解决方案:1. 不对外暴漏这个实例变量,仅可经过指定方法访问(封闭的思想);2. 若是须要获取这个 nums ,使它变成不可变的。不容许逃逸出去的对象被修改。这实际上遵循了“不可变量老是线程安全的”原理。
public List<Integer> getImmutableNums() { return Collections.unmodifiableList(nums); }
事实上,即便返回不可变的 List, List 里的对象依然是线程不安全的。由于 List 的逸出,连带着将 List 里的对象也逸出了。 所以,对于容器的并发,避免将整个容器都返回出去。
这个问题恐怕是不多人会特别注意到的陷阱。在调用频繁的实例方法中建立线程池,会致使建立线程数不受控地增加,最终致使应用崩溃。
以下代码十所示。在局部方法 freqCalledMethod 不断建立新的局部线程池。只要方法调用足够次数,就会致使应用崩溃。
代码十:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UncontrolledLocalThreadPool { public static void main(String[] args) { int n = 100000; for (int i=0; i< n; i++) { freqCalledMethod(); } System.out.println("here"); } public static void freqCalledMethod() { ExecutorService threadExecutor = Executors.newFixedThreadPool(10); for (int i=0; i< 10; i++) { threadExecutor.submit(() -> 9999L * 9999L); } } }
报:
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
报这个错误的缘由是超出了JVM容许建立的最大线程数。
The "java.lang.OutOfMemoryError: unable to create new native thread" is thrown by the JVM whenever it hit the limit of how many threads it can create. The limit is imposed by the operating system.
线程池本来是用来使得应用中建立的线程数是可控的,结果线程池的建立变得不可控了,显然也会致使线程数不可控。
解决方案:切忌在大量频繁调用的实例方法里建立线程池。建立配置良好的全局线程池。
能够经过以下程序来测试机器上的最大可建立线程数。TimeUnit.SECONDS.sleep(1000); 是为了避免让线程过快的退出。
代码十一:
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ThreadMaxCount { private static AtomicInteger count = new AtomicInteger(); public static void main(String[] args) throws InterruptedException { while (true) { new Thread(() -> { try { TimeUnit.SECONDS.sleep(1000); } catch (InterruptedException e) { } }).start(); System.out.println("thread num:" + count.incrementAndGet()); TimeUnit.MILLISECONDS.sleep(10); } } }
若是你有一个比较长的业务链路,有一些公共数据要在整个链路中传递。要么,将公共数据放在方法中逐层传递下去,要么建立一个 ThreadLocal 来保存这些公共数据,在链路里传递。
ThreadLocal 是线程本地副本,每一个线程有本身私有的一份数据。经过不共享的思路去避免并发修改问题。不过 ThreadLocal 若是与线程池结合使用,就会有问题。
代码十二:
public class ThreadLocalLeak { private ThreadLocal<Integer> context = new ThreadLocal(); public ThreadLocalLeak(Integer initValue) { context.set(initValue); } public Integer get() { return context.get(); } public void set(Integer initValue) { context.set(initValue); } public void clear() { context.remove(); } public static void main(String[] args) throws InterruptedException { ThreadLocalLeak threadLocalLeak = new ThreadLocalLeak(5); ExecutorService executor = Executors.newFixedThreadPool(10); executor.execute( () -> { for (int i=0; i<=100000; i++) { System.out.println(System.nanoTime() + " set before:" + Thread.currentThread() + ": " + threadLocalLeak.get()); threadLocalLeak.set(i); System.out.println(System.nanoTime() + " set after:" + Thread.currentThread() + ": " + threadLocalLeak.get()); //threadLocalLeak.clear(); } } ); executor.shutdown(); executor.awaitTermination(3000, TimeUnit.SECONDS); } }
输出结果以下:
要理解 ThreadLocal 的实现,关键是理解 Thread 中含有一个 ThreadLocal.ThreadLocalMap 对象。这个对象实际上就是线程的本地副本。 之因此不直接用 Object, 是由于要实现两个目标: 1. 安全。 使用泛型来存取副本对象,编写代码更加安全,避免强制类型转换; 2. 须要存储多个值。ThreadLocalMap 的 Key 是一个 WeakReference[ThreadLocal] ,ThreadLocal 经过 AtomicInteger 实现了 hashCode 的约定,并提供了方法来获取当前执行线程的本地副本的值。
当线程在线程池中被复用时,执行下一次任务时,就可能拿到上一次任务执行后的残留数据了。
解决方案:在线程执行完任务后,将 ThrealLocal 中的内容清空。
为了测试并发陷阱,须要启动多线程去执行任务。为避免写重复代码,须要先写个通用的多线程启动代码。如代码十三所示。这段代码使用了 t.join 方法来同步线程之间的活动,使主线程必须在全部子线程执行以后才退出。 这样作并无多大问题。不过,使用比较底层的 API 比使用成熟的同步工具类,会更有风险。
代码十三:
import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; public class ThreadStarter { public static void startMultiThreads(Consumer<Integer> consumer) { try { startMultiThreads(3, 100000, consumer); } catch (InterruptedException e) { System.err.println("error: " + e.getMessage()); } } public static void startMultiThreads(int threadNum, int times, Consumer<Integer> consumer) throws InterruptedException { List<Thread> threadList = new ArrayList<>(); for (int t=0; t < threadNum; t++) { int threadIndex = t; Thread th = new Thread(() -> { for (int i=0; i < times; i++) { consumer.accept(threadIndex); } }); threadList.add(th); th.start(); } for (Thread t: threadList) { t.join(); } } }
这里须要一个“栅栏”:当全部线程都到达这个栅栏的时候,才触发后续的活动。这其实是个通用的功能。使用 CountDownLatch 工具来实现更佳。
如代码十四所示。启动线程引用了 CountDownLatch 对象。当线程执行完成退出时,就将 CountDownLatch 计数减一。当 CountDownLatch 计数为 0 时,就会释放栅栏,让等待的主线程经过。
代码十四:
public static void startMultiThreadsV2(int threadNum, int times, Consumer<Integer> consumer) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (int i=0; i<threadNum; i++) { new Thread( new Worker(countDownLatch, consumer, i, times), "t"+i ).start(); } countDownLatch.await(); } static class Worker implements Runnable { private CountDownLatch countDownLatch; private Consumer consumer; private int threadIndex; private int times; public Worker(CountDownLatch countDownLatch, Consumer consumer, int threadIndex, int times) { this.countDownLatch = countDownLatch; this.consumer = consumer; this.threadIndex = threadIndex; this.times = times; } @Override public void run() { for (int i=0; i < times; i++) { consumer.accept(threadIndex); } countDownLatch.countDown(); } }
Java 并发的这些陷阱,从根本上去追溯,都是由共享内存模型所带来的。若是换用基于消息投递的方式,好比 “混合使用ForkJoin+Actor+Future实现一千万个不重复整数的排序(Scala示例)” , 天然就不存在这些问题了,固然,消息投递又会带来新的问题:好比消息接收不到,消息延迟,处理的异步化,反直觉的编程模型等。
从根因上去探索,换一种思路和作法,看到的空间更为广阔。
本文主要梳理了并发的一些陷阱及相关原理和解决方案。要编写正确的并发程序,须要很是仔细才行。使用不共享、不可变、可见性修饰、封闭访问、加锁、同步等机制来保证线程安全性。