RxJava中的Observable,多Subscribers

多个订阅者的默认行为并不老是可取的。在本文中,咱们将介绍如何更改此行为并以适当的方式处理多个订阅者。html

但首先,让咱们来看看多个订阅者的默认行为。java

默认行为react

假设咱们有如下Observable:数据库

private static Observable getObservable() {
    return Observable.create(subscriber -> {
        subscriber.onNext(gettingValue(1));
        subscriber.onNext(gettingValue(2));
 
        subscriber.add(Subscriptions.create(() -> {
            LOGGER.info("Clear resources");
        }));
    });
}
复制代码

订阅者订阅后会当即发出两个元素。bash

在咱们的示例中,咱们有两个订阅者:服务器

LOGGER.info("Subscribing");
 
Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));
Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i));
 
s1.unsubscribe();
s2.unsubscribe();
复制代码

想象一下,获取每一个元素是一项代价高昂的操做 - 例如,它可能包括密集计算或打开URL链接。ide

为了简单起见,咱们只返回一个数字:spa

private static Integer gettingValue(int i) {
    LOGGER.info("Getting " + i);
    return i;
}
复制代码

这是输出:code

Subscribing
Getting 1
subscriber#1 is printing 1
Getting 2
subscriber#1 is printing 2
Getting 1
subscriber#2 is printing 1
Getting 2
subscriber#2 is printing 2
Clear resources
Clear resources
复制代码

咱们能够看到,在默认状况下,获取每一个元素和清除资源都要执行两次-对于每一个订阅服务器一次。这不是咱们想要的。ConnectableObservable类有助于解决这个问题。htm

ConnectableObservable

ConnectableObservable类容许与多个订阅者共享订阅,而不容许屡次执行底层操做。

但首先,让咱们建立一个ConnectableObservable。

publish()

publish()方法是从Observable建立一个ConnectableObservable:

ConnectableObservable obs = Observable.create(subscriber -> {
    subscriber.onNext(gettingValue(1));
    subscriber.onNext(gettingValue(2));
    subscriber.add(Subscriptions.create(() -> {
        LOGGER.info("Clear resources");
    }));
}).publish();
复制代码

但就目前而言,它什么都不作。它的工做原理是connect()方法。

connect()

在调用ConnectableObservable的connect()方法以前,即便有一些订阅者,也不会触发Observable的onSubcribe()回调。

让咱们来证实一下:

LOGGER.info("Subscribing");
obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));
obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));
Thread.sleep(1000);
LOGGER.info("Connecting");
Subscription s = obs.connect();
s.unsubscribe();
复制代码

咱们订阅,而后等待一秒钟再链接输出是:

Subscribing
Connecting
Getting 1
subscriber #1 is printing 1
subscriber #2 is printing 1
Getting 2
subscriber #1 is printing 2
subscriber #2 is printing 2
Clear resources
复制代码

咱们能够看到:

  • 获取元素只出现一次咱们想要的
  • 清算资源也只出现一次
  • 订阅后获取元素开始一秒钟
  • 订阅再也不触发元素的发射。只有connect()才能这样作

这种延迟多是有益的 - 有时咱们须要为全部订阅者提供相同的元素序列,即便其中一个订阅者比另外一个订阅者更早。

可观察的一致视图 - 在subscribe()以后的connect()

这个用例没法在咱们以前的Observable上进行演示,由于它运行很冷,并且两个订阅者均可以得到整个元素序列。

相反,想象一下,元素发射不依赖于订阅的时刻,例如,鼠标点击发出的事件。如今还想象第二个订阅者在第一个订阅者以后订阅第二个订阅者。

第一个订阅者将得到此示例中发出的全部元素,而第二个订阅者将只接收一些元素。

另外一方面,在正确的位置使用connect()方法能够为两个订阅者提供Observable序列上的相同视图。

让咱们建立一个Observable。它将在JFrame上点击鼠标时发出元素。

每一个元素都是点击的x坐标:

private static Observable getObservable() {
    return Observable.create(subscriber -> {
        frame.addMouseListener(new MouseAdapter() {
            @Override
            public void mouseClicked(MouseEvent e) {
                subscriber.onNext(e.getX());
            }
        });
        subscriber.add(Subscriptions.create(() {
            LOGGER.info("Clear resources");
            for (MouseListener listener : frame.getListeners(MouseListener.class)) {
                frame.removeMouseListener(listener);
            }
        }));
    });
}
复制代码

如今,若是咱们以第二个间隔一个接一个地订阅两个订阅者,运行程序并开始单击,咱们将看到第一个订阅者将得到更多元素:

public static void defaultBehaviour() throws InterruptedException {
    Observable obs = getObservable();
 
    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}
复制代码
subscribing #1
subscriber#1 is printing x-coordinate 280
subscriber#1 is printing x-coordinate 242
subscribing #2
subscriber#1 is printing x-coordinate 343
subscriber#2 is printing x-coordinate 343
unsubscribe#1
clearing resources
unsubscribe#2
clearing resources
复制代码

connect() After subscribe()

为了使两个订阅者得到相同的序列,咱们将Observable转换为ConnectableObservable并在订阅者以后调用connect():

public static void subscribeBeforeConnect() throws InterruptedException {
 
    ConnectableObservable obs = getObservable().publish();
 
    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(
      i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(
      i ->  LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe connected");
    s.unsubscribe();
}
复制代码

如今他们将获得相同的序列:

subscribing #1
subscribing #2
connecting:
subscriber#1 is printing x-coordinate 317
subscriber#2 is printing x-coordinate 317
subscriber#1 is printing x-coordinate 364
subscriber#2 is printing x-coordinate 364
unsubscribe connected
clearing resources
复制代码

因此重点是等待全部用户准备就绪而后调用connect()。

在Spring应用程序中,咱们能够在应用程序启动期间订阅全部组件,例如在onApplicationEvent()中调用connect()。

让咱们回到咱们的例子;注意,connect()方法以前的全部单击操做都失败了。若是咱们不想遗漏元素,但相反,咱们能够在代码中更早地放置connect(),并强制可观察到的元素在没有任何订阅服务器的状况下生成事件。

在没有任何订阅者的状况下强制订阅 - connect()在subscribe()以前

为了证实这一点,让咱们更正咱们的例子:

public static void connectBeforeSubscribe() throws InterruptedException {
    ConnectableObservable obs = getObservable()
      .doOnNext(x -> LOGGER.info("saving " + x)).publish();
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    s.unsubscribe();
}
复制代码

步骤相对简单:

  • 首先,咱们链接
  • 而后咱们等待一秒钟并订阅第一个订阅者
  • 最后,咱们等待另外一秒钟并订阅第二个订阅者

请注意,咱们添加了doOnNext()运算符。这里咱们能够在数据库中存储元素,例如在咱们的代码中,咱们只打印“save...”。

若是咱们启动代码并开始点击,咱们将看到在connect()调用以后当即发出和处理元素:

connecting:
saving 306
saving 248
subscribing #1
saving 377
subscriber#1 is printing x-coordinate 377
saving 295
subscriber#1 is printing x-coordinate 295
saving 206
subscriber#1 is printing x-coordinate 206
subscribing #2
saving 347
subscriber#1 is printing x-coordinate 347
subscriber#2 is printing x-coordinate 347
clearing resources
复制代码

若是没有订阅者,则仍会处理这些元素。

所以,不论是否有人订阅,connect()方法都会开始发出和处理元素,就好像有一个使用了元素的空操做的人工订阅器同样。

若是有一些真正的订阅者订阅,这我的工中介只向他们传播元素。

若要取消订阅,咱们会执行如下步骤:

s.unsubscribe();
复制代码

而后:

Subscription s = obs.connect();
复制代码

autoConnect()

此方法意味着在订阅以前或以后不会调用connect(),而是在第一个订阅者订阅时自动调用。

使用此方法,咱们不能本身调用connect(),由于返回的对象是一般的Observable,它没有此方法但使用底层的ConnectableObservable:

public static void autoConnectAndSubscribe() throws InterruptedException {
    Observable obs = getObservable()
    .doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect();
 
    LOGGER.info("autoconnect()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription s1 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription s2 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));
 
    Thread.sleep(1000);
    LOGGER.info("unsubscribe 1");
    s1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe 2");
    s2.unsubscribe();
}
复制代码

请注意,咱们也不能取消订阅人工订阅者。咱们能够取消订阅全部真正的订阅者,但人工订阅者仍将处理事件。

为了理解这一点,让咱们看一下最后一个订阅者取消订阅后最后发生的事情:

subscribing #1
saving 296
subscriber#1 is printing x-coordinate 296
saving 329
subscriber#1 is printing x-coordinate 329
subscribing #2
saving 226
subscriber#1 is printing x-coordinate 226
subscriber#2 is printing x-coordinate 226
unsubscribe 1
saving 268
subscriber#2 is printing x-coordinate 268
saving 234
subscriber#2 is printing x-coordinate 234
unsubscribe 2
saving 278
saving 268
复制代码

正如咱们所看到的,在第二次取消订阅后,不会出现清除资源的状况,并继续使用doOnNext()保存元素。这意味着人工订阅服务器不会取消订阅,而是继续使用元素。

refCount()

refCount()相似于autoConnect(),由于只要第一个订阅者订阅,链接也会自动发生。

与autoconnect()不一样,当最后一个订阅者取消订阅时,也会自动断开链接:

public static void refCountAndSubscribe() throws InterruptedException {
    Observable obs = getObservable()
      .doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount();
 
    LOGGER.info("refcount()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(
      i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(
      i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
 
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}
复制代码
refcount()
subscribing #1
saving 265
subscriber#1 is printing x-coordinate 265
saving 338
subscriber#1 is printing x-coordinate 338
subscribing #2
saving 203
subscriber#1 is printing x-coordinate 203
subscriber#2 is printing x-coordinate 203
unsubscribe#1
saving 294
subscriber#2 is printing x-coordinate 294
unsubscribe#2
clearing resources
复制代码

结论

ConnectableObservable类能够轻松地处理多个订阅者。

它的方法看起来很类似,但因为实现上的细微差异(甚至方法的顺序也很重要),用户的行为发生了很大的变化。

相关文章
相关标签/搜索