reactor官方文档译文(2)Reactor-core模块

You should never do your asynchronous work alone.
— Jon Brisbin
完成Reactor 1后写到
You should never do your asynchronous work alone.
— Stephane Maldini
完成Reactor 2后写到

名称解释:back pressure:背压。在交换机在阻止外来数据包发送到堵塞端口的时候可能会发生丢包。而背压就是考验交换机在这个时候避免丢包的能力。不少的交换机当发送或接收缓冲区溢出的时候经过将阻塞信号发送回源地址来实现背压。交换机在全双工时使用IEEE802.3x流控制达到一样目的。java

首先,咱们使用groovy示例来展现core模块的功能:react

//Initialize context and get default dispatcher
Environment.initialize()

//RingBufferDispatcher with 8192 slots by default
def dispatcher = Environment.sharedDispatcher()

//Create a callback
Consumer<Integer> c = { data ->
        println "some data arrived: $data"
}

//Create an error callback
Consumer<Throwable> errorHandler = { it.printStackTrace }

//Dispatch data asynchronously
r.dispatch(1234, c, errorHandler)

Environment.terminate()

下面,咱们使用Stream reactive实现来看:git

//standalone async processor
def processor = RingBufferProcessor.<Integer>create()

//send data, will be kept safe until a subscriber attaches to the processor
processor.onNext(1234)
processor.onNext(5678)

//consume integer data
processor.subscribe(new Subscriber<Integer>(){

  void onSubscribe(Subscription s){
    //unbounded subscriber
    s.request Long.MAX
  }

  void onNext(Integer data){
    println data
  }

  void onError(Throwable err){
    err.printStackTrace()
  }

  void onComplete(){
    println 'done!'
  }
}

//Shutdown internal thread and call complete
processor.onComplete()

Core模块概览github

Core Overview

Reactor core模块的子单元:redis

   Common IO和功能类型,一些是直接从java8 功能接口回迁的。

  Function、Supplier、consumer、Predicate、BiConsumer、BiFunction

  Tuples

  Resource、Pausable、Timer

  Buffer,Codec和一组预约义的Codec。

  Environment 上下文

  Dispatcher 协议和一组预约义的Dispatcher。

  预约义的Reactive Stream Processor

reactor-core能够用来逐渐替代另外的消息传递策略、调度时间任务或者以小的功能块组织代码。这种突破使开发者与其它Reactive基础库更好的合做,特别是对于没有耐心的开发者,没有了对RingBuffer的理解负担。spring

注意:Reactor-core隐藏了LMAX disruptor,所以不会出现也不会和现有的Disruptor依赖冲突。编程

功能模块json

 功能模块重用是核心,一般状况下在你使用Reactor时就须要的功能。所以,功能编程酷在哪里?其中一个核心理念是将可执行代码看作别的数据。另外一点,相似于Closure或者匿名函数,此时业务逻辑由最初的调用者决定。它一样避免了过量的If/SWITCH模块,而且这种分离是概念更清晰:每一个模块完成一个功能且不须要共享任何东西。安全

 组织功能模块服务器

  每一个功能组件都给出它的通常任务的明确意图:

Consumer:简单回调--一劳永逸的

BiCounsumer:两个参数的简单回调,一般用在序列比较,例如:前一个和下一个参数。

Function:转换逻辑--请求/应答

BiFunction:两个参数的转换,一般用在累加器,比较前一个和下一个参数,返回一个新的值。

Supplier:工厂逻辑--轮询

Predicate:测试路径--过滤

注意:咱们也将Publisher和Subscriber视做功能块,勇于称之为Reactive功能块。尽管如此,它们做为基础组件,普遍应用到Reactor及其其它地方。Stream API接收reactor.fn参数,为你建立合适的Subscriber。

好消息是在功能模块中包装可执行指令能够向砖块同样进行复用。

Consumer<String> consumer = new Consumer<String>(){
        @Override
        void accept(String value){
                System.out.println(value);
        }
};

//Now in Java 8 style for brievety
Function<Integer, String> transformation = integer -> ""+integer;

Supplier<Integer> supplier = () -> 123;

BiConsumer<Consumer<String>, String> biConsumer = (callback, value) -> {
        for(int i = 0; i < 10; i++){
                //lazy evaluate the final logic to run
                callback.accept(value);
        }
};

//note how the execution flows from supplier to biconsumer
biConsumer.accept(
        consumer,
        transformation.apply(
                supplier.get()
        )
);

 最初听起来,这可能不是一个引人注目的革命性变革。可是这种基本思惟模式的改变,将揭示咱们使异步代码变的稳健和可组合性的使命是多么难得。Dispatcher分发器将输入数据和错误回调分发给consumer来处理。Reactor Stream模块将更好的使用这些组件。

  当使用Ioc容器如spring时,一个好的开发者将利用Java的配置属性来返回一个无状态的功能bean。而后能够优美的注入到stream Pipeline或者分发他们的执行代码中的block中。

  元组

  你能够注意到这些接口,它们对输入参数和比较少的固定数量的参数的泛型有很好的支持。你怎么传递超过1个或者超过2个的参数呢?答案是使用元组Tuple,Tuple相似于csv中一个单独实例的同样,能够在在功能性编程中保证它们的类型安全和支持多个数量的参数。

  之前面的例子为例,咱们尝试提供两个参数的BiConsumer而使用单个参数的Consumer

Consumer<Tuple2<Consumer<String>, String>> biConsumer = tuple -> {
        for(int i = 0; i < 10; i++){
                //Correct typing, compiler happy
                tuple.getT1().accept(tuple.getT2());
        }
};

biConsumer.accept(
        Tuple.of(
                consumer,
                transformation.apply(supplier.get())
        )
);

注意:Tuple须要分配更多的空间,所以在比较或者键值信号等通常使用场景中更多直接使用Bi***组件。

Environment和Dispatcher

  功能性构建块已经准备就绪,让咱们使用它们来进行异步编程。第一步是到Dispatcher分区。

  在咱们启动任意Dispatcher前,须要保证能够有效的建立它们。一般,建立它们的代价比较高,缘由是须要预分配一个内存分区来保持分配的信号,这就是前言中介绍的著名的运行时分配和启动时预分配的不一样对比。所以提出了一个名为"Environment"共享上下文概念,使用它来管理这些不一样类型的Dispatcher,从而避免没必要要的建立开销。

  Environment

  reactor的使用者(或者可用的扩展库如@Spring)建立或者中止Environment。它们自动从META_INF/reactor/reactor-environment.properties处读取配置文件。

  注意,属性文件能够改变,经过在classpath下的META-INFO/reactor目录下一个新的属性配置能够改变属性文件。

       经过传递下面的环境变量reactor.profiles.active来在运行时段改变默认的配置文件。

java - jar reactor-app.jar -Dreactor.profiles.active=turbo

  启动和中止Environment

Environment env = Environment.initialize();

//Current registered environment is the same than the one initialized
Assert.isTrue(Environment.get() == env);

//Find a dispatcher named "shared"
Dispatcher d  = Environment.dispatcher("shared");

//get the Timer bound to this environment
Timer timer = Environment.timer();

//Shutdown registered Dispatchers and Timers that might run non-daemon threads
Environment.terminate();
//An option could be to register a shutdownHook to automatically invoke terminate.

注意:在一个给定的Jvm应用中,最好只维护一个Enviroment.在大多数状况下,使用Environment.initializeIfEmpty()就彻底ok。

Dispacher分发器

  从Reactor 1开始,Dispatcher就存在了。Dispatcher一般抽象消息传递的方法,和Java Executor有相似的通用约定。事实上Dispatcher继承自Executor。

  Dispatcher对有数据信号的传送方式及消费者同步或异步执行的错误信息有一套比较严格的类型限制约定。这种方式在面对经典的Executors时解决了第一个问题--错误隔离。效果以下:

错误消费者的调用不须要终端当前分配的资源。若是没有指定,它默认从当前存在的Environment中去寻找,并使用指定给它的errorJournalConsumer。

  异步Dispatche提供的第二个独特的特征是运行使用尾部递归策略来再次调度。尾部递归的应用场景是分发器发现Dispatcher的classLoader已经分配到正在运行的线程,这时,当当前消费者返回时将要执行的task放入到队列中。

  使用一个相似于 Groovy Spock test的异步的多线程分发器:

import reactor.core.dispatch.*

//...

given:
  def sameThread = new SynchronousDispatcher()
  def diffThread = new ThreadPoolExecutorDispatcher(1, 128)
  def currentThread = Thread.currentThread()
  Thread taskThread = null

  def consumer = { ev ->
    taskThread = Thread.currentThread()
  }

  def errorConsumer = { error ->
    error.printStackTrace()
  }

when: "a task is submitted"
  sameThread.dispatch('test', consumer, errorConsumer)

then: "the task thread should be the current thread"
  currentThread == taskThread

when: "a task is submitted to the thread pool dispatcher"
  def latch = new CountDownLatch(1)
  diffThread.dispatch('test', { ev -> consumer(ev); latch.countDown() }, errorConsumer)

  latch.await(5, TimeUnit.SECONDS) // Wait for task to execute

then: "the task thread should be different when the current thread"
  taskThread != currentThread

注意:

  如Java Executor同样,它们缺乏了咱们将加入到Reactor 2.x的一个特色:Reactive stream协议。这时在Reactor中仅有几个未完成事项中的一个未完成事项--没有将Reactive stream标准直接绑定到Reactor中。而后,你能够在Stream章节部分找到快速结合Reactor stream的方法。

表3 Dispatcher家族介绍

Dispatcher From Environment Description Strengths Weaknesses

RingBuffer

sharedDispatcher()

An LMAX DisruptorRingBuffer based Dispatcher.

Small latency peaks tolerated

Fastest Async Dispatcher, 10-15M+ dispatch/sec on commodity hardware

Support ordering

'Spin' Loop when getting the next slot on full capcity

Single Threaded, no concurrent dispatch

Mpsc

sharedDispatcher() if Unsafe not available

Alternative optimized message-passing structure.

Latency peaks tolerated

5-10M+ dispatch/sec on commodity hardware

Support ordering

Unbounded and possibly using as much available heap memory as possible

Single Threaded, no concurrent dispatch

WorkQueue

workDispatcher()

An LMAX DisruptorRingBuffer based Dispatcher.

Latency Peak tolerated for a limited time

Fastest Multi-Threaded Dispatcher, 5-10M+ dispatch/sec on commodity hardware

'Spin' Loop when getting the next slot on full capcity

Concurrent dispatch

Doesn’t support ordering

Synchronous

dispatcher("sync") or SynchronousDispatcher. INSTANCE

Runs on the current thread.

Upstream and Consumer executions are colocated

Useful for Test support

Support ordering if the reentrant dispatch is on the current thread

No Tail Recursion support

Blocking

TailRecurse

tailRecurse() or TailRecurse Dispatcher. INSTANCE

Synchronous Reentrant Dispatcher that enqueue dispatches when currently dispatching.

Upstream and Consumer executions are colocated

Reduce execution stack, greatly expanded by functional call chains

Unbounded Tail Recurse depth

Blocking

Support ordering (Thread Stealing)

ThreadPoolExecutor

newDispatcher(int, int, DispatcherType. THREAD_POOL_EXECUTOR)

Use underlying ThreadPoolExecutor message-passing

Multi-Threaded

Blocking Consumers, permanent latency tolerated

1-5M+ dispatch/sec on commodity hardware

Concurrent run on a given consumer executed twice or more

Unbounded by default

Doesn’t support ordering

Traceable Delegating

N/A

Decorate an existing dispatcher with TRACE level logs.

Dispatch tapping

Runs slower than the delegated dispatcher alone

Log overhead (runtime, disk)

Ring Buffer message passing

 

 

DispatcherSupplier

   你可能已经注意到了,一些Dispatcher事单线程的,特别是RingBufferDispatcher和MpsDispatcher。更进一步,根据Reactive Stream规范,Subscriber/Processor的实现是不容许并发通知的。这一点尤为对Reactor Streams产生了影响,使用Stream.dispachOn(Dispatcher)和一个Dispatcher来给并发信号的显示失败留后门。

  而后,有一个方法来避免这个缺点,使用Dispatcher池DispatcherSupplier。实际上,做为Supplier的工厂,Supplier.get()方法根据有趣的共享策略:轮询、最少使用。。等间接提供一个Dispatcher。

  Enviroment提供了一个静态方法去建立、并注册到当前活跃Environment的Dispatcher池:一组轮询的返回Dispatcher。一旦就绪,Supplier提供对Dispatcher数目的控制。

  不一样于通常的Dispatcher,Environment提供了一站式的管理服务:

Environment.initialize();
//....

//Create an anonymous pool of 2 dispatchers with automatic default settings (same type than default dispatcher, default backlog size...)
DispatcherSupplier supplier = Environment.newCachedDispatchers(2);

Dispatcher d1 = supplier.get();
Dispatcher d2 = supplier.get();
Dispatcher d3 = supplier.get();
Dispatcher d4 = supplier.get();

Assert.isTrue( d1 == d3  && d2 == d4);
supplier.shutdown();

//Create and register a new pool of 3 dispatchers
DispatcherSupplier supplier1 = Environment.newCachedDispatchers(3, "myPool");
DispatcherSupplier supplier2 = Environment.cachedDispatchers("myPool");

Assert.isTrue( supplier1 == supplier2 );
supplier1.shutdown();

Timer定时器

  Dispatcher尽量快的计算接收的任务,然而,Timer定时器提供一次性或者周期性的调度API。Reactor Core模块默认提供了一个HashWheelTimer定时器,它自动绑定到任意的新的Environment中。HashWheelTimer对处理大量的、并发的、内存调度任务有巨大的优点,它是替换java TaskScheduler的一个强大的选项。

  注意:它不是一个持久化的调度器,应用关闭时task将会丢失。下个正式版本Timer定时器将会有一些改变,例如使用redis增长持久化/共享,请关注。

   建立一个简单的定时器:

import reactor.fn.timer.Timer

//...

given: "a new timer"
    Environment.initializeIfEmpty()
    Timer timer = Environment.timer()
    def latch = new CountDownLatch(10)

when: "a task is submitted"
    timer.schedule(
            { Long now -> latch.countDown() } as Consumer<Long>,
            period,
            TimeUnit.MILLISECONDS
    )

then: "the latch was counted down"
    latch.await(1, TimeUnit.SECONDS)
    timer.cancel()
    Environment.terminate()

核心Processor

核心Processor用来作比Dispatcher更集中的job:支持背压计算异步task。

提供了org.reactivestreams.Processor接口的直接实现,所以能够很好的和别的Reactive Stream厂商一块儿工做。

记住:Processor便是Subscriber也是Publisher,所以你能够在想要的地方(source,processing,sink)将一个Processor插入到Reactive stream chain中。

注意:规范不推荐直接使用Processor.onNext(d)。

 

RingBuffer Processors

 基于RingBuffer的Reactive Stream Processor的优势以下:

  高吞吐量

  重启时不会丢掉没有消费的数据,且从最近的没有消费的数据开始执行

    若没有Subscriber监听,数据不会丢失(不想Reactor-stream的Broadcaster会丢掉数据)

    若在消息处理过程当中取消Subscriber,信号将会安全的从新执行,实际上它能在RingBufferProcessor上很好的工做。

  灵活的背压,它容许任意时间内有限数量的背压,Subscriber会消费掉而且请求更多的数据。

  传播的背压,由于它是一个Processor,它能够经过订阅方式传递消息。

  多线程的出/入Processor。

事实上,RingBuffer*Process相似于典型的MicroMessageBroker!

   它们的惟一缺点是它们在运行时建立它们会消耗大量的资源,缘由是它们不像它们的兄弟RingBufferDispatcher能够很容易的共享,这种特性使它们更适应于高吞吐量的预约义数据管道。

RingBufferProcessor

 Reactor的RingBufferProcessor组件本质上是Disruptor的RingBuffer,设计的目的是尽量的和原生的效率同样。使用场景是:你须要分发task到另一个线程,且该线程具备低耗、高吞吐量还在你的工做流中管理背压。

我使用RingBufferProcessor来计算远程异步调用的各类输出:AMQP, SSD存储和内存存储,Process彻底处理掉易变的延迟,每秒百万级别的消息的数据源历来没有阻塞过。
— 友好的Reactor使用者
RingBufferProcessor的使用场景

Ring Buffer message passing

 图7 在跟定时间T内,一个ringbufferprocessor,2个消费同一个sequence的Subscriber。

你可使用静态工具方法去建立一个ringbufferprocessor:

 

Processor<Integer, Integer> p = RingBufferProcessor.create("test", 32); //1
Stream<Integer> s = Streams.wrap(p); //2

s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //3
s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //4
s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //5

input.subscribe(p); //5

1.建立一个Processor,让它具备32个slot的内部RingBuffer。

2. 从Reactive Streams Processor建立一个Reactor。

3. 每一个请求调用consume方法在本身的线程内建立一个Disruptor的EventProcessor。

4. 每一个请求调用consume方法在本身的线程内建立一个Disruptor的EventProcessor。

5. 每一个请求调用consume方法在本身的线程内建立一个Disruptor的EventProcessor。

6. 向一个Reactive Streams Publisher订阅这个Processor。

传递到Processor的Subscribe.onNext(Buffer)方法的每一个数据元素将广播给全部的消费者。这个Processor没有使用轮询分发,由于它在RingBufferWorkProcess中,RingBufferWorkProcess下面将要讨论。若传递一、二、3三个整数到Processor,能够看到控制台输出结果以下:

Thread[test-2,5,main] data=1
Thread[test-1,5,main] data=1
Thread[test-3,5,main] data=1
Thread[test-1,5,main] data=2
Thread[test-2,5,main] data=2
Thread[test-1,5,main] data=3
Thread[test-3,5,main] data=2
Thread[test-2,5,main] data=3
Thread[test-3,5,main] data=3

每一个线程接收到传给Process的全部数据,每一个线程顺序得到数据,由于内部使用RingBuffer管理

slot来发布数据。

RingBufferWorkProcessor

 不像标准的RingBufferProcessor只广播它的值给全部的消费者,RingBufferWorkProcessor基于消费者的多少来分发请求值。Processor接收信息,而后轮询发送到不一样的线程中(由于每一个消费者有本身独立的线程),然而使用内部RingBuffer来有效管理消息的发布。

咱们构造了一个可扩展的、多种htp微服务器请求负载均衡的RingBufferWorkProcessor.说它看起来快过光速多是我错了,另外gc的压力彻底可控。
使用RingBufferWorkProcessor的Reactor友好者

Ring Buffer message passing

使用RingBufferWorkProcessor很是简单,你只要改变上面示例代码的引用到静态的create方法建立。使用RingBufferWorkProcessor以下,其它的代码时同样的。

Processor<Integer, Integer> p = RingBufferWorkProcessor.create("test", 32);

建立一个具备32个slot的内部RingBuffer的Processor。

  如今,发布消息到Processor时,将不会广播给每个consumer,会根据消费者的数目分发给不一样的消费者。运行示例,结果以下:

Thread[test-2,5,main] data=3
Thread[test-3,5,main] data=2
Thread[test-1,5,main] data=1

  注意,RingBufferWorkProcessor会重复终端的信号、检测正在中止工做的Subscriber的取消异常,最终会被别的Subscriber执行一次。咱们保证适合事件至少发送一次。若你理解这个语义,你可能会当即说“等等,RingBufferWorkProcessor怎么做为一个消息代理工做啦?” 答案是确定的。

Codecs和Buffer

字节码操做对大量数据管道配置的应用是一个核心关注点。reactor-net普遍使用字节码操做来对接收的字节码进行编组和分组或者经过IO发送。

reactor.io.buffer.Buffer是java byteBuffer处理的一个装饰器,增长了一些列的操做。目的是经过使用ByteBuffer的limit和读取/覆盖预先分配的字节来减小字节的复制。追踪ByteBuffer的位置是开发人员口头的问题,Buffer简化了这些,咱们只须要关注这个简单的工具就能够了。

下面是一个简单的Buffer操做示例:

import reactor.io.buffer.Buffer

//...

given: "an empty Buffer and a full Buffer"
                def buff = new Buffer()
                def fullBuff = Buffer.wrap("Hello World!")

when: "a Buffer is appended"
                buff.append(fullBuff)

then: "the Buffer was added"
                buff.position() == 12
                buff.flip().asString() == "Hello World!"

Buffer的一个有用的应用是Buffer.View,多个操做例如split都会返回Buffer.View。它提供了一个无需拷贝的方式去扫描和检索ByteBuffer的字节码。Buffer.View一样也是一种Buffer。

使用一个分隔符和Buffer.view使块数据读取能够复用一样的字节码

byte delimiter = (byte) ';';
byte innerDelimiter = (byte) ',';

Buffer buffer = Buffer.wrap("a;b-1,b-2;c;d");

List<Buffer.View> views = buffer.split(delimiter);

int viewCount = views.size();
Assert.isTrue(viewCount == 4);

for (Buffer.View view : views) {
    System.out.println(view.asString()); //prints "a" then "b-1,b-2", then "c" and finally "d"

    if(view.indexOf(innerDelimiter) != -1){
      for(Buffer.View innerView : view.split(innerDelimiter)){
        System.out.println(innerView.asString()); //prints "b-1" and "b-2"
      }
    }
}

使用Buffer应用到普通的分组和编组对开发者来讲可能显得不够高级,Reactor提供了一系列名称为Codec的预约义的转换器。一些Codec须要在classpath路径下添加一些额外的依赖,如json操做的Jackson依赖。

codec以两种方式工做:第一,继承Function去直接编码并返回编码好的数据,一般以Buffer的形式返回。这很是棒,但仅限于与无状态的Codec才能起效,另一个可选的方法是使用Codec.encoder来返回编码函数。

Codec.encoder()对比Codec.apply(Source)
Codec.encoder() 返回一个惟一的编码函数,这个编码函数不能被不一样线程共享。

Codec.apply(Source) 直接编码(并保存分配的编码器), 但Codec自己能够在线程间共享。 

对大部分实现了Buffer的codec来讲,Codec一样也能够根据source类型去解码数据。

解码数据源,须要使用Codec.decoder()获取解码函数。和编码不一样的是,没有为编码目的而重写的快捷方法。和编码相同的是,解码函数不能在线程间共享。

有两种形式的Code.decoder()函数,Codec.decoder()是一个阻塞的解码函数,它直接从传递源数据解码返回解码后的数据。Codec.decoder(Consumer)用做非阻塞的解码,它返回null,一旦解码只触发的Consumer,它能够和其它异步工具结合使用。

使用一个预约义的codec示例以下:

import reactor.io.json.JsonCodec

//...

given: 'A JSON codec'
                def codec = new JsonCodec<Map<String, Object>, Object>(Map);
    def latch = new CountDownLatch(1)

when: 'The decoder is passed some JSON'
                Map<String, Object> decoded;
                def callbackDecoder = codec.decoder{
                  decoded = it
                  latch.countDown()
                }
                def blockingDecoder = codec.decoder()

                //yes this is real simple async strategy, but that's not the point here :)
                Thread.start{
                  callbackDecoder.apply(Buffer.wrap("{\"a\": \"alpha\"}"))
    }

    def decodedMap = blockingDecoder.apply(Buffer.wrap("{\"a\": \"beta\"}")

then: 'The decoded maps have the expected entries'
    latch.await()
                decoded.size() == 1
                decoded['a'] == 'alpha'
                decodedMap['a'] == 'beta'

可用的核心Codec

名称 描述 须要的依赖

ByteArrayCodec

Wrap/unwrap byte arrays from/to Buffer.

N/A

DelimitedCodec

Split/Aggregate Buffer and delegate to the passed Codec for unit marshalling.

N/A

FrameCodec

Split/Aggregate Buffer into Frame Buffers according to successive prefix lengths.

N/A

JavaSerializationCodec

Deserialize/Serialize Buffers using Java Serialization.

N/A

PassThroughCodec

Leave the Buffers untouched.

N/A

StringCodec

Convert String to/from Buffer

N/A

LengthFieldCodec

Find the length and decode/encode the appropriate number of bytes into/from Buffer

N/A

KryoCodec

Convert Buffer into Java objects using Kryo with Buffers

com.esotericsoftware.kryo:kryo

JsonCodec,JacksonJsonCodec

Convert Buffer into Java objects using Jackson with Buffers

com.fasterxml.jackson.core:jackson-databind

SnappyCodec

A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer

org.xerial.snappy:snappy-java

GZipCodec

A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer

N/A

 

参考文献:

1. http://baike.baidu.com/link?url=kXnm3flViIx-4E7PxZtYVgb3xY5tlwovUqog2u_TgCCiN7FSFkxt7ze-Qio5j1FXPmIz2DGV2_lbOBoLeyXdaa

2. http://projectreactor.io/docs/reference/

相关文章
相关标签/搜索