最近小主看到不少公众号都在发布Hystrix停更的文章,spring cloud体系的使用者和拥护者一片哀嚎,实际上,spring做为Java最大的家族,根本不须要担忧其中一两个零件的废弃,Hystrix的停更,只会催生更多或者更好的零件来替代它,所以,咱们须要作的是:**知道Hystrix是干吗,怎么用的,这样要找替代者就易于反掌了。java
文章提纲:redis
- 为何须要Hystrix?
- Hystrix如何解决依赖隔离
- 如何使用Hystrix
在大中型分布式系统中,一般系统不少依赖(HTTP,hession,Netty,Dubbo等),以下图:
在高并发访问下,这些依赖的稳定性与否对系统的影响很是大,可是依赖有不少不可控问题:如网络链接缓慢,资源繁忙,暂时不可用,服务脱机等.spring
以下图:QPS为50的依赖 I 出现不可用,可是其余依赖仍然可用.
当依赖I 阻塞时,大多数服务器的线程池就出现阻塞(BLOCK),影响整个线上服务的稳定性.以下图:
在复杂的分布式架构的应用程序有不少的依赖,都会不可避免地在某些时候失败。高并发的依赖失败时若是没有隔离措施,当前应用服务就有被拖垮的风险。编程
例如:一个依赖30个SOA服务的系统,每一个服务99.99%可用。 99.99%的30次方 ≈ 99.7% 0.3% 意味着一亿次请求 会有 3,000,00次失败 换算成时间大约每个月有2个小时服务不稳定. 随着服务依赖数量的变多,服务不稳定的几率会成指数性提升.
解决问题方案:对依赖作隔离,Hystrix就是处理依赖隔离的框架,同时也是能够帮咱们作依赖服务的治理和监控.
Netflix 公司开发并成功使用Hystrix,使用规模以下:缓存
he Netflix API processes 10+ billion HystrixCommand executions per day using thread isolation. Each API instance has 40+ thread-pools with 5-20 threads in each (most are set to 10).
- Hystrix使用命令模式HystrixCommand(Command)包装依赖调用逻辑,每一个命令在单独线程中/信号受权下执行。
- 可配置依赖调用超时时间,超时时间通常设为比99.5%平均时间略高便可.当调用超时时,直接返回或执行fallback逻辑。
- 为每一个依赖提供一个小的线程池(或信号),若是线程池已满调用将被当即拒绝,默认不采用排队.加速失败断定时间。
- 依赖调用结果分:成功,失败(抛出异常),超时,线程拒绝,短路。 请求失败(异常,拒绝,超时,短路)时执行fallback(降级)逻辑。
- 提供熔断器组件,能够自动运行或手动调用,中止当前依赖一段时间(10秒),熔断器默认错误率阈值为50%,超过将自动运行。
- 提供近实时依赖的统计和监控
Hystrix依赖的隔离架构,以下图:服务器
- 使用maven引入Hystrix依赖
<!-- 依赖版本 --> <hystrix.version>1.3.16</hystrix.version> <hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version> <dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-core</artifactId> <version>${hystrix.version}</version> </dependency> <dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-metrics-event-stream</artifactId> <version>${hystrix-metrics-event-stream.version}</version> </dependency> <!-- 仓库地址 --> <repository> <id>nexus</id> <name>local private nexus</name> <url>http://maven.oschina.net/content/groups/public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository>
- 使用命令模式封装依赖逻辑
public class HelloWorldCommand extends HystrixCommand<String> { private final String name; public HelloWorldCommand(String name) { //最少配置:指定命令组名(CommandGroup) super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { // 依赖逻辑封装在run()方法中 return "Hello " + name +" thread:" + Thread.currentThread().getName(); } //调用实例 public static void main(String[] args) throws Exception{ //每一个Command对象只能调用一次,不能够重复调用, //重复调用对应异常信息:This instance can only be executed once. Please instantiate a new instance. HelloWorldCommand helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix"); //使用execute()同步调用代码,效果等同于:helloWorldCommand.queue().get(); String result = helloWorldCommand.execute(); System.out.println("result=" + result); helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix"); //异步调用,可自由控制获取结果时机, Future<String> future = helloWorldCommand.queue(); //get操做不能超过command定义的超时时间,默认:1秒 result = future.get(100, TimeUnit.MILLISECONDS); System.out.println("result=" + result); System.out.println("mainThread=" + Thread.currentThread().getName()); } } //运行结果: run()方法在不一样的线程下执行 // result=Hello Synchronous-hystrix thread:hystrix-HelloWorldGroup-1 // result=Hello Asynchronous-hystrix thread:hystrix-HelloWorldGroup-2 // mainThread=main
note:异步调用使用 command.queue()get(timeout, TimeUnit.MILLISECONDS);同步调用使用command.execute() 等同于 command.queue().get();网络
- 注册异步事件回调执行
//注册观察者事件拦截 Observable<String> fs = new HelloWorldCommand("World").observe(); //注册结果回调事件 fs.subscribe(new Action1<String>() { @Override public void call(String result) { //执行结果处理,result 为HelloWorldCommand返回的结果 //用户对结果作二次处理. } }); //注册完整执行生命周期事件 fs.subscribe(new Observer<String>() { @Override public void onCompleted() { // onNext/onError完成以后最后回调 System.out.println("execute onCompleted"); } @Override public void onError(Throwable e) { // 当产生异常时回调 System.out.println("onError " + e.getMessage()); e.printStackTrace(); } @Override public void onNext(String v) { // 获取结果后回调 System.out.println("onNext: " + v); } }); /* 运行结果 call execute result=Hello observe-hystrix thread:hystrix-HelloWorldGroup-3 onNext: Hello observe-hystrix thread:hystrix-HelloWorldGroup-3 execute onCompleted */
- 使用Fallback() 提供降级策略
//重载HystrixCommand 的getFallback方法实现逻辑 public class HelloWorldCommand extends HystrixCommand<String> { private final String name; public HelloWorldCommand(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup")) /* 配置依赖超时时间,500毫秒*/ .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(500))); this.name = name; } @Override protected String getFallback() { return "exeucute Falled"; } @Override protected String run() throws Exception { //sleep 1 秒,调用会超时 TimeUnit.MILLISECONDS.sleep(1000); return "Hello " + name +" thread:" + Thread.currentThread().getName(); } public static void main(String[] args) throws Exception{ HelloWorldCommand command = new HelloWorldCommand("test-Fallback"); String result = command.execute(); } } /* 运行结果:getFallback() 调用运行 getFallback executed */
NOTE: 除了HystrixBadRequestException异常以外,全部从run()方法抛出的异常都算做失败,并触发降级getFallback()和断路器逻辑。架构
HystrixBadRequestException用在非法参数或非系统故障异常等不该触发回退逻辑的场景。
- 依赖命名:CommandKey
public HelloWorldCommand(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) /* HystrixCommandKey工厂定义依赖名称 */ .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))); this.name = name; }
NOTE: 每一个CommandKey表明一个依赖抽象,相同的依赖要使用相同的CommandKey名称。依赖隔离的根本就是对相同CommandKey的依赖作隔离.并发
- 依赖分组:CommandGroup
命令分组用于对依赖操做分组,便于统计,汇总等.app
//使用HystrixCommandGroupKey工厂定义 public HelloWorldCommand(String name) { Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup")) }
NOTE: CommandGroup是每一个命令最少配置的必选参数,在不指定ThreadPoolKey的状况下,字面值用于对不一样依赖的线程池/信号区分.
- 线程池/信号:ThreadPoolKey
public HelloWorldCommand(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")) /* 使用HystrixThreadPoolKey工厂定义线程池名称*/ .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool"))); this.name = name; }
NOTE: 当对同一业务依赖作隔离时使用CommandGroup作区分,可是对同一依赖的不一样远程调用如(一个是redis 一个是http),可使用HystrixThreadPoolKey作隔离区分.
最然在业务上都是相同的组,可是须要在资源上作隔离时,可使用HystrixThreadPoolKey区分.
- 请求缓存 Request-Cache
public class RequestCacheCommand extends HystrixCommand<String> { private final int id; public RequestCacheCommand( int id) { super(HystrixCommandGroupKey.Factory.asKey("RequestCacheCommand")); this.id = id; } @Override protected String run() throws Exception { System.out.println(Thread.currentThread().getName() + " execute id=" + id); return "executed=" + id; } //重写getCacheKey方法,实现区分不一样请求的逻辑 @Override protected String getCacheKey() { return String.valueOf(id); } public static void main(String[] args){ HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { RequestCacheCommand command2a = new RequestCacheCommand(2); RequestCacheCommand command2b = new RequestCacheCommand(2); Assert.assertTrue(command2a.execute()); //isResponseFromCache断定是不是在缓存中获取结果 Assert.assertFalse(command2a.isResponseFromCache()); Assert.assertTrue(command2b.execute()); Assert.assertTrue(command2b.isResponseFromCache()); } finally { context.shutdown(); } context = HystrixRequestContext.initializeContext(); try { RequestCacheCommand command3b = new RequestCacheCommand(2); Assert.assertTrue(command3b.execute()); Assert.assertFalse(command3b.isResponseFromCache()); } finally { context.shutdown(); } } }
NOTE:请求缓存可让(CommandKey/CommandGroup)相同的状况下,直接共享结果,下降依赖调用次数,在高并发和CacheKey碰撞率高场景下能够提高性能.
Servlet容器中,能够直接实用Filter机制Hystrix请求上下文
public class HystrixRequestContextServletFilter implements Filter { public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { chain.doFilter(request, response); } finally { context.shutdown(); } } } <filter> <display-name>HystrixRequestContextServletFilter</display-name> <filter-name>HystrixRequestContextServletFilter</filter-name> <filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class> </filter> <filter-mapping> <filter-name>HystrixRequestContextServletFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping>
- 信号量隔离:SEMAPHORE
隔离本地代码或可快速返回远程调用(如memcached,redis)能够直接使用信号量隔离,下降线程隔离开销.
public class HelloWorldCommand extends HystrixCommand<String> { private final String name; public HelloWorldCommand(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup")) /* 配置信号量隔离方式,默认采用线程池隔离 */ .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE))); this.name = name; } @Override protected String run() throws Exception { return "HystrixThread:" + Thread.currentThread().getName(); } public static void main(String[] args) throws Exception{ HelloWorldCommand command = new HelloWorldCommand("semaphore"); String result = command.execute(); System.out.println(result); System.out.println("MainThread:" + Thread.currentThread().getName()); } } /** 运行结果 HystrixThread:main MainThread:main */
- fallback降级逻辑命令嵌套
用场景:用于fallback逻辑涉及网络访问的状况,如缓存访问。
public class CommandWithFallbackViaNetwork extends HystrixCommand<String> { private final int id; protected CommandWithFallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand"))); this.id = id; } @Override protected String run() { // RemoteService.getValue(id); throw new RuntimeException("force failure for example"); } @Override protected String getFallback() { return new FallbackViaNetwork(id).execute(); } private static class FallbackViaNetwork extends HystrixCommand<String> { private final int id; public FallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand")) // 使用不一样的线程池作隔离,防止上层线程池跑满,影响降级逻辑. .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback"))); this.id = id; } @Override protected String run() { MemCacheClient.getValue(id); } @Override protected String getFallback() { return null; } } }
NOTE:依赖调用和降级调用使用不一样的线程池作隔离,防止上层线程池跑满,影响二级降级逻辑调用.
- 显示调用fallback逻辑,用于特殊业务处理
public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> { private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true); private final int id; public CommandFacadeWithPrimarySecondary(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand")) .andCommandPropertiesDefaults( HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { if (usePrimary.get()) { return new PrimaryCommand(id).execute(); } else { return new SecondaryCommand(id).execute(); } } @Override protected String getFallback() { return "static-fallback-" + id; } @Override protected String getCacheKey() { return String.valueOf(id); } private static class PrimaryCommand extends HystrixCommand<String> { private final int id; private PrimaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand")) .andCommandPropertiesDefaults( // we default to a 600ms timeout for primary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600))); this.id = id; } @Override protected String run() { // perform expensive 'primary' service call return "responseFromPrimary-" + id; } } private static class SecondaryCommand extends HystrixCommand<String> { private final int id; private SecondaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand")) .andCommandPropertiesDefaults( // we default to a 100ms timeout for secondary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100))); this.id = id; } @Override protected String run() { // perform fast 'secondary' service call return "responseFromSecondary-" + id; } } public static class UnitTest { @Test public void testPrimary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true); assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } @Test public void testSecondary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false); assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } } }
NOTE:显示调用降级适用于特殊需求的场景,fallback用于业务处理,fallback再也不承担降级职责,建议慎重使用,会形成监控统计换乱等问题.
- 命令调用合并:HystrixCollapser
命令调用合并容许多个请求合并到一个线程/信号下批量执行。
执行流程图以下:
public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> { private final Integer key; public CommandCollapserGetValueForKey(Integer key) { this.key = key; } @Override public Integer getRequestArgument() { return key; } @Override protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) { //建立返回command对象 return new BatchCommand(requests); } @Override protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) { int count = 0; for (CollapsedRequest<String, Integer> request : requests) { //手动匹配请求和响应 request.setResponse(batchResponse.get(count++)); } } private static final class BatchCommand extends HystrixCommand<List<String>> { private final Collection<CollapsedRequest<String, Integer>> requests; private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey"))); this.requests = requests; } @Override protected List<String> run() { ArrayList<String> response = new ArrayList<String>(); for (CollapsedRequest<String, Integer> request : requests) { response.add("ValueForKey: " + request.getArgument()); } return response; } } public static class UnitTest { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { Future<String> f1 = new CommandCollapserGetValueForKey(1).queue(); Future<String> f2 = new CommandCollapserGetValueForKey(2).queue(); Future<String> f3 = new CommandCollapserGetValueForKey(3).queue(); Future<String> f4 = new CommandCollapserGetValueForKey(4).queue(); assertEquals("ValueForKey: 1", f1.get()); assertEquals("ValueForKey: 2", f2.get()); assertEquals("ValueForKey: 3", f3.get()); assertEquals("ValueForKey: 4", f4.get()); assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0]; assertEquals("GetValueForKey", command.getCommandKey().name()); assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } finally { context.shutdown(); } } }
NOTE:使用场景:HystrixCollapser用于对多个相同业务的请求合并到一个线程甚至能够合并到一个链接中执行,下降线程交互次和IO数,但必须保证他们属于同一依赖.
以为本文对你有帮助?请分享给更多人
关注「编程无界」,提高装逼技能
![]()