reactive streams与观察者模式

本文主要研究下java里头的reactive streams与观察者模式。html

reactive streams

reactive编程范式是一个异步编程范式,主要涉及数据流及变化的传播,能够看作是观察者设计模式的扩展。java

java里头的iterator是以pull模型,即订阅者使用next去拉取下一个数据;而reactive streams则是以push模型为主,订阅者调用subscribe方法订阅,发布者调用订阅者的onNext通知订阅者新消息。react

reactive streams java api

reactive streams定义了4个java api,以下spring

Processor<T,R>

processor既是Subscriber也是Publisher,表明两者的处理阶段编程

Publisher

publisher是数据的提供者, 将数据发布给订阅者设计模式

Subscriber

在调用Publisher.subscribe(Subscriber)以后,Subscriber.onSubscribe(Subscription)将会被调用api

Subscription

Subscription表明订阅者与发布者的一次订阅周期,一旦调用cancel去掉订阅,则发布者不会再推送消息。缓存

观察者模式

观察者模式的实现有推模型和拉模型bash

  • 拉模型

即发布者通知订阅有新消息,订阅者再去找发布者拉取异步

  • 推模型

即发布者通知订阅者有消息,通知的时候已经带上了一个新消息

reactor实例

maven

<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
			<version>3.1.2.RELEASE</version>
		</dependency>
复制代码

reactor 3 是java里头reactive streams的一个实现,基于reactive streams的java api,是spring 5反应式编程的基础。

Flux实例

@Test
    public void testBackpressure(){
        Flux.just(1, 2, 3, 4)
                .log()
                .subscribe(new Subscriber<Integer>() {
                    private Subscription s;
                    int onNextAmount;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.s = s;
                        s.request(2);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                        onNextAmount++;
                        if (onNextAmount % 2 == 0) {
                            s.request(2);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {}

                    @Override
                    public void onComplete() {}
                });

        try {
            Thread.sleep(10*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
复制代码

小结

从上面的代码看,reactive streams其实是推拉结合的模式的结合。为何还要拉呢?

rabbitmq vs kafka

rabbitmq是以推为主的,若是消费者消费能力跟不上,则消息会堆积在内存队列中(必要时可能写磁盘)

kafka则是以拉为主的,生产者推送消息到broker,消费者本身根据本身的能力从broker拉取消息,因为消息是持久化的,所以无需关心生产消费速率的不平衡

backpressure

backpressure这个是为处理生产速率与消费速率不平衡这个问题而衍生出来的,订阅者能够在next方法里头根据本身的状况,使用request方法告诉发布者要取N个数据,发布者则向订阅者推送N个数据。经过request达到订阅者对发布者的反馈。而对于发布者而言,为了实现backpressure,则须要有一个缓存队列来缓冲订阅者没来得及消费的数据。涉及到缓冲,就涉及容量是有界仍是无界,若是是有界则在缓冲慢的时候,处理策略是怎样等等。

doc

相关文章
相关标签/搜索