在咱们的项目中,比较普遍地使用了ThreadLocal,好比,在filter层,根据token,取到用户信息后,就会放到一个ThreadLocal变量中;在后续的业务处理中,就会直接从当前线程,来获取该ThreadLocal变量,而后获取到其中的用户信息,很是的方便。java
可是,hystrix 这个组件一旦引入的话,若是使用线程隔离的方式,咱们的业务逻辑就被分红了两部分,以下:git
public class SimpleHystrixCommand extends HystrixCommand<String> { private TestService testService; public SimpleHystrixCommand(TestService testService) { super(setter()); this.testService = testService; } @Override protected String run() throws Exception { .... } ... }
首先,咱们定义了一个Command,这个Command,最终就会丢给hystrix的线程池中去运行。那,咱们的controller层,会怎么写呢?github
@RequestMapping("/") public String hystrixOrder () { SessionUtils.getSessionVOFromRedisAndPut2ThreadLocal(); // 1 SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(testService); // 2 String res = simpleHystrixCommand.execute(); return res; }
因此,这中间,是有线程切换的,执行1时,当前线程里的ThreadLocal数据,在执行业务方法的时候,线程变了,也就取不到ThreadLocal数据了。缓存
若是没时间,能够直接看源码:app
https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/hystrix-thread-local-demodom
一开始,个人思路是,看看能不能把hystrix的默认线程池给换掉,由于构建HystrixCommand时,支持使用Setter的方式去配置。ide
以下:函数
com.netflix.hystrix.HystrixCommand.Setter final public static class Setter { // 1 protected final HystrixCommandGroupKey groupKey; // 2 protected HystrixCommandKey commandKey; // 3 protected HystrixThreadPoolKey threadPoolKey; // 4 protected HystrixCommandProperties.Setter commandPropertiesDefaults; // 5 protected HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults; }
1处,设置命令组ui
2处,设置命令的keythis
3处,设置线程池的key;hystrix会根据这个key,在一个map中,来查找对应的线程池,若是找不到,则建立一个,并放到map中。
com.netflix.hystrix.HystrixThreadPool.Factory final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
4处,命令的相关属性,包括是否降级,是否熔断,是否容许请求合并,命令执行的最大超时时长,以及metric等实时统计信息
5处,线程池的相关属性,好比核心线程数,最大线程数,队列长度等
怎么样,能够设置的属性不少,是吧,可是,并无让咱们控制线程池的建立相关的,也没办法替换其默认线程池。
ok,那不用setter的方式,行不行呢?
HystrixCommand 的构造函数,看看能不能传入自定义的线程池呢?
通过我一开始不仔细的观察,发现有一个构造函数能够传入HystrixThreadPool,ok,就是它了。可是,后面仔细一看,居然是 package权限,个人子类,和HystrixCommand固然不是一个package下的,因此,访问不了这个构造器。
虽然,可使用反射,可是,我们仍是守规矩点好了,再看看有没有其余入口。
仔细观察下,看看线程池何时建立的?
入口在下图,每次new一个HystrixCommand,最终都会调用父类的构造函数:
上图所示处,initThreadPool里面,会去建立线程池,须要注意的是,这里的第一个实参,threadPool,是构造函数的第5个形参,目前来看,传进来的都是null。为啥说这个,咱们接着看:
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) { if (fromConstructor == null) { //1 get the default implementation of HystrixThreadPool return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults); } else { return fromConstructor; } }
上面咱们说了,第一个实参,老是null,因此,会走这里的1处。
com.netflix.hystrix.HystrixThreadPool.Factory#getInstance static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) { String key = threadPoolKey.name(); //1 this should find it for all but the first time HystrixThreadPool previouslyCached = threadPools.get(key); if (previouslyCached != null) { return previouslyCached; } //2 if we get here this is the first time so we need to initialize synchronized (HystrixThreadPool.class) { if (!threadPools.containsKey(key)) { // 3 threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); } } return threadPools.get(key); }
咱们接着看3处:
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { // 1 this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); // 2 HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); // 3 this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, concurrencyStrategy.getThreadPool(threadPoolKey, properties), properties); // 4 this.threadPool = this.metrics.getThreadPool(); ... }
1处,获取线程池的默认配置,这个就和咱们前面说的那个Setter里的相似
2处,从HystrixPlugins.getInstance()获取一个HystrixConcurrencyStrategy类型的对象,保存到局部变量 concurrencyStrategy
3处,初始化metrics,这里的第二个参数,是concurrencyStrategy.getThreadPool来获取的,这个操做,实际上就会去建立线程池。
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy#getThreadPool public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); ... final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); final int maxQueueSize = threadPoolProperties.maxQueueSize().get(); ... // 1 return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } }
上面的1处,会去建立线程池。可是,这里直接就是要了 jdk 的默认线程池类来建立,这还怎么搞?类型都定死了。无法扩展了。。。
可是,回过头来,又仔细看了看,这个getThreadPool 是 HystrixConcurrencyStrategy类的一个方法,这个方法也是个实例方法。
方法不能改,那,实例能换吗?再看看前面的代码:
ok,那接着分析:
public HystrixConcurrencyStrategy getConcurrencyStrategy() { if (concurrencyStrategy.get() == null) { //1 check for an implementation from Archaius first Object impl = getPluginImplementation(HystrixConcurrencyStrategy.class); concurrencyStrategy.compareAndSet(null, (HystrixConcurrencyStrategy) impl); } return concurrencyStrategy.get(); }
1处,根据这个类,获取实现,感受有点戏。
private <T> T getPluginImplementation(Class<T> pluginClass) { // 1 T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties); if (p != null) return p; // 2 return findService(pluginClass, classLoader); }
1处,从一个动态属性中获取,后来经查,发现是若是集成了Netflix Archaius就能够动态获取属性,相似于一个配置中心
2处,若是前面没找到,就是要 JDK 的SPI机制。
private static <T> T findService( Class<T> spi, ClassLoader classLoader) throws ServiceConfigurationError { ServiceLoader<T> sl = ServiceLoader.load(spi, classLoader); for (T s : sl) { if (s != null) return s; } return null; }
那就好说了。SPI ,咱们自定义一个实现,就能够替换掉默认的了,hystrix作的仍是不错,扩展性能够。
如今知道能够自定义HystrixConcurrencyStrategy了,那要怎么自定义呢?
这个类,是个抽象类,大致有以下几个方法:
getThreadPool getBlockingQueue(int maxQueueSize) Callable<T> wrapCallable(Callable<T> callable) getRequestVariable(final HystrixRequestVariableLifecycle<T> rv)
说是抽象类,但其实并无须要咱们实现的方法,全部方法都有默认实现,咱们只须要重写须要覆盖的方法便可。
我这里,看重了第三个方法:
/** * Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution. * <p> * This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}). * <p> * <b>Default Implementation</b> * <p> * Pass-thru that does no wrapping. * * @param callable * {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor} * @return {@code Callable<T>} either as a pass-thru or wrapping the one given */ public <T> Callable<T> wrapCallable(Callable<T> callable) { return callable; }
方法注释如上,我简单说下,在执行前,提供一个机会,让你去wrap这个callable,即最终要丢到线程池执行的那个callable。
咱们能够wrap一下原有的callable,在执行前,把当前线程的threadlocal变量存下来,即为A,而后设置到callable里面去;在callable执行的时候,就可使用咱们的A中的threadlocal来替换掉worker线程中的。
多说无益,这里直接看代码:
// 0 public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { @Override public <T> Callable<T> wrapCallable(Callable<T> callable) { /** * 1 获取当前线程的threadlocalmap */ Object currentThreadlocalMap = getCurrentThreadlocalMap(); Callable<T> finalCallable = new Callable<T>() { // 2 private Object callerThreadlocalMap = currentThreadlocalMap; // 3 private Callable<T> targetCallable = callable; @Override public T call() throws Exception { /** * 4 将工做线程的原有线程变量保存起来 */ Object oldThreadlocalMapOfWorkThread = getCurrentThreadlocalMap(); /** *5 将本线程的线程变量,设置为caller的线程变量 */ setCurrentThreadlocalMap(callerThreadlocalMap); try { // 6 return targetCallable.call(); }finally { // 7 setCurrentThreadlocalMap(oldThreadlocalMapOfWorkThread); log.info("restore work thread's threadlocal"); } } }; return finalCallable; }
获取线程的threadlocal的代码:
private Object getCurrentThreadlocalMap() { Thread thread = Thread.currentThread(); try { Field field = Thread.class.getDeclaredField("threadLocals"); field.setAccessible(true); Object o = field.get(thread); return o; } catch (NoSuchFieldException | IllegalAccessException e) { log.error("{}",e); } return null; }
设置线程的threadlocal的代码:
private void setCurrentThreadlocalMap(Object newThreadLocalMap) { Thread thread = Thread.currentThread(); try { Field field = Thread.class.getDeclaredField("threadLocals"); field.setAccessible(true); field.set(thread,newThreadLocalMap); } catch (NoSuchFieldException | IllegalAccessException e) { log.error("{}",e); } }
https://github.com/Netflix/Hystrix/wiki/Plugins
@RequestMapping("/") public String hystrixOrder () { // 1 SessionUtils.getSessionVOFromRedisAndPut2ThreadLocal(); // 2 SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(testService); String res = simpleHystrixCommand.execute(); return res; }
1处,设置ThreadLocal变量
public static UserVO getSessionVOFromRedisAndPut2ThreadLocal() { UserVO userVO = new UserVO(); userVO.setUserName("test user"); RequestContextHolder.set(userVO); log.info("set thread local:{} to context",userVO); return userVO; }
2处,new了一个HystrixCommand,而后execute执行
public class SimpleHystrixCommand extends HystrixCommand<String> { private TestService testService; public SimpleHystrixCommand(TestService testService) { super(setter()); this.testService = testService; } @Override protected String run() throws Exception { // 1 String s = testService.getResult(); log.info("get thread local:{}",s); /** * 若是睡眠时间,超过2s,会降级 * {@link #getFallback()} */ int millis = new Random().nextInt(3000); log.info("will sleep {} millis",millis); Thread.sleep(millis); return s; }
重点看1处代码:
public String getResult() { UserVO userVO = RequestContextHolder.get(); log.info("I am hystrix pool thread,try to get threadlocal:{}",userVO); return userVO.toString(); }
如上所示,会去获取ThreadLocal变量,并打印。
在resources\META-INF\services目录下,建立文件:
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy
内容为下面一行:
com.learn.hystrix.utils.MyHystrixConcurrencyStrategy
2020-05-09 17:26:11.134 INFO 7452 --- [nio-8080-exec-2] com.learn.hystrix.utils.SessionUtils : set thread local:UserVO(userName=test user) to context 2020-05-09 17:26:11.143 INFO 7452 --- [x-member-pool-2] com.learn.hystrix.service.TestService : I am hystrix pool thread,try to get threadlocal:UserVO(userName=test user) 2020-05-09 17:26:11.143 INFO 7452 --- [x-member-pool-2] c.l.h.command.SimpleHystrixCommand : get thread local:UserVO(userName=test user) 2020-05-09 17:26:11.144 INFO 7452 --- [x-member-pool-2] c.l.h.command.SimpleHystrixCommand : will sleep 126 millis 2020-05-09 17:26:11.281 INFO 7452 --- [x-member-pool-2] c.l.h.u.MyHystrixConcurrencyStrategy : restore work thread's threadlocal
能够看到,已经发生了线程切换,在worker线程也取到了。
你们若是发现日志中出现了[ HystrixTimer-1] 线程的身影,不用担忧,那只是由于咱们的线程超时了,因此timer线程检测到了以后,去执行一个callable任务,那个runnable就是前面被咱们包装过的那个callable。(这块超时的机制,todo吧,下次再讲)
hystrix的插件机制,不止能够扩展上面这一个类,还有几个别的类也是能够的。你们直接参考:
https://github.com/Netflix/Hystrix/wiki/Plugins
代码demo,我放在了:
https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/hystrix-thread-local-demo