本文做为《Java编程方法论:响应式Rxjava与代码设计实战》一书第二章 Rxjava中的Subject一节的补充解读。java
首先来看一个Demo:react
@Test
void replay_PublishSubject_test() {
PublishSubject<Object> publishSubject = PublishSubject.create();
ConnectableObservable<Object> replay = publishSubject.replay();
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
List<Integer> integers = new ArrayList<>();
for (int i=1;i<10;i++){
integers.add(i);
}
Disposable subscribe1 = replay.subscribe(x -> {
log("一郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
Disposable subscribe2 = replay.subscribe(x -> {
log("二郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
Disposable subscribe3 = replay.subscribe(x -> {
log("三郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
AtomicInteger atomicInteger = new AtomicInteger(integers.size());
try {
forkJoinPool.submit(() -> {
integers.forEach(id -> {
sleep(1,TimeUnit.SECONDS);
publishSubject.onNext(id);
if (atomicInteger.decrementAndGet() == 0) {
publishSubject.onComplete();
}
});
});
replay.connect();
sleep(2,TimeUnit.SECONDS);
subscribe1.dispose();
sleep(1,TimeUnit.SECONDS);
//replay.connect(consumer -> consumer.dispose());
publishSubject.onComplete();
System.out.println("test");
} finally {
try {
forkJoinPool.shutdown();
int shutdownDelaySec = 2;
System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
} catch (Exception ex) {
System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
} finally {
System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
List<Runnable> l = forkJoinPool.shutdownNow();
System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
}
}
}
复制代码
获得的结果以下所示:编程
ForkJoinPool.commonPool-worker-3: 一郎神: 1
ForkJoinPool.commonPool-worker-3: 二郎神: 1
ForkJoinPool.commonPool-worker-3: 三郎神: 1
ForkJoinPool.commonPool-worker-3: 二郎神: 2
ForkJoinPool.commonPool-worker-3: 三郎神: 2
Emission completed
Emission completed
test
………………等待 2 秒后结束服务………………
调用 forkJoinPool.shutdownNow()结束服务...
还剩 0 个任务等待被执行,服务已关闭
复制代码
在调用subscribe1.dispose()
的时候,完成了订阅者自行解除订阅关系的约定,而假如后面调用的是replay.connect(consumer -> consumer.dispose())
,依然会在发送元素的过程当中强行中断,不带任何通知。而在使用publishSubject.onComplete()
后,则能够很优雅地通知后续订阅者优雅地结束。 如图2-3
所示,咱们按照图中文字操做,并在System.out.println("test")
这行打断点查看状态,发现其余2个订阅者并无被移除,为何会出现这种状况?学习
经过publishSubject.replay()
,咱们获得了一个ConnectableObservable
对象,具体以下:this
//io.reactivex.Observable#replay
public final ConnectableObservable<T> replay() {
return ObservableReplay.createFrom(this);
}
复制代码
结合前面ConnectableObservable
相关知识的学习,在调用replay.subscribe(...)
时,会将下游的订阅者与DEFAULT_UNBOUNDED_FACTORY
所获得的UnboundedReplayBuffer
对象经过一个ReplayObserver
对象创建起联系:atom
//ObservableReplay#createFrom
public static <T> ConnectableObservable<T> createFrom(ObservableSource<? extends T> source) {
return create(source, DEFAULT_UNBOUNDED_FACTORY);
}
//ObservableReplay#create
static <T> ConnectableObservable<T> create(ObservableSource<T> source, final BufferSupplier<T> bufferFactory) {
// the current connection to source needs to be shared between the operator and its onSubscribe call
final AtomicReference<ReplayObserver<T>> curr = new AtomicReference<ReplayObserver<T>>();
//注意此处
ObservableSource<T> onSubscribe = new ReplaySource<T>(curr, bufferFactory);
//此处这个curr会做为ObservableReplay下current字段的值,记住,它是个引用类型对象
return RxJavaPlugins.onAssembly(new ObservableReplay<T>(onSubscribe, source, curr, bufferFactory));
}
//ObservableReplay#subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
onSubscribe.subscribe(observer);
}
//ObservableReplay.ReplaySource#subscribe
public void subscribe(Observer<? super T> child) {
for (;;) {
ReplayObserver<T> r = curr.get();
if (r == null) {
ReplayBuffer<T> buf = bufferFactory.call();
ReplayObserver<T> u = new ReplayObserver<T>(buf);
//此时ObservableReplay中current字段的值所指对象也会发生改变
if (!curr.compareAndSet(null, u)) {
continue;
}
r = u;
}
InnerDisposable<T> inner = new InnerDisposable<T>(r, child);
child.onSubscribe(inner);
//经过ReplayObserver的observers字段将下游订阅者管理起来
r.add(inner);
if (inner.isDisposed()) {
r.remove(inner);
return;
}
//此处UnboundedReplayBuffer对象与下游订阅者创建联系
r.buffer.replay(inner);
break;
}
}
}
复制代码
当调用replay.connect(consumer -> consumer.dispose())
时,经过current
获取上面获得的ReplayObserver
对象,并调用该对象的dispose()
方法(由replay.connect(...)
中传入的Consumer
实现可得),此时会将ObservableReplay
中的observers
字段设定为TERMINATED
,同时将ObservableReplay
自身身为AtomicReference
角色所存储值设定为DISPOSED
,即将ObservableReplay
中current
的值设定为了DISPOSED
。spa
//ObservableReplay#connect
public void connect(Consumer<? super Disposable> connection) {
boolean doConnect;
ReplayObserver<T> ps;
for (;;) {
ps = current.get();
if (ps == null || ps.isDisposed()) {
ReplayBuffer<T> buf = bufferFactory.call();
ReplayObserver<T> u = new ReplayObserver<T>(buf);
if (!current.compareAndSet(ps, u)) {
continue;
}
ps = u;
}
doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true);
break;
}
try {
connection.accept(ps);
} catch (Throwable ex) {
if (doConnect) {
ps.shouldConnect.compareAndSet(true, false);
}
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
if (doConnect) {
source.subscribe(ps);
}
}
//ObservableReplay.ReplayObserver#dispose
public void dispose() {
observers.set(TERMINATED);
DisposableHelper.dispose(this);
}
//DisposableHelper#dispose
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
复制代码
能够看到,ReplayObserver
只是解除了与下游订阅者的关系,但并无进一步对下游订阅者进行结束的操做,这样与UnboundedReplayBuffer
对象创建联系的订阅者,若是buffer中的元素还未消费完毕,会持续消费直至所存元素下发完毕,但要注意的是,该buffer中并未存放结束事件(即经过调用UnboundedReplayBuffer#complete
往该队列中存放NotificationLite.complete()
元素)。同时下游订阅者也并未调用dispose()
方法,因此下面所示源码中的output.isDisposed()
结果为false
。请注意下面所示源码中<1>
处的代码:设计
public void replay(InnerDisposable<T> output) {
if (output.getAndIncrement() != 0) {
return;
}
final Observer<? super T> child = output.child;
int missed = 1;
for (;;) {
if (output.isDisposed()) {
return;
}
int sourceIndex = size;
Integer destinationIndexObject = output.index();
int destinationIndex = destinationIndexObject != null ? destinationIndexObject : 0;
while (destinationIndex < sourceIndex) {
Object o = get(destinationIndex);
//此处很关键
if (NotificationLite.accept(o, child)) {//<1>
return;
}
if (output.isDisposed()) {
return;
}
destinationIndex++;
}
output.index = destinationIndex;
missed = output.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
//io.reactivex.internal.util.NotificationLite#accept
public static <T> boolean accept(Object o, Observer<? super T> s) {
if (o == COMPLETE) {
s.onComplete();
return true;
} else
if (o instanceof ErrorNotification) {
s.onError(((ErrorNotification)o).e);
return true;
}
s.onNext((T)o);
return false;
}
复制代码
若是调用了UnboundedReplayBuffer#complete
,那么在元素下发到最后时,就会出现o == COMPLETE
为true
,此时会调用下游订阅者的onComplete()
方法。3d
//ObservableReplay.UnboundedReplayBuffer#complete
public void onComplete() {
if (!done) {
done = true;
buffer.complete();
replayFinal();
}
}
//ObservableReplay.UnboundedReplayBuffer#complete
public void complete() {
add(NotificationLite.complete());
size++;
}
//io.reactivex.internal.util.NotificationLite#complete
public static Object complete() {
return COMPLETE;
}
复制代码
至此,关于replay_PublishSubject_test()
示例中所展示的疑点已经解读完毕。code