下面是一个经过实现接口 HystrixCommand
的一个Hello World 示例: html
public class HystrixHelloWorld extends HystrixCommand<String> { private final String name; public HystrixHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { return "Hello " + name + "!"; } }
点击查看详细源码 java
HystrixObservableCommand
等价于 HystrixCommand
react
一个等效的Hello World解决方案,使用 HystrixObservableCommand
代替 HystrixCommand
,经过覆盖 construct
方法,以下所示: git
public class HystrixObservableHelloWorld extends HystrixObservableCommand<String> { private final String name; public HystrixObservableHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected Observable<String> construct() { return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> observer) { try { if (!observer.isUnsubscribed()) { // a real example would do work like a network call here observer.onNext("Hello"); observer.onNext(name + "!"); observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribeOn(Schedulers.io()); } }
经过 execute()
方法同步调用 HystrixCommand
的实现,示例以下:github
String s = new HystrixHelloWorld("World").execute();
单元测试以下:异步
@Test public void testSynchronous() { //测试同步 assertEquals("Hello World!", new HystrixHelloWorld("World").execute()); assertEquals("Hello Bob!", new HystrixHelloWorld("Bob").execute()); }
关于实现了HystrixObservableCommand
的方法示例以下:ide
关于HystrixObservableCommand
的实现没有简单的 execute
方法调用,若是清楚经过一个命令产生的 Observable
一定仅仅产生一个单一的值,则能够对 Observable
应用RXjava的操做 .toBlocking().toFuture().get()
模拟 execute
方法。单元测试
咱们能够经过使用 queue()
方法异步执行 HystrixCommand
,示例以下: 测试
Future<String> fWorld = new HystrixHelloWorld("World").queue();
咱们能够经过 Future获取到命令的结果集this
String fw=fWorld.get();
经过单元测试模拟操做以下:
@Test public void testAsynchronous1() throws Exception { //测试异步 assertEquals("Hello World!", new HystrixHelloWorld("World").queue().get()); assertEquals("Hello Bob!", new HystrixHelloWorld("Bob").queue().get()); } @Test public void testAsynchronous2() throws Exception { //测试异步 Future<String> fWorld = new HystrixHelloWorld("World").queue(); Future<String> fBob = new HystrixHelloWorld("Bob").queue(); assertEquals("Hello World!", fWorld.get()); assertEquals("Hello Bob!", fBob.get()); }
下面的操做是等价的:
String s1 = new HystrixHelloWorld("World").execute(); String s2 = new HystrixHelloWorld("World").queue().get();
关于实现了HystrixObservableCommand
的方法示例以下:
HystrixObservableCommand
没有 queue
这种简单实现异步的方法 ,若是清楚经过一个命令产生的 Observable
一定仅仅产生一个单一的值,则能够对 Observable
应用RxJava操做 .toBlocking().toFuture()
模拟 queue
方法。
能够经过一下任意方法监听 HystrixCommand
的结果:
observe()
— 执行这个命令会返回一个热 Observable马上执行hystrix的命令 ,由于这个 Observable 经过 ReplaySubject
过滤,我们不会有丢失订阅以前的任何东西的危险。toObservable()
— 执行这个命令会返回一个“冷“ Observable,直到订阅 Observable 才会开始执行命令和发送结果 。Observable<String> fWorld = new HystrixHelloWorld("World").observe();
执行完上面的代码,咱们能够经过订阅 Observable 获取到它的值
fWorld.subscribe(new Action1<String>() { @Override public void call(String s) { // value emitted here } });
下面的单元测试示例以下:
@Test public void testObservable() throws Exception { Observable<String> fWorld = new HystrixHelloWorld("World").observe(); Observable<String> fBob = new HystrixHelloWorld("Bob").observe(); // blocking assertEquals("Hello World!", fWorld.toBlocking().single()); assertEquals("Hello Bob!", fBob.toBlocking().single()); fWorld.subscribe(new Observer<String>() { @Override public void onCompleted() { // 这里能够什么都不作 } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(String v) { System.out.println("onNext: " + v); } }); fBob.subscribe(new Action1<String>() { @Override public void call(String v) { System.out.println("onNext: " + v); } }); }
使用java8的 lambda 表达式,示例以下:
fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }) // - or while also including error handling fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }, (exception) -> { exception.printStackTrace(); })
点击连接了解更多关于RXjava中 的 Observable