认识Hystrixjava
Hystrix是Netflix开源的一款容错框架,包含经常使用的容错方法:线程隔离、信号量隔离、降级策略、熔断技术。
在高并发访问下,系统所依赖的服务的稳定性对系统的影响很是大,依赖有不少不可控的因素,好比网络链接变慢,资源忽然繁忙,暂时不可用,服务脱机等。咱们要构建稳定、可靠的分布式系统,就必需要有这样一套容错方法。
本文主要讨论线程隔离技术。编程
为何要作线程隔离缓存
好比咱们如今有3个业务调用分别是查询订单、查询商品、查询用户,且这三个业务请求都是依赖第三方服务-订单服务、商品服务、用户服务。三个服务均是经过RPC调用。当查询订单服务,假如线程阻塞了,这个时候后续有大量的查询订单请求过来,那么容器中的线程数量则会持续增长直致CPU资源耗尽到100%,整个服务对外不可用,集群环境下就是雪崩。以下图tomcat
订单服务不可用.png性能优化
:网络
整个tomcat容器不可用.png架构
Hystrix是如何经过线程池实现线程隔离的并发
Hystrix经过命令模式,将每一个类型的业务请求封装成对应的命令请求,好比查询订单->订单Command,查询商品->商品Command,查询用户->用户Command。每一个类型的Command对应一个线程池。建立好的线程池是被放入到ConcurrentHashMap中,好比查询订单:框架
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>(); threadPools.put(“hystrix-order”, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
当第二次查询订单请求过来的时候,则能够直接从Map中获取该线程池。具体流程以下图:异步
hystrix线程执行过程和异步化.png
建立线程池中的线程的方法,查看源代码以下:
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { ThreadFactory threadFactory = null; if (!PlatformSpecific.isAppEngineStandardEnvironment()) { threadFactory = new ThreadFactory() { protected final AtomicInteger threadNumber = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()); thread.setDaemon(true); return thread; } }; } else { threadFactory = PlatformSpecific.getAppEngineThreadFactory(); } final int dynamicCoreSize = corePoolSize.get(); final int dynamicMaximumSize = maximumPoolSize.get(); if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); } }
执行Command的方式一共四种,具体区别以下:
execute():以同步堵塞方式执行run()。调用execute()后,hystrix先建立一个新线程运行run(),接着调用程序要在execute()调用处一直堵塞着,直到run()运行完成。
queue():以异步非堵塞方式执行run()。调用queue()就直接返回一个Future对象,同时hystrix建立一个新线程运行run(),调用程序经过Future.get()拿到run()的返回结果,而Future.get()是堵塞执行的。
observe():事件注册前执行run()/construct()。第一步是事件注册前,先调用observe()自动触发执行run()/construct()(若是继承的是HystrixCommand,hystrix将建立新线程非堵塞执行run();若是继承的是HystrixObservableCommand,将以调用程序线程堵塞执行construct()),第二步是从observe()返回后调用程序调用subscribe()完成事件注册,若是run()/construct()执行成功则触发onNext()和onCompleted(),若是执行异常则触发onError()。
toObservable():事件注册后执行run()/construct()。第一步是事件注册前,调用toObservable()就直接返回一个Observable<String>对象,第二步调用subscribe()完成事件注册后自动触发执行run()/construct()(若是继承的是HystrixCommand,hystrix将建立新线程非堵塞执行run(),调用程序没必要等待run();若是继承的是HystrixObservableCommand,将以调用程序线程堵塞执行construct(),调用程序等待construct()执行完才能继续往下走),若是run()/construct()执行成功则触发onNext()和onCompleted(),若是执行异常则触发onError()
注:
execute()和queue()是在HystrixCommand中,observe()和toObservable()是在HystrixObservableCommand 中。从底层实现来说,HystrixCommand其实也是利用Observable实现的(看Hystrix源码,能够发现里面大量使用了RxJava),尽管它只返回单个结果。HystrixCommand的queue方法其实是调用了toObservable().toBlocking().toFuture(),而execute方法其实是调用了queue().get()。
如何应用到实际代码中
package myHystrix.threadpool; import com.netflix.hystrix.*; import org.junit.Test; import java.util.List; import java.util.concurrent.Future; /** * Created by wangxindong on 2017/8/4. */ public class GetOrderCommand extends HystrixCommand<List> { OrderService orderService; public GetOrderCommand(String name){ super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name)) .andCommandPropertiesDefaults( HystrixCommandProperties.Setter() .withExecutionTimeoutInMilliseconds(5000) ) .andThreadPoolPropertiesDefaults( HystrixThreadPoolProperties.Setter() .withMaxQueueSize(10) //配置队列大小 .withCoreSize(2) // 配置线程池里的线程数 ) ); } @Override protected List run() throws Exception { return orderService.getOrderList(); } public static class UnitTest { @Test public void testGetOrder(){ // new GetOrderCommand("hystrix-order").execute(); Future<List> future =new GetOrderCommand("hystrix-order").queue(); } } }
总结
执行依赖代码的线程与请求线程(好比Tomcat线程)分离,请求线程能够自由控制离开的时间,这也是咱们一般说的异步编程,Hystrix是结合RxJava来实现的异步编程。经过设置线程池大小来控制并发访问量,当线程饱和的时候能够拒绝服务,防止依赖问题扩散。
线程隔离.png
线程隔离的优势:
[1]:应用程序会被彻底保护起来,即便依赖的一个服务的线程池满了,也不会影响到应用程序的其余部分。
[2]:咱们给应用程序引入一个新的风险较低的客户端lib的时候,若是发生问题,也是在本lib中,并不会影响到其余内容,所以咱们能够大胆的引入新lib库。
[3]:当依赖的一个失败的服务恢复正常时,应用程序会当即恢复正常的性能。
[4]:若是咱们的应用程序一些参数配置错误了,线程池的运行情况将会很快显示出来,好比延迟、超时、拒绝等。同时能够经过动态属性实时执行来处理纠正错误的参数配置。
[5]:若是服务的性能有变化,从而须要调整,好比增长或者减小超时时间,更改重试次数,就能够经过线程池指标动态属性修改,并且不会影响到其余调用请求。
[6]:除了隔离优点外,hystrix拥有专门的线程池可提供内置的并发功能,使得能够在同步调用之上构建异步的外观模式,这样就能够很方便的作异步编程(Hystrix引入了Rxjava异步框架)。
线程隔离的缺点:
[1]:线程池的主要缺点就是它增长了计算的开销,每一个业务请求(被包装成命令)在执行的时候,会涉及到请求排队,调度和上下文切换。不过Netflix公司内部认为线程隔离开销足够小,不会产生重大的成本或性能的影响。
The Netflix API processes 10+ billion Hystrix Command executions per day using thread isolation. Each API instance has 40+ thread-pools with 5–20 threads in each (most are set to 10).
Netflix API天天使用线程隔离处理10亿次Hystrix Command执行。 每一个API实例都有40多个线程池,每一个线程池中有5-20个线程(大多数设置为10个)。
对于不依赖网络访问的服务,好比只依赖内存缓存这种状况下,就不适合用线程池隔离技术,而是采用信号量隔离,后面文章会介绍。
所以咱们能够放心使用Hystrix的线程隔离技术,来防止雪崩这种可怕的致命性线上故障。
在此我向你们推荐一个架构学习交流群。交流学习群号: 744642380, 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良