多线程篇-TransmittableThreadLocal解决池化复用线程的传值问题

欢迎关注公众号【sharedCode】致力于主流中间件的源码分析, 我的网站:https://www.shared-code.com/java

前言

在上一篇文章 多线程篇-父子线程的上下文传递 的文末,咱们了解到JDK提供的InheritableThreadLocal 在线程池中的使用状况并非太理想,由于在复用线程的状况下,获得的值颇有可能不是咱们想要的,接下来我要给你们介绍一款开源组件,阿里开源的,用的感受还不错。git

TransmittableThreadLocal

通常状况下,ThreadLocal均可以知足咱们的需求,当咱们出现须要 在使用线程池等会池化复用线程的执行组件状况下传递ThreadLocal ,github

这个场景就是TransmittableThreadLocal解决的问题。安全

Github地址:https://github.com/alibaba/transmittable-thread-local多线程

感兴趣的能够去下载的玩一玩,接下来咱们来介绍一下这个组件的神奇之处。ide

首先看个demo, 经过demo,咱们先了解了解怎么用源码分析

demo

/**
 * ttl测试
 *
 * @author zhangyunhe
 * @date 2020-04-23 12:47
 */
public class Test {

    // 1. 初始化一个TransmittableThreadLocal,这个是继承了InheritableThreadLocal的
    static TransmittableThreadLocal<String> local = new TransmittableThreadLocal<>();

    // 初始化一个长度为1的线程池
    static ExecutorService poolExecutor = Executors.newFixedThreadPool(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Test test = new Test();
        test.test();
    }
    private void test() throws ExecutionException, InterruptedException {

        // 设置初始值
        local.set("天王老子");
        //!!!! 注意:这个地方的Task是使用了TtlRunnable包装的
        Future future = poolExecutor.submit(TtlRunnable.get(new Task("任务1")));
        future.get();

        Future future2 = poolExecutor.submit(TtlRunnable.get(new Task("任务2")));
        future2.get();
      
        System.out.println("父线程的值:"+local.get());
        poolExecutor.shutdown();
    }

    class Task implements Runnable{

        String str;
        Task(String str){
            this.str = str;
        }
        @Override
        public void run() {
            // 获取值
            System.out.println(Thread.currentThread().getName()+":"+local.get());
            // 从新设置一波
            local.set(str);
        }
    }
}

输出结果:测试

pool-1-thread-1:天王老子
pool-1-thread-1:天王老子
父线程的值:天王老子

原理分析

咱们首先看一下TransmittableThreadLocal的源码,网站

public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {
  
  // 1. 此处的holder是他的主要设计点,后续在构建TtlRunnable
  private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
            new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
                @Override
                protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
                    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
                }

                @Override
                protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
                    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
                }
            };

    @SuppressWarnings("unchecked")
    private void addThisToHolder() {
        if (!holder.get().containsKey(this)) {
            holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
        }
    }
  
  @Override
    public final T get() {
        T value = super.get();
        if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();
        return value;
    }
    /**
     * see {@link InheritableThreadLocal#set}
     */
    @Override
    public final void set(T value) {
        if (!disableIgnoreNullValueSemantics && null == value) {
            // may set null to remove value
            remove();
        } else {
            super.set(value);
            addThisToHolder();
        }
    }
    /**
     * see {@link InheritableThreadLocal#remove()}
     */
    @Override
    public final void remove() {
        removeThisFromHolder();
        super.remove();
    }

    private void superRemove() {
        super.remove();
    }
}

步骤说明:this

  1. 在代码中,做者构建了一个holder对象,这个对象是一个InheritableThreadLocal , 里面的类型是一个弱引用的WeakHashMap , 这个map的va lu就是TransmittableThreadLocal , 至于value永远都是空的

holder里面存储的是这个应用里面,全部关于TransmittableThreadLocal的引用。

  1. 从上面能够看到,每次get, set ,remove都会操做holder对象,这样作的目的是为了保持TransmittableThreadLocal全部的这个引用都在holder里面存一份。

TtlRunnable

回到咱们上面的代码

Future future = poolExecutor.submit(TtlRunnable.get(new Task("任务1")));

细心的朋友可能已经发现了,咱们调用了TtlRunnable 对象的get方法,下面看一下这个方法有啥做用吧

public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
        if (null == runnable) return null;

        if (runnable instanceof TtlEnhanced) {
            // avoid redundant decoration, and ensure idempotency
            if (idempotent) return (TtlRunnable) runnable;
            else throw new IllegalStateException("Already TtlRunnable!");
        }
  			// 重点在这里
        return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
    }

看上面的代码,细节上咱们不看,咱们看大体的思路, 这个地方主要就是根据传入的runnable构建了一个TtlRunnable对象。

private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
  			//重点在这里
        this.capturedRef = new AtomicReference<Object>(capture());
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}

下面这行代码,运行到这里的时候,仍是在主线程里面,调用了capture方法

this.capturedRef = new AtomicReference<Object>(capture());
capture
public static Object capture() {
    // 构建一个临时对象,主要看captureTtlValues方法
    return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}

private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
  // 构建一个WeakHashMap方法,
  WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
  // 在主线程里面,调用holder变量,循环获取里面全部的key和value
  for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
    ttl2Value.put(threadLocal, threadLocal.copyValue());
  }
  // 返回出去
  return ttl2Value;
}

步骤说明:

1.调用静态变量holder, 循环获取里面全部的key和value, value的获取就比较巧妙一点。

private T copyValue() {
      // 这里的get方法,调用的是父类的方法,能够在父类里面最终获取到当前TransmittableThreadLocal所对应的value
      return copy(get());
}

2.组装好一个WeakHashMap出去,最终就会到了咱们上面的构造方法里面,针对capturedRef 的赋值操做。

run
@Override
public void run() {
  //1. 获取到刚刚构造TtlRunnable对象的时候初始化的capturedRef对象。包含了从submit丢任务进来的时候父线程的数据
  Object captured = capturedRef.get();
  if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
    throw new IllegalStateException("TTL value reference is released after run!");
  }
	// 清除不在captured里面的key,同时在这个子线程中,对全部的ThreadLocal进行从新设置值
  Object backup = replay(captured);
  try {
    // 执行实际的线程方法
    runnable.run();
  } finally {
    // 作好还原工做,根据backup
    restore(backup);
  }
}

private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
            WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
      TransmittableThreadLocal<Object> threadLocal = iterator.next();

      // 作好当前线程的local备份
      backup.put(threadLocal, threadLocal.get());

      // 清除数据,不在captured里面的。
      if (!captured.containsKey(threadLocal)) {
        iterator.remove();
        threadLocal.superRemove();
      }
    }

    // 这里就是把值设置到当前线程的TransmittableThreadLocal里面。
    setTtlValuesTo(captured);

    // 一个钩子
    doExecuteCallback(true);

    return backup;
}

总结:

一些朋友看了上面的那么多源码,可能有点蒙,我可能说的也有点乱,在这里总结一下。

1.经过继承InheritableThreadLocal,新成立一个TransmittableThreadLocal类, 该类中有一个hodel变量,用来维护全部的TransmittableThreadLocal引用。

2.在实际submit任务到线程池的时候,咱们是须要调用TtlRunnable.get方法,构建一个任务的包装类。这里使用装饰者模式,对runnable线程对象进行装饰包装,在初始化这个包装对象的时候,会获取主线程里面全部的TransmittableThreadLocal引用,以及里面全部的值,这个值其实就是当前父线程里面的(跟你当时建立这个线程的父线程没有任何关系,注意,这里讲的是线程池的场景)。

3.对数据作规整,根据收集到的captured (这个对象里面存储的都是主线程里面可以获取到TransmittableThreadLocal以及对应的值) 作规整,去掉当前线程里面不须要的,同时将剩余的key和value ,更新到当前线程的ThreadLocal里面。这样就达到了在池化技术里面父子线程传值的安全性

相关文章
相关标签/搜索