摘要: 原创出处 www.iocoder.cn/Hystrix/com… 「芋道源码」欢迎转载,保留摘要,谢谢!html
本文主要基于 Hystrix 1.5.X 版本 java
🙂🙂🙂关注微信公众号:【芋道源码】有福利: git
- RocketMQ / MyCAT / Sharding-JDBC 全部源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言都将获得认真回复。甚至不知道如何读源码也能够请教噢。
- 新的源码解析文章实时收到通知。每周更新一篇左右。
- 认真的源码交流微信群。
本文主要分享 Hystrix 执行命令方法。github
建议 :对 RxJava 已经有必定的了解的基础上阅读本文。微信
在官方提供的示例中,咱们看到 CommandHelloWorld 经过继承 HystrixCommand 抽象类,有四种调用方式:架构
方法 | ||
---|---|---|
#execute() |
同步调用,返回直接结果 | |
#queue() |
异步调用,返回 java.util.concurrent.Future |
|
#observe() |
异步调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果 |
|
#toObservable() |
未调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果 |
#testToObservable()
查看笔者补充的示例。推荐 Spring Cloud 书籍:异步
// AbstractCommand.java
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
// ... 省略无关属性与方法
public Observable<R> toObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// ....
}
}
}
public Observable<R> observe() {
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<R> subject = ReplaySubject.create();
// eagerly kick off subscription
final Subscription sourceSubscription = toObservable().subscribe(subject);
// return the subject that can be subscribed to later while the execution has already started
return subject.doOnUnsubscribe(new Action0() {
@Override
public void call() {
sourceSubscription.unsubscribe();
}
});
}
}
// HystrixCommand.java
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
// ... 省略无关属性与方法
public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
// ... 包装 delegate
}
// ...
return f;
}
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
protected abstract R run() throws Exception;
}复制代码
#toObservable()
方法 :未作订阅,返回干净的 Observable 。这就是为何上文说“未调用” 。#observe()
方法 :调用 #toObservable()
方法的基础上,向 Observable 注册 rx.subjects.ReplaySubject
发起订阅 。
#queue()
方法 :调用 #toObservable()
方法的基础上,调用:
Observable#toBlocking()
方法 :将 Observable 转换成阻塞的 rx.observables.BlockingObservable
。BlockingObservable#toFuture()
方法 :返回可得到 #run()
抽象方法执行结果的 Future 。
#run()
方法 :子类实现该方法,执行正常的业务逻辑。
#execute()
方法 :调用 #queue()
方法的基础上,调用 Future#get()
方法,同步返回 #run()
的执行结果。整理四种调用方式以下:ide
FROM 《【翻译】Hystrix文档-实现原理》
微服务![]()
本小节为拓展内容,源码解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable
的实现,因此你能够选择:源码分析
《RxJava 源码解析 —— BlockingObservable》
第一篇 Hystrix 正式的源码解析。
梳理 Hystrix 的源码仍是蛮痛苦的,主要是由于对 RxJava 不够熟悉。
胖友,分享一波朋友圈可好!