本文分享 ThreadLocal 遇到 Hystrix 时上下文信息传递的方案。java
笔者在业务开发中涉及到使用 ThreadLocal 来存放上下文链路中一些关键信息,其中一些业务实现对外部接口依赖,对这些依赖接口使用了Hystrix做熔断保护,但在使用Hystrix做熔断保护的方法中发现了获取 ThreadLocal 信息与预期不一致问题,本文旨在探讨如何解决这一问题。git
在Java编程语言里ThreadLocal是用来方便开发人员在同一线程上下文中不一样类、不一样方法中共享信息的,ThreadLocal变量不受其余线程的影响,不一样线程间相互隔离,也就是线程安全的。在实际的业务链路中从入口到具体的业务实现有时候须要共享某些通用信息,好比用户惟一标识、链路追踪惟一标识等,这些信息就可使用ThreadLocal来存储实现,下面就是一个简单的同一链路中共享traceId的示例代码。github
public class ThreadLocalUtil { private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>(); public static void setTraceId(String traceId) { TRACE_ID.set(traceId); } public static String getTraceId() { return TRACE_ID.get(); } public static void clearTraceId() { TRACE_ID.remove(); } }
在分布式环境中,每一个系统所依赖的外部服务不可避免的会出现失败或超时的状况,Hystrix 经过增长对依赖服务的延时容错及失败容错逻辑,也就是所谓的「熔断」,以帮助开发人员去灵活控制所依赖的分布式服务。编程
Hystrix经过隔离服务间的访问点,阻断服务间的级联故障,并提供降级选项,这一切都是为了提供系统总体的健壮性,在大规模分布式服务中,系统的健壮性尤为重要。Hystrix详细的介绍能够看:Hystrix介绍安全
当业务链路中的具体实现有依赖外部服务,且做了相关熔断保护,那么本文的两个主角就这么赶上了。网络
根据Hystrix的相关文档介绍咱们了解到,Hystrix提供两种线程隔离模式:信号量和线程池。多线程
信号量模式下执行业务逻辑时处于同一线程上下文,而线程池模式则使用Hystrix提供的线程池去执行相关业务逻辑。在平常业务开发中更多须要熔断的是涉及到外部网络IO调用的(如RPC调用),Hystrix存在的一个目的就是想减小外部依赖的调用对服务容器线程的消耗,信号量模式显然不太适合,所以咱们在绝大部分场景下使用的都是线程池模式,而Hystrix默认状况下启用的也是线程池模式。编程语言
本文想要解决的也正是在这种默认模式下才会有的问题:分布式
有人可能会想到是否是能够用InheritableThreadLocal去解决?ide
InheritableThreadLocal能够将当前线程中的线程变量信息共享到当前线程所建立的「子线程」中,但这边忽略了一个很重要的信息,Hystrix中的线程模式底层使用的是本身维护的一个线程池,也就是其中的线程会出现复用的状况,那么就会出现每一个线程所共享的信息都是以前首次获取到的「父线程」的共享信息,这显然不是咱们所期待的,因此InheritableThreadLocal被排除。
那么想要在Hystrix中解决这个问题怎么办?
优秀的Hystrix已经帮你们提供了相关解决方案,并且是插件化,按需定制。Hystrix的插件详细介绍请看这:Hystrix插件介绍,本文给你们介绍两种方案。
如何让ThreadLocal变量信息在HystrixCommand执行时能在Hystrix线程中正确的传递?
使用 HystrixConcurrencyStrategy插件能够来包装Hystrix线程所执行的方法,具体直接看示例代码:
public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { @Override public <T> Callable<T> wrapCallable(Callable<T> callable) { String traceId = ThreadLocalUtil.getTraceId(); return () -> { ThreadLocalUtil.setTraceId(traceId); try { return callable.call(); } finally { ThreadLocalUtil.clearTraceId(); } }; } } // 业务代码中某处合适的地方注册下当前的策略插件 HystrixPlugins.getInstance().registerConcurrencyStrategy(new MyHystrixConcurrencyStrategy());
使用这种方式很是简单,只要开发人员将本身关注的ThreadLocal值进行「复制」便可,那是否是使用这种方式就好了?
咱们留意到这种方式本质是针对HystrixCommand的run()方法(也就是加了@HystrixCommand注解的业务方法)拦截处理,但它可能会超时或失败,那么就会去执行fallback方法,若是在 fallback方法中也想共享相关上下文信息,这时就没法覆盖到这种场景了。
若是在你的业务中fallback不须要关注上下文信息这块的内容,那么上述这种方案就能够知足需求了,也很简单。但若是在fallback方法中也须要上下文信息,那么可使用Hystrix提供的下面这种插件方式。
使用HystrixCommandExecutionHook能够实现对Hystrix执行流程的彻底控制,你能够覆写它的一些关键节点的回调方法,以实现你的定制需求。想要更多的了解能够看下这:Command Execution Hook介绍,下面列举出HystrixCommandExecutionHook的一些经常使用的关键方法:
在了解上述这些关键方法后,能够发现实现也很简单,只要在onStart()的时候「复制」下关注的上下文信息,而后在onExecutionStart()和onFallbackStart()两个方法开始执行前「粘贴」下关注的上下文信息,最后在做相应的清理行为,就能够知足需求了,示例代码以下所示:
public class MyHystrixHook extends HystrixCommandExecutionHook { private String traceId; @Override public <T> void onStart(HystrixInvokable<T> commandInstance) { copyTraceId(); } @Override public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) { pasteTraceId(); } @Override public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) { pasteTraceId(); } // 下面option1和option2选择其中一种覆写就能够了 //------------------------------------option1------------------------------------ @Override public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) { ThreadLocalUtil.clearTraceId(); super.onExecutionSuccess(commandInstance); } @Override public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) { ThreadLocalUtil.clearTraceId(); return super.onExecutionError(commandInstance, e); } @Override public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) { ThreadLocalUtil.clearTraceId(); super.onFallbackSuccess(commandInstance); } @Override public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) { ThreadLocalUtil.clearTraceId(); return super.onFallbackError(commandInstance, e); } //------------------------------------option1------------------------------------ //------------------------------------option2------------------------------------ @Override public <T> void onSuccess(HystrixInvokable<T> commandInstance) { ThreadLocalUtil.clearTraceId(); super.onSuccess(commandInstance); } @Override public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) { ThreadLocalUtil.clearTraceId(); return super.onError(commandInstance, failureType, e); } //------------------------------------option2------------------------------------ private void copyTraceId() { this.traceId = ThreadLocalUtil.getTraceId(); } private void pasteTraceId() { ThreadLocalUtil.setTraceId(traceId); } } // 业务代码中某处合适的的地方注册下Hook插件 HystrixPlugins.getInstance().registerCommandExecutionHook(new MyHystrixHook());
那是否是这样的实现方式就解决问题了?仔细想下会不会有什么问题?
咱们知道HystrixCommandExecutionHook插件注册后,全部HystrixCommand在被调用执行的时候都会通过这些覆写的方法,也就会出现多线程覆写traceId,那么对于这个Hook下的traceId随时可能被改变了。假设有这样场景:
为了解决上面遇到的问题,Hystrix为开发人员提供了经过HystrixRequestContext和HystrixRequestVariableDefault这两个关键类解决。
HystrixRequestContext用于记录每次Hystrix请求的上下文信息,其中有两个关键信息:static ThreadLocal<HystrixRequestContext> requestVariables: 用于记录每次HystrixCommand执行时的上下文。
ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state:用于记录上下文真正的数据。
HystrixRequestVariableDefault的用法有点似于ThreadLocal,提供了get(),set()方法,具体能力的实现借助于HystrixRequestContext。
HystrixCommandExecutionHook插件终极解决方式的实现的示例代码以下:
public class MyHystrixHook extends HystrixCommandExecutionHook { private HystrixRequestVariableDefault<String> requestVariable = new HystrixRequestVariableDefault<>(); public <T> void onStart(HystrixInvokable<T> commandInstance) { HystrixRequestContext.initializeContext(); copyTraceId(); } @Override public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) { pasteTraceId(); } @Override public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) { pasteTraceId(); } @Override public <T> void onSuccess(HystrixInvokable<T> commandInstance) { HystrixRequestContext.getContextForCurrentThread().shutdown(); super.onSuccess(commandInstance); } @Override public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) { HystrixRequestContext.getContextForCurrentThread().shutdown(); return super.onError(commandInstance, failureType, e); } private void copyTraceId() { requestVariable.set(ThreadLocalUtil.getTraceId()); } private void pasteTraceId() { ThreadLocalUtil.setTraceId(requestVariable.get()); } }
在每次Hook执行onStart()方法的时候,须要先执行HystrixRequestContext的初始化操做,而后对关注的上下文信息进行「复制」,关键代码以下:
public void set(T value) { HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value)); }
把关注的信息复制到一个线程相关的ConcurrentHashMap中了,根据前面对HystrixCommandExecutionHook的介绍咱们知道,onStart()的时候当前线程为调用者线程;
在真正开始执行HystrixCommand业务方方法的时候,此时须要进行「粘贴」上下文信息,从requestVariable.get()获取,get操做关键代码以下:
public T get() { if (HystrixRequestContext.getContextForCurrentThread() == null) { throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used."); } ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state; // short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap LazyInitializer<?> v = variableMap.get(this); if (v != null) { return (T) v.get(); } // 省略一部分 .... }
从代码能够看出get与set操做相对应,也是从线程相关的ConcurrentHashMap获取相应的值,从前序介绍咱们也得知当前线程是Hystrix提供的线程池线程,与调用者线程不是同一个线程,那么这个业务关注的上下文信息还能正确的传递到Hystrix线程中吗?通过测试它确实「神奇」的正确传递了,那究竟是怎么作到的呢?
原来是Hystrix「默默」的帮咱们作了,经过调试咱们看到以下一段关键代码:
this.actual = action; // 调用者线程HystrixRequestContext信息 this.parentThreadState = HystrixRequestContext.getContextForCurrentThread(); this.c = concurrencyStrategy.wrapCallable(new Callable<Void>() { @Override public Void call() throws Exception { HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread(); try { // 帮咱们作了一步拷贝操做 HystrixRequestContext.setContextOnCurrentThread(parentThreadState); // 开始真正的执行业务定义的方法,此时上下文信息已经一致了 actual.call(); return null; } finally { HystrixRequestContext.setContextOnCurrentThread(existingState); } } }); }
在执行业务定义的HystrixCommand方法前,Hystrix封装的对象帮咱们把调用者线程的上下文信息「拷贝」过来了,其实这个处理的思路有点相似于咱们前一个插件HystrixConcurrencyStrategy。
HystrixConcurrencyStrategy 和HystrixCommandExecutionHook二者插件方式你们能够根据实际状况去断定,若是肯定不须要在fallback中关注上下文传递信息,那用前者就能够了,也很简便,但若是你想解决的更完全点,那么用后一种方式就能够了。
做者:vivo 官网商城开发团队