RXJAVA之变换操做

RXJAVA提供了如下变换操做,对Observable的消息进行变换操做:app

1.windowide

按期未来自Observable的数据分拆成一些Observable窗口,而后发射这些窗口,而不是每次发射一项。函数

Observable<String> observable = Observable.just("123", "456","789","abc");spa

    observable.window(3).subscribeWith(new Observer<Observable<String>>(){server

@Overrideit

public void onComplete() {io

System.out.println("complete");class

}map

@Override方法

public void onError(Throwable arg0) {

System.out.println("error");

 

}

@Override

public void onNext(Observable<String> arg0) {

arg0.subscribeWith(new Observer<String>(){

@Override

public void onSubscribe(Disposable d) {

System.out.println("onSubscribe");

}

 

@Override

public void onNext(String t) {

System.out.println(t);

}

 

@Override

public void onError(Throwable e) {

System.out.println("error");

}

 

@Override

public void onComplete() {

System.out.println("complete");

}});

 

}

@Override

public void onSubscribe(Disposable arg0) {

System.out.println("onSubscribe");

 

}

});

    }

输出结果

onSubscribe

onSubscribe

123

456

789

complete

onSubscribe

abc

complete

complete

2.map

变换接收到的数据,从新发放出去。map函数只有一个参数,参数通常是Func1,Func1的<I,O>I,O模版分别为输入和输出值的类型,实现Func1的call方法对I类型进行处理后返回O类型数据。

Observable.just("123", "456","789").map(new Function<String,Integer>(){

@Override

public Integer apply(String t) throws Exception {

return Integer.parseInt(t);

}}).subscribeWith(new Observer<Integer>(){

 

@Override

public void onSubscribe(Disposable d) {

System.out.println("onSubscribe");

}

 

@Override

public void onNext(Integer t) {

System.out.println(t);

}

 

@Override

public void onError(Throwable e) {

System.out.println("onError");

}

 

@Override

public void onComplete() {

System.out.println("onComplete");

}});

    }

 输出结果

onSubscribe

123

456

789

onComplete

3.flatmap 

将Observable发射的数据变换为Observables集合,而后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并。

Observable<String> observable = Observable.just("123", "456","789","abc");

    observable.flatMap(new Function<String,Observable<String>>(){

 

@Override

public Observable<String> apply(String t) throws Exception {

return Observable.just(t+"flatmap");

}}

    ).subscribeWith(new Observer<String>(){

 

@Override

public void onSubscribe(Disposable d) {

System.out.println("onSubscribe");

}

 

@Override

public void onNext(String t) {

System.out.println(t);

}

 

@Override

public void onError(Throwable e) {

System.out.println("onError");

}

 

@Override

public void onComplete() {

System.out.println("onComplete");

}});

    }

输出结果

onSubscribe

123flatmap

456flatmap

789flatmap

abcflatmap

onComplete