RxJava简析

rxjava文档地址https://mcxiaoke.gitbooks.io/rxdocs/content/ 这个是中文版的
javascript

android studio 添加依赖  implementation 'io.reactivex.rxjava3:rxjava:3.0.4'php

首先,打印helloworld:css

public void hello(String args){ Flowable.fromArray(args).subscribe(s -> System.out.println("hello " + s + "!"));}

跟之前其余语言不大同样,看上去很麻烦,咱们一步步来看java

Flowable.fromArray(args)

这个方法最重要的就是里面的最后一句react

new FlowableFromArray<>(items)

果真FlowableFromArray是Flowable的子类,因此真正的实如今子类里面android

Flowable.fromArray(args).subscribe

 subscribe进到里面的是git

public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError, @NonNull Action onComplete) { Objects.requireNonNull(onNext, "onNext is null"); Objects.requireNonNull(onError, "onError is null"); Objects.requireNonNull(onComplete, "onComplete is null");
LambdaSubscriber<T> ls = new LambdaSubscriber<>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
subscribe(ls);
return ls;}

看上去最重要的就是这两句了typescript

LambdaSubscriber<T> ls = new LambdaSubscriber<>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
subscribe(ls);

先进到subscribe(ls)中,发现这句数组

subscribeActual(flowableSubscriber)

跳进去发现是个抽象方法,那么实现确定在子类啦,进到子类FlowableFromArray微信

@Overridepublic void subscribeActual(Subscriber<? super T> s) { if (s instanceof ConditionalSubscriber) { s.onSubscribe(new ArrayConditionalSubscription<>( (ConditionalSubscriber<? super T>)s, array)); } else { s.onSubscribe(new ArraySubscription<>(s, array)); }}

跳进去又发现onSubscribe是个抽象方法,那么实现方法在哪呢,对啦,就是以前看到的LambdaSubscriber

public void onSubscribe(Subscription s) { if (SubscriptionHelper.setOnce(this, s)) { try { onSubscribe.accept(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.cancel(); onError(ex); } }}

这个onSubscribe.accept(this)跳过去就是接口Consumer的accept方法了

因此一开始的helloworld代码也能够改为

FlowableFromArray flowableFromArray = new FlowableFromArray(new String[]{args});flowableFromArray.subscribe(new Consumer<String>() { @Override public void accept(String s) throws Throwable { System.out.println("hello " + s + "!"); }});

是否是很麻烦,饶了一大圈,不要紧,咱们继续往下看

这里给出一些名词的翻译

  • Reactive 直译为反应性的,有活性的,根据上下文通常翻译为反应式、响应式

  • Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念

  • Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会经过某种方式通知观察者或订阅者

  • Observer 观察者对象,监听Observable发射的数据并作出响应,Subscriber是它的一个特殊实现

  • emit 直译为发射,发布,发出,含义是Observable在数据产生或变化时发送通知给Observer,调用Observer对应的方法,文章里一概译为发射

  • items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一概译为数据,数据项

下面是经常使用的操做符列表:

  1. 建立操做 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer

  2. 变换操做 Buffer, FlatMap, GroupBy, Map, Scan和Window

  3. 过滤操做 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast

  4. 组合操做 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip

  5. 错误处理 Catch和Retry

  6. 辅助操做 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using

  7. 条件和布尔操做 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile

  8. 算术和集合操做 Average, Concat, Count, Max, Min, Reduce, Sum

  9. 转换操做 To

  10. 链接操做 Connect, Publish, RefCount, Replay

  11. 反压操做,用于增长特殊的流程控制策略的操做符

下面咱们来看第一个操做符:Create

Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 1; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); }
@Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
@Override public void onCompleted() { System.out.println("Sequence complete."); } });

咱们一块儿来看源码

首先是Observable的create方法

public final static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f));}

这里没什么,就是返回建立一个Observable对象,可是要注意里面的参数OnSubscribe

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { // cover for generics insanity}
public interface Action1<T> extends Action { void call(T t);}

这个参数是一个接口,它的父类里有个抽象待实现的方法call,并且call方法被传了Subscriber进去

咱们来看Subscriber这个类,原来是个接口,并且它的父类Observer有三个很重要的方法

public interface Observer<T> { void onCompleted(); void onError(Throwable e); void onNext(T t);}

第一个create方法算是完成了,咱们能够拆分来看

Observable<Integer> integerObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 0; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } }});

第二个方法subscribe,它的参数也是Subscriber,即intergerObservable.subscribe(Subscriber)

因此咱们就看出来了,Observable这个被观察者先是经过call增长一系列的监听,而后经过subscribe订阅监听。这样,当call里的内容开始执行后,触发监听回调

下面我要放大招了,我把源码简化了一下

public interface MyOnSubscribe { void call(MySubscriber subscriber);}
public interface MySubscriber { void onNext();
void onCompleted();
void onError();}
public class MyObservable {
MyOnSubscribe onSubscribe;
public MyObservable(MyOnSubscribe onSubscribe) { this.onSubscribe = onSubscribe; }
public final static MyObservable create(MyOnSubscribe onSubscribe) { return new MyObservable(onSubscribe); }
public final void subscribe(MySubscriber subscriber) { onSubscribe.call(subscriber); }}

测试代码

public void hello() { MyObservable.create(new MyOnSubscribe() { @Override public void call(MySubscriber subscriber) { try { for (int i = 0; i < 5; i++) { subscriber.onNext(); } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(); } } }).subscribe(new MySubscriber() { @Override public void onNext() { System.out.println(1); }
@Override public void onCompleted() { System.out.println("onCompleted"); }
@Override public void onError() { System.out.println("onError"); } }); }

获得的结果是同样的。因此说,代码万变不离其中,只要灵活运用接口,接口就是用来监听的

第二个操做符from

Integer[] items = {0, 1, 2, 3, 4, 5};Observable.from(items).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); }});

先看Observable的from方法

public final static <T> Observable<T> from(T[] array) { return from(Arrays.asList(array));}

其实就是把数组转成list,可是再点from进去就很重要

public final static <T> Observable<T> from(Iterable<? extends T> iterable) { return create(new OnSubscribeFromIterable<T>(iterable));}public OnSubscribeFromIterable(Iterable<? extends T> iterable) { if (iterable == null) { throw new NullPointerException("iterable must not be null"); } this.is = iterable;}

OnSubscribeFromIterable是继承自OnSubscribe的,因此后面调的call方法,其实是调的OnSubscribeFromIterable里的call方法,咱们来看一下

@Overridepublic void call(final Subscriber<? super T> o) { final Iterator<? extends T> it = is.iterator(); if (!it.hasNext() && !o.isUnsubscribed()) o.onCompleted(); else  o.setProducer(new IterableProducer<T>(o, it));}

真相大白了,在这里作了迭代。还有一个操做符just,其实底层里面调的就是from,只不过还限制了参数个数,并且参数类型必须相同,感受用处不大

第三个操做符repeat

Observable.just(1, 2).repeat(4).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); }});

repeat点进去是OnSubcribRedo.repeat,紧追着count这个参数,会看到一个RedoFinite类

public static final class RedoFinite implements Func1<Observable<? extends Notification<?>>, Observable<?>> { private final long count;
public RedoFinite(long count) { this.count = count; }
@Override public Observable<?> call(Observable<? extends Notification<?>> ts) { return ts.map(new Func1<Notification<?>, Notification<?>>() {
int num=0; @Override public Notification<?> call(Notification<?> terminalNotification) { if(count == 0) { return terminalNotification; } num++; if(num <= count) { return Notification.createOnNext(num); } else { return terminalNotification; } } }).dematerialize(); }}

这里就看到了,有个num++和num<=count判断,就知道是怎么重复的了

第4个操做符Map和flapMap

这两个变换操做符可谓很是重要,常常用到,我写了4个例子,请仔细区别,就能够知道它们到底作了什么

Student student1 = new Student("stark", new Course[]{new Course("Chinese"), new Course("English")});Student student2 = new Student("adam", new Course[]{new Course("Math"), new Course("Physical")});
Student[] students = new Student[]{student1, student2};Observable.from(students).subscribe(new Action1<Student>() { @Override public void call(Student student) { System.out.println(student.getName()); }});
System.out.println("-------------");
Observable.from(students).map(new Func1<Student, String>() { @Override public String call(Student student) { return student.getName(); }}).subscribe(new Action1<String>() { @Override public void call(String name) { System.out.println(name); }});
System.out.println("-------------");
Observable.from(students).map(new Func1<Student, Course[]>() { @Override public Course[] call(Student student) { return student.getCourses(); }}).subscribe(new Action1<Course[]>() { @Override public void call(Course[] courses) { System.out.println(courses[0].getName()); System.out.println(courses[1].getName()); }});
System.out.println("-------------");
Observable.from(students).flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); }}).subscribe(new Action1<Course>() { @Override public void call(Course course) { System.out.println(course.getName()); }});
输出:starkadam-------------starkadam-------------ChineseEnglishMathPhysical-------------ChineseEnglishMathPhysical

若是你仔细看代码,就会发现map就是一对一的转换,flatMap是一对多的转换,转换的先后类型在方法Func1中已经标的很清楚。例子:Func1(Student,String)就表明传参是Student,返回类型是String,具体的实如今call里面student.getName()

map和flatMap能够看做是将咱们常常用到的嵌套循环for(i){for(j)...}...给解耦了,看起来更清楚一些,中间能够插入更多的操做

源码里面的实现就是迭代,没什么好说

第5个操做符filter:

Observable.just(1,2,3,4,5).filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer<4; }}).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); }});

先过滤再循环输出

第6个组合操做符and/then/when

implementation 'io.reactivex:rxjava-joins:0.22.0'
Observable<String> just1 = Observable.just("A", "B");Observable<Integer> just2 = Observable.just(1, 2, 3);Pattern2<String, Integer> pattern = JoinObservable.from(just1).and(just2);Plan0<String> plan = pattern.then(new Func2<String, Integer, String>() { @Override public String call(String s, Integer integer) { return s + integer; }});JoinObservable.when(plan).toObservable().subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); }});
输出:A1B2

第7个组合操做符merge:

Observable<Integer> odds = Observable.just(1, 3, 5);Observable<Integer> evens = Observable.just(2, 4, 6);Observable.merge(odds,evens).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); }});
输出:135246

第8个操做符doOnNext:

Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() { @Override public void call(Integer integer) { if (integer > 1) { throw new RuntimeException("item exceeds maximum value"); } }}).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("onCompleted"); }
@Override public void onError(Throwable e) { System.out.println("onError"); }
@Override public void onNext(Integer integer) { System.out.println("next:" + integer); }});
输出:next:1onError

第9个操做符SubscribeOn(Scheduler):即申明在哪一个调度器工做

第10个:android例子:

Observable.from(new String[]{"one", "two", "three", "four", "five"}) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } });

大体了解了rxjava的使用和基本原理以后,在后续的使用中遇到不懂的再看文档https://mcxiaoke.gitbooks.io/rxdocs/content/,还有必定要看源码,而后本身亲自尝试,才能加深理解

欢迎关注个人微信公众号:安卓圈

本文分享自微信公众号 - 安卓圈(gh_df75572d44e4)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索