教程:一块儿学习Hystrix--Hystrix命令执行

目录

  • “Hello World!”
  • 同步执行
  • 异步执行
  • 响应执行

“Hello World!”

    下面是一个经过实现接口 HystrixCommand的一个Hello World 示例:   html

public class HystrixHelloWorld extends HystrixCommand<String> {
    private final String name;

    public HystrixHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        return "Hello " + name + "!";
    }
}

   点击查看详细源码   java

HystrixObservableCommand 等价于 HystrixCommandreact

    一个等效的Hello World解决方案,使用 HystrixObservableCommand 代替 HystrixCommand ,经过覆盖 construct 方法,以下所示:   git

public class HystrixObservableHelloWorld extends HystrixObservableCommand<String> {
    private final String name;

    public HystrixObservableHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected Observable<String> construct() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> observer) {
                try {
                    if (!observer.isUnsubscribed()) {
                        // a real example would do work like a network call here
                        observer.onNext("Hello");
                        observer.onNext(name + "!");
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        } ).subscribeOn(Schedulers.io());
    }
}

同步执行

    经过 execute() 方法同步调用 HystrixCommand 的实现,示例以下:github

    

String s = new HystrixHelloWorld("World").execute();

单元测试以下:异步

@Test
        public void testSynchronous() { //测试同步
            assertEquals("Hello World!", new HystrixHelloWorld("World").execute());
            assertEquals("Hello Bob!", new HystrixHelloWorld("Bob").execute());
        }

关于实现了HystrixObservableCommand 的方法示例以下:ide

    关于HystrixObservableCommand 的实现没有简单的 execute 方法调用,若是清楚经过一个命令产生的 Observable 一定仅仅产生一个单一的值,则能够对 Observable 应用RXjava的操做 .toBlocking().toFuture().get() 模拟 execute 方法。单元测试

异步执行

    咱们能够经过使用 queue() 方法异步执行 HystrixCommand ,示例以下: 测试

Future<String> fWorld = new HystrixHelloWorld("World").queue();

咱们能够经过 Future获取到命令的结果集this

String fw=fWorld.get();

经过单元测试模拟操做以下:

@Test
        public void testAsynchronous1() throws Exception { //测试异步
            assertEquals("Hello World!", new HystrixHelloWorld("World").queue().get());
            assertEquals("Hello Bob!", new HystrixHelloWorld("Bob").queue().get());
        }

        @Test
        public void testAsynchronous2() throws Exception { //测试异步

            Future<String> fWorld = new HystrixHelloWorld("World").queue();
            Future<String> fBob = new HystrixHelloWorld("Bob").queue();

            assertEquals("Hello World!", fWorld.get());
            assertEquals("Hello Bob!", fBob.get());
        }

下面的操做是等价的:

String s1 = new HystrixHelloWorld("World").execute();
String s2 = new HystrixHelloWorld("World").queue().get();

    关于实现了HystrixObservableCommand 的方法示例以下:

   HystrixObservableCommand没有 queue 这种简单实现异步的方法 ,若是清楚经过一个命令产生的 Observable 一定仅仅产生一个单一的值,则能够对 Observable 应用RxJava操做 .toBlocking().toFuture() 模拟 queue 方法。

响应执行

    能够经过一下任意方法监听 HystrixCommand 的结果:

  • observe() — 执行这个命令会返回一个热 Observable马上执行hystrix的命令 ,由于这个 Observable 经过 ReplaySubject 过滤,我们不会有丢失订阅以前的任何东西的危险。
  • toObservable() — 执行这个命令会返回一个“冷“ Observable,直到订阅 Observable 才会开始执行命令和发送结果 。
Observable<String> fWorld = new HystrixHelloWorld("World").observe();

执行完上面的代码,咱们能够经过订阅 Observable 获取到它的值

fWorld.subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    // value emitted here
                }
            });

下面的单元测试示例以下:

@Test
        public void testObservable() throws Exception {

            Observable<String> fWorld = new HystrixHelloWorld("World").observe();
            Observable<String> fBob = new HystrixHelloWorld("Bob").observe();

            // blocking
            assertEquals("Hello World!", fWorld.toBlocking().single());
            assertEquals("Hello Bob!", fBob.toBlocking().single());

            fWorld.subscribe(new Observer<String>() {

                @Override
                public void onCompleted() {
                    // 这里能够什么都不作
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onNext(String v) {
                    System.out.println("onNext: " + v);
                }

            });
            fBob.subscribe(new Action1<String>() {

                @Override
                public void call(String v) {
                    System.out.println("onNext: " + v);
                }

            });
        }

使用java8的 lambda 表达式,示例以下:

fWorld.subscribe((v) -> {
        System.out.println("onNext: " + v);
    })
    
    // - or while also including error handling
    
    fWorld.subscribe((v) -> {
        System.out.println("onNext: " + v);
    }, (exception) -> {
        exception.printStackTrace();
    })

点击连接了解更多关于RXjava中 的 Observable

相关文章
相关标签/搜索