RxJava 版本1.0 和 版本2.0的比较

如今RxJava已是2.0以上版本了,看到不少资料上还在讲1.0的版本,所以作个简单的笔记,供你们参考,不足之处请兄弟们留言我添加完善java

一、概述编程

RxJava(Reactive Extensions Java),RxJava基于观察者模式,是一种响应式编程模型,目的是提供统一的接口帮助开发者便捷的处理异步数据流。app

RxJava本质上是一个异步库,使用简洁的逻辑处理繁琐复杂任务。异步

 

二、观察者模式ide

观察者模式涉及到2主角函数

1)Observable :被观察者spa

2)Observer:观察者线程

被观察者中有一个观察者的列表,当被观察者发生变化的时候回根据这张表一一通知观察者。code

 

三、1.0和2.0的区别 orm

3.一、背压

1.0背压集中在Obserable中处理,致使有点混乱,。

2.0版本把对背压的处理提出来了,出现了两种观察者模式:

Observable(被观察者)/Observer(观察者) 不支持背压(Backpressure)
Flowable(被观察者)/Subscriber(观察者) 支持背压(Backpressure)

Flowable是新增的

3.二、操做符修改

1.0版本

Action这类接口命名Action0、Action1…(数字表明可接受的参数),如今作出了改动

1.x 2.x
Action0 Action
Action1 Consumer
Action2 BiConsumer
...  
ActionN  

Funx修改

1.x 2.x
Func

Function<T, R>

A functional interface that takes a value and returns another value

Func2 BiFunction<T1, T2, R>
Func3 ~ Func9

Function3 ~ Function9

 

Function3<T1, T2, T3, R>

Function9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R>

FuncN Function<T, R>

First()

1.x 2.x
first firstElement
first(Func1) filter(predicate).first()
firstOrDefault(T) first(T)
firstOrDefault(Func1, T) first(T)
   

 

3.三、线程调度 Schedulers

2.0去掉了Schedulers.immediate()、Schedulers.test()

 

3.4 、建立被观察者的变化

1.0版本 
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello");
        subscriber.onNext("hello");
        subscriber.onNext("hello");
        subscriber.onCompleted();
    }
});

2.0版本 
变化点1:create参数Observable.OnSubscribe改成ObservableOnSubscribe 
变化点2:回到函数call(Subscriber)改成 subscribe(ObservableEmitter) 
变化点3:可发射三种事件:emitter.onNext(T value)、onComplete、onError(Throwable e)
Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {

           @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
         
                e.onNext("hello!");
                e.onNext("world!");
                e.onNext("are you ok!");
                e.onError(new Throwable("error"));
                e.onComplete();
            }
        });

3.五、建立观察者的变化

1.0 中建立观察者Observer有两种方式

方式1:采用 Observer 接口

    Observer<String> observer = new Observer<String>() {
        @Override
        public void onNext(String s) {
            
        }

        @Override
        public void onCompleted() {
           
        }

        @Override
        public void onError(Throwable e) {
            
        }
    };

方式2:采用Subscriber接口建立, 其对 Observer接口进行了扩展,增长了onStart()、unSubscribe()

    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            
        }

        @Override
        public void onCompleted() {
            
        }

        @Override
        public void onError(Throwable e) {
            
        }
    };

2.0 建立观察者Observer
变化点1:增长了回调方法onSubscribe(),其最早被调用,可用来作一些初始化的工做
变化点2:onCompleted()改为 onComplete()
    
Observer<Integer> observer = new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            
        }

        @Override
        public void onNext(Integer value) {
        }

        @Override
        public void onError(Throwable e) {
        }

        // 注意
        @Override
        public void onComplete() {
        }
    }

 

3.六、接口都增长了异常抛出

public interface Action {
    /**
     * Runs the action and optionally throws a checked exception.
     * @throws Exception if the implementation wishes to throw a checked exception
     */
    void run() throws Exception;
}


public interface BiConsumer<T1, T2> {

    /**
     * Performs an operation on the given values.
     * @param t1 the first value
     * @param t2 the second value
     * @throws Exception on error
     */
    void accept(T1 t1, T2 t2) throws Exception;
}

public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(T t) throws Exception;
}


/**
 * A functional interface (callback) that accepts a single value.
 * @param <T> the value type
 */
public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}

 

3.7 from 变化

from -> fromArray

1.0版本

private void doFrom() {
       

        String[] items = {"item1", "item2", "item3"};
        Observable.from(items)
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });

    }

2.0版本
private void doFrom() {
        String[] items = {"item1", "item2", "item3"};
        Observable.fromArray(items).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

    }
相关文章
相关标签/搜索