上次提到调用observable的publish和connect方法后能够将一个Observable发出的对象实时传递到订阅在上的subscriber。
这个和Rxjava中Subject的概念十分相像。Subject是能够理解为桥接或者代理,能够订阅Observable也能够本身做为Observable提供Subscriber订阅。
Rxjava提供PublishSubject其功能与上次提到的publish+connect同样html
@Test public void testPublishSubject() throws InterruptedException { //建立一个publish subject PublishSubject<Object> subject = PublishSubject.create(); Observable.create(sub->{ new Thread(()->{ int i=0; while(i<5){ sub.onNext(i++); try { Thread.sleep(500); } catch (InterruptedException e) { } } }).start(); }).subscribe(subject);//将subject订阅在原始Observable上 //在subject上订阅子subscriber subject.subscribe(x->{ System.out.println("1st sub "+x); }); Thread.sleep(1000); System.out.println("2nd subscriber start"); //一秒后在subject上订阅子subscriber subject.subscribe(x->{ System.out.println("2nd sub "+x); }); Thread.sleep(10000); } --------输出--------- 1st sub 1 1st sub 2 2nd subscriber start 1st sub 3 2nd sub 3 1st sub 4 2nd sub 4
subject的用法大体都如上所示,能够subscribe也能够被subscribe,subject的不一样是能够同时订阅多个observable并同时广播到子subscriber上。java
该subject会等到原observable结束并发出原observable最后一个发出的对象或错误。并发
该subject会发出最近的一个对象并持续发出接下来的所由对象。async
上面已经介绍过,该subject会发出订阅后原Observable所发出的对象。代理
该subject会返回从原始observable开始的全部对象。要注意的是这个可能存在潜在的OOM风险。code