欢迎关注公众号【sharedCode】致力于主流中间件的源码分析, 我的网站:https://www.shared-code.com/java
前言
在上一篇文章 多线程篇-父子线程的上下文传递
的文末,咱们了解到JDK提供的InheritableThreadLocal
在线程池中的使用状况并非太理想,由于在复用线程的状况下,获得的值颇有可能不是咱们想要的,接下来我要给你们介绍一款开源组件,阿里开源的,用的感受还不错。git
TransmittableThreadLocal
通常状况下,ThreadLocal均可以知足咱们的需求,当咱们出现须要 在使用线程池等会池化复用线程的执行组件状况下传递ThreadLocal
,github
这个场景就是TransmittableThreadLocal解决的问题。安全
Github地址:https://github.com/alibaba/transmittable-thread-local多线程
感兴趣的能够去下载的玩一玩,接下来咱们来介绍一下这个组件的神奇之处。ide
首先看个demo, 经过demo,咱们先了解了解怎么用源码分析
demo
/** * ttl测试 * * @author zhangyunhe * @date 2020-04-23 12:47 */ public class Test { // 1. 初始化一个TransmittableThreadLocal,这个是继承了InheritableThreadLocal的 static TransmittableThreadLocal<String> local = new TransmittableThreadLocal<>(); // 初始化一个长度为1的线程池 static ExecutorService poolExecutor = Executors.newFixedThreadPool(1); public static void main(String[] args) throws ExecutionException, InterruptedException { Test test = new Test(); test.test(); } private void test() throws ExecutionException, InterruptedException { // 设置初始值 local.set("天王老子"); //!!!! 注意:这个地方的Task是使用了TtlRunnable包装的 Future future = poolExecutor.submit(TtlRunnable.get(new Task("任务1"))); future.get(); Future future2 = poolExecutor.submit(TtlRunnable.get(new Task("任务2"))); future2.get(); System.out.println("父线程的值:"+local.get()); poolExecutor.shutdown(); } class Task implements Runnable{ String str; Task(String str){ this.str = str; } @Override public void run() { // 获取值 System.out.println(Thread.currentThread().getName()+":"+local.get()); // 从新设置一波 local.set(str); } } }
输出结果:测试
pool-1-thread-1:天王老子 pool-1-thread-1:天王老子 父线程的值:天王老子
原理分析
咱们首先看一下TransmittableThreadLocal
的源码,网站
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> { // 1. 此处的holder是他的主要设计点,后续在构建TtlRunnable private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() { @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() { return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); } @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) { return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue); } }; @SuppressWarnings("unchecked") private void addThisToHolder() { if (!holder.get().containsKey(this)) { holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value. } } @Override public final T get() { T value = super.get(); if (disableIgnoreNullValueSemantics || null != value) addThisToHolder(); return value; } /** * see {@link InheritableThreadLocal#set} */ @Override public final void set(T value) { if (!disableIgnoreNullValueSemantics && null == value) { // may set null to remove value remove(); } else { super.set(value); addThisToHolder(); } } /** * see {@link InheritableThreadLocal#remove()} */ @Override public final void remove() { removeThisFromHolder(); super.remove(); } private void superRemove() { super.remove(); } }
步骤说明:this
- 在代码中,做者构建了一个holder对象,这个对象是一个
InheritableThreadLocal
, 里面的类型是一个弱引用的WeakHashMap , 这个map的va lu就是TransmittableThreadLocal
, 至于value永远都是空的
holder里面存储的是这个应用里面,全部关于TransmittableThreadLocal
的引用。
- 从上面能够看到,每次get, set ,remove都会操做holder对象,这样作的目的是为了保持
TransmittableThreadLocal
全部的这个引用都在holder里面存一份。
TtlRunnable
回到咱们上面的代码
Future future = poolExecutor.submit(TtlRunnable.get(new Task("任务1")));
细心的朋友可能已经发现了,咱们调用了TtlRunnable
对象的get方法,下面看一下这个方法有啥做用吧
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) { if (null == runnable) return null; if (runnable instanceof TtlEnhanced) { // avoid redundant decoration, and ensure idempotency if (idempotent) return (TtlRunnable) runnable; else throw new IllegalStateException("Already TtlRunnable!"); } // 重点在这里 return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun); }
看上面的代码,细节上咱们不看,咱们看大体的思路, 这个地方主要就是根据传入的runnable构建了一个TtlRunnable对象。
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { //重点在这里 this.capturedRef = new AtomicReference<Object>(capture()); this.runnable = runnable; this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; }
下面这行代码,运行到这里的时候,仍是在主线程里面,调用了capture
方法
this.capturedRef = new AtomicReference<Object>(capture());
capture
public static Object capture() { // 构建一个临时对象,主要看captureTtlValues方法 return new Snapshot(captureTtlValues(), captureThreadLocalValues()); } private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() { // 构建一个WeakHashMap方法, WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); // 在主线程里面,调用holder变量,循环获取里面全部的key和value for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) { ttl2Value.put(threadLocal, threadLocal.copyValue()); } // 返回出去 return ttl2Value; }
步骤说明:
1.调用静态变量holder, 循环获取里面全部的key和value, value的获取就比较巧妙一点。
private T copyValue() { // 这里的get方法,调用的是父类的方法,能够在父类里面最终获取到当前TransmittableThreadLocal所对应的value return copy(get()); }
2.组装好一个WeakHashMap出去,最终就会到了咱们上面的构造方法里面,针对capturedRef
的赋值操做。
run
@Override public void run() { //1. 获取到刚刚构造TtlRunnable对象的时候初始化的capturedRef对象。包含了从submit丢任务进来的时候父线程的数据 Object captured = capturedRef.get(); if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } // 清除不在captured里面的key,同时在这个子线程中,对全部的ThreadLocal进行从新设置值 Object backup = replay(captured); try { // 执行实际的线程方法 runnable.run(); } finally { // 作好还原工做,根据backup restore(backup); } } private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) { WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); // 作好当前线程的local备份 backup.put(threadLocal, threadLocal.get()); // 清除数据,不在captured里面的。 if (!captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 这里就是把值设置到当前线程的TransmittableThreadLocal里面。 setTtlValuesTo(captured); // 一个钩子 doExecuteCallback(true); return backup; }
总结:
一些朋友看了上面的那么多源码,可能有点蒙,我可能说的也有点乱,在这里总结一下。
1.经过继承InheritableThreadLocal,新成立一个TransmittableThreadLocal类, 该类中有一个hodel变量,用来维护全部的TransmittableThreadLocal引用。
2.在实际submit任务到线程池的时候,咱们是须要调用TtlRunnable.get方法,构建一个任务的包装类。这里使用装饰者模式,对runnable线程对象进行装饰包装,在初始化这个包装对象的时候,会获取主线程里面全部的TransmittableThreadLocal引用,以及里面全部的值,这个值其实就是当前父线程里面的(跟你当时建立这个线程的父线程没有任何关系,注意,这里讲的是线程池的场景)。
3.对数据作规整,根据收集到的captured
(这个对象里面存储的都是主线程里面可以获取到TransmittableThreadLocal以及对应的值) 作规整,去掉当前线程里面不须要的,同时将剩余的key和value ,更新到当前线程的ThreadLocal里面。这样就达到了在池化技术里面父子线程传值的安全性