今天咱们要介绍的是Reactor中的多线程模型和定时器模型,Reactor以前咱们已经介绍过了,它其实是观察者模式的延伸。java
因此从本质上来讲,Reactor是和多线程无关的。你能够把它用在多线程或者不用在多线程。react
今天将会给你们介绍一下如何在Reactor中使用多线程和定时器模型。git
先看一下以前举的Flux的建立的例子:github
Flux<String> flux = Flux.generate( () -> 0, (state, sink) -> { sink.next("3 x " + state + " = " + 3*state); if (state == 10) sink.complete(); return state + 1; }); flux.subscribe(System.out::println);
能够看到,不论是Flux generator仍是subscriber,他们实际上都是运行在同一个线程中的。多线程
若是咱们想让subscribe发生在一个新的线程中,咱们须要新启动一个线程,而后在线程内部进行subscribe操做。工具
Mono<String> mono = Mono.just("hello "); Thread t = new Thread(() -> mono .map(msg -> msg + "thread ") .subscribe(v -> System.out.println(v + Thread.currentThread().getName()) ) ); t.start(); t.join();
上面的例子中,Mono在主线程中建立,而subscribe发生在新启动的Thread中。线程
不少状况下,咱们的publisher是须要定时去调用一些方法,来产生元素的。Reactor提供了一个新的Schedule类来负责定时任务的生成和管理。code
Scheduler是一个接口:教程
public interface Scheduler extends Disposable
它定义了一些定时器中必需要实现的方法:接口
好比当即执行的:
Disposable schedule(Runnable task);
延时执行的:
default Disposable schedule(Runnable task, long delay, TimeUnit unit)
和按期执行的:
default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)
Schedule有一个工具类叫作Schedules,它提供了多个建立Scheduler的方法,它的本质就是对ExecutorService和ScheduledExecutorService进行封装,将其作为Supplier来建立Schedule。
简单点看Schedule就是对ExecutorService的封装。
Schedulers工具类提供了不少个有用的工具类,咱们来详细介绍一下:
Schedulers.immediate():
提交的Runnable将会立马在当前线程执行。
Schedulers.single():
使用同一个线程来执行全部的任务。
Schedulers.boundedElastic():
建立一个可重用的线程池,若是线程池中的线程在长时间内都没有被使用,那么将会被回收。boundedElastic会有一个最大的线程个数,通常来讲是CPU cores x 10。 若是目前没有可用的worker线程,提交的任务将会被放入队列等待。
Schedulers.parallel():
建立固定个数的工做线程,个数和CPU的核数相关。
Schedulers.fromExecutorService(ExecutorService):
从一个现有的线程池建立Scheduler。
Schedulers.newXXX:
Schedulers提供了不少new开头的方法,来建立各类各样的Scheduler。
咱们看一个Schedulers的具体应用,咱们能够指定特定的Scheduler来产生元素:
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
publishOn和subscribeOn主要用来进行切换Scheduler的执行上下文。
先讲一个结论,就是在链式调用中,publishOn能够切换Scheduler,可是subscribeOn并不会起做用。
这是由于真正的publish-subscribe关系只有在subscriber开始subscribe的时候才创建。
下面咱们来具体看一下这两个方法的使用状况:
publishOn能够在链式调用的过程当中,进行publish的切换:
@Test public void usePublishOn() throws InterruptedException { Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); final Flux<String> flux = Flux .range(1, 2) .map(i -> 10 + i + ":"+ Thread.currentThread()) .publishOn(s) .map(i -> "value " + i+":"+ Thread.currentThread()); new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start(); System.out.println(Thread.currentThread()); Thread.sleep(5000); }
上面咱们建立了一个名字为parallel-scheduler的scheduler。
而后建立了一个Flux,Flux先作了一个map操做,而后切换执行上下文到parallel-scheduler,最后右执行了一次map操做。
最后,咱们采用一个新的线程来进行subscribe的输出。
先看下输出结果:
Thread[main,5,main] value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main] value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
能够看到,主线程的名字是Thread。Subscriber线程的名字是ThreadA。
那么在publishOn以前,map使用的线程就是ThreadA。 而在publishOn以后,map使用的线程就切换到了parallel-scheduler线程池。
subscribeOn是用来切换Subscriber的执行上下文,无论subscribeOn出如今调用链的哪一个部分,最终都会应用到整个调用链上。
咱们看一个例子:
@Test public void useSubscribeOn() throws InterruptedException { Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); final Flux<String> flux = Flux .range(1, 2) .map(i -> 10 + i + ":" + Thread.currentThread()) .subscribeOn(s) .map(i -> "value " + i + ":"+ Thread.currentThread()); new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start(); Thread.sleep(5000); }
一样的,上面的例子中,咱们使用了两个map,而后在两个map中使用了一个subscribeOn用来切换subscribe执行上下文。
看下输出结果:
value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main] value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
能够看到,无论哪一个map,都是用的是切换过的parallel-scheduler。
本文的例子learn-reactive
本文做者:flydean程序那些事本文连接:http://www.flydean.com/reactor-thread-scheduler/
本文来源:flydean的博客
欢迎关注个人公众号:「程序那些事」最通俗的解读,最深入的干货,最简洁的教程,众多你不知道的小技巧等你来发现!