哈哈哈哈哈,题目有点猖狂。可是既然你都来了,那就看看吧,毕竟响应式编程随着高并发对于性能的吃紧,愈来愈重要了。react
哦对了,这是一篇Java文章。编程
废话很少说,直接步入正题。安全
步入正题以前,我但愿你对发布者/订阅者模型有一些了解。多线程
直接看图:并发
Talk is cheap, show you the code!框架
public class Main { public static void main(String[] args) { Flux<Integer> flux = Flux.range(0, 10); flux.subscribe(i -> { System.out.println("run1: " + i); }); flux.subscribe(i -> { System.out.println("run2: " + i); }); } }
输出:dom
run1: 0 run1: 1 run1: 2 run1: 3 run1: 4 run1: 5 run1: 6 run1: 7 run1: 8 run1: 9 run2: 0 run2: 1 run2: 2 run2: 3 run2: 4 run2: 5 run2: 6 run2: 7 run2: 8 run2: 9 Process finished with exit code 0
Flux是一个多元素的生产者,言外之意,它能够生产多个元素,组成元素序列,供订阅者使用。异步
Mono和Flux的区别在于,它只能生产一个元素供生产者订阅,也就是数量的不一样。ide
Mono的一个常见的应用就是Mono<ServerResponse\>做为WebFlux的返回值。毕竟每次请求只有一个Response对象,因此Mono刚恰好。高并发
来看一些官方文档演示的方法。
Flux<String> seq1 = Flux.just("foo", "bar", "foobar"); List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> seq2 = Flux.fromIterable(iterable); Mono<String> noData = Mono.empty(); Mono<String> data = Mono.just("foo"); Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
public class FluxIntegerWithSubscribe { public static void main(String[] args) { Flux<Integer> integerFlux = Flux.range(0, 10); integerFlux.subscribe(i -> { System.out.println("run"); System.out.println(i); }, error -> { System.out.println("error"); }, () -> { System.out.println("done"); }, p -> { p.request(2); }); } }
若是去掉初次请求,那么会请求最大值:
public class FluxIntegerWithSubscribe { public static void main(String[] args) { Flux<Integer> integerFlux = Flux.range(0, 10); // 在这里说明一下subscribe()第四个参数,指出了当订阅信号到达,初次请求的个数,若是是null则所有请求(Long.MAX_VALUE) // 其他subscribe()详见源码或文档:https://projectreactor.io/docs/core/release/reference/#flux integerFlux.subscribe(i -> { System.out.println("run"); System.out.println(i); }, error -> { System.out.println("error"); }, () -> { System.out.println("done"); }); } }
输出:
run 0 run 1 run 2 run 3 run 4 run 5 run 6 run 7 run 8 run 9 done Process finished with exit code 0
public class FluxWithBaseSubscriber { public static void main(String[] args) { Flux<Integer> integerFlux = Flux.range(0, 10); integerFlux.subscribe(new MySubscriber()); } /** * 通常来讲,经过继承BaseSubscriber<T>来实现,并且通常自定义hookOnSubscribe()和hookOnNext()方法 */ private static class MySubscriber extends BaseSubscriber<Integer> { /** * 初次订阅时被调用 */ @Override protected void hookOnSubscribe(Subscription subscription) { System.out.println("开始啦!"); // 记得至少请求一次,不然不会执行hookOnNext()方法 request(1); } /** * 每次读取新值调用 */ @Override protected void hookOnNext(Integer value) { System.out.println("开始读取..."); System.out.println(value); // 指出下一次读取多少个 request(2); } @Override protected void hookOnComplete() { System.out.println("结束啦"); } } }
输出:
开始啦! 开始读取... 0 开始读取... 1 开始读取... 2 开始读取... 3 开始读取... 4 开始读取... 5 开始读取... 6 开始读取... 7 开始读取... 8 开始读取... 9 结束啦 Process finished with exit code 0
在这里使用多线程模拟生产者生产的很快,而后立马取消订阅(虽然马上取消可是因为生产者实在太快了,因此订阅者仍是接收到了一些元素)。
其余的方法,好比Disposables.composite()会获得一个Disposable的集合,调用它的dispose()方法会把集合里的全部Disposable的dispose()方法都调用。
public class FluxWithDisposable { public static void main(String[] args) { Disposable disposable = getDis(); // 每次打印数量通常不一样,由于调用了disposable的dispose()方法进行了取消,不过若是生产者产地太快了,那么可能来不及终止。 disposable.dispose(); } private static Disposable getDis() { class Add implements Runnable { private final FluxSink<Integer> fluxSink; public Add(FluxSink<Integer> fluxSink) { this.fluxSink = fluxSink; } @Override public synchronized void run() { fluxSink.next(new Random().nextInt()); } } Flux<Integer> integerFlux = Flux.create(integerFluxSink -> { Add add = new Add(integerFluxSink); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); }); return integerFlux.subscribe(System.out::println); } }
输出:
这里的输出每次调用可能都会不一样,由于订阅以后取消了,因此能打印多少取决于那一瞬间CPU的速度。
public class FluxWithLimitRate1 { public static void main(String[] args) { Flux<Integer> integerFlux = Flux.range(0, 100); integerFlux.subscribe(new MySubscriber()); } private static class MySubscriber extends BaseSubscriber<Integer> { @Override protected void hookOnSubscribe(Subscription subscription) { System.out.println("开始啦!"); // 记得至少请求一次,不然不会执行hookOnNext()方法 request(1); } @Override protected void hookOnNext(Integer value) { System.out.println("开始读取..."); System.out.println(value); // 指出下一次读取多少个 request(2); } @Override protected void hookOnComplete() { System.out.println("结束啦!"); } } }
public class FluxWithLimitRate2 { public static void main(String[] args) { Flux<Integer> integerFlux = Flux.range(0, 100); // 最后,来看一些Flux提供的预获取方法: // 指出预取数量 integerFlux.limitRate(10); // lowTide指出预获取操做的补充优化的值,即修改75%的默认值;highTide指出预获取数量。 integerFlux.limitRate(10, 15); // 哎~最典型的就是,请求无数:request(Long.MAX_VALUE)可是我给你limitRate(2);那你也只能乖乖每次获得两个哈哈哈哈! // 还有一个就是limitRequest(N),它会把下流总请求限制为N。若是下流请求超过了N,那么只返回N个,不然返回实际数量。而后认为请求完成,向下流发送onComplete信号。 integerFlux.limitRequest(5).subscribe(new MySubscriber()); // 上面这个只会输出5个。 } }
如今到了程序化生成Flux/Mono的时候。首先介绍generate()方法,这是一个同步的方法。言外之意就是,它是线程不安全的,且它的接收器只能一次一个的接受输入来生成Flux/Mono。也就是说,它在任意时刻只能被调用一次且只接受一个输入。
或者这么说,它生成的元素序列的顺序,取决于代码编写的方式。
public class FluxWithGenerate { public static void main(String[] args) { // 下面这个是它的变种方法之一:第一个参数是提供初始状态的,第二个参数是一个向接收器写入数据的生成器,入参为state(通常为整数,用来记录状态),和接收器。 // 其余变种请看源码 Flux.generate(() -> 0, (state, sink) -> { sink.next(state+"asdf"); // 加上对于sink.complete()的调用便可终止生成;不然就是无限序列。 return state+1; }).subscribe(System.out::println); // generate方法的第三个参数用于结束生成时被调用,消耗state。 Flux.generate(AtomicInteger::new, (state, sink) -> { sink.next(state.getAndIncrement()+"qwer"); return state; }).subscribe(System.out::println); // generate()的工做流看起来就像:next()->next()->next()->... } }
说完了同步生成,接下来就是异步生成,仍是多线程的!让咱们有请:create()闪亮登场!!!
public class FluxWithCreate { public static void main(String[] args) throws InterruptedException { TestProcessor<String> testProcessor = new TestProcessor<>() { private TestListener<String> testListener; @Override public void register(TestListener<String> stringTestListener) { this.testListener = stringTestListener; } @Override public TestListener<String> get() { return testListener; } }; Flux<String> flux = Flux.create(stringFluxSink -> testProcessor.register(new TestListener<String>() { @Override public void onChunk(List<String> chunk) { for (String s : chunk) { stringFluxSink.next(s); } } @Override public void onComplete() { stringFluxSink.complete(); } })); flux.subscribe(System.out::println); System.out.println("如今是2020/10/22 22:58;我好困"); TestListener<String> testListener = testProcessor.get(); Runnable1<String> runnable1 = new Runnable1<>() { private TestListener<String> testListener; @Override public void set(TestListener<String> testListener) { this.testListener = testListener; } @Override public void run() { List<String> list = new ArrayList<>(10); for (int i = 0; i < 10; ++ i) { list.add(i+"-run1"); } testListener.onChunk(list); } }; Runnable1<String> runnable2 = new Runnable1<>() { private TestListener<String> testListener; @Override public void set(TestListener<String> testListener) { this.testListener = testListener; } @Override public void run() { List<String> list = new ArrayList<>(10); for (int i = 0; i < 10; ++ i) { list.add(i+"-run2"); } testListener.onChunk(list); } }; Runnable1<String> runnable3 = new Runnable1<>() { private TestListener<String> testListener; @Override public void set(TestListener<String> testListener) { this.testListener = testListener; } @Override public void run() { List<String> list = new ArrayList<>(10); for (int i = 0; i < 10; ++ i) { list.add(i+"-run3"); } testListener.onChunk(list); } }; runnable1.set(testListener); runnable2.set(testListener); runnable3.set(testListener); // create所谓的"异步","多线程"指的是在多线程中调用sink.next()方法。这一点在下面的push对比中能够看到 new Thread(runnable1).start(); new Thread(runnable2).start(); new Thread(runnable3).start(); Thread.sleep(1000); testListener.onComplete(); // 另外一方面,create的另外一个变体能够设置参数来实现负压控制,具体看源码。 } public interface TestListener<T> { void onChunk(List<T> chunk); void onComplete(); } public interface TestProcessor<T> { void register(TestListener<T> tTestListener); TestListener<T> get(); } public interface Runnable1<T> extends Runnable { void set(TestListener<T> testListener); } }
说完了异步多线程,同步的生成方法,接下来就是异步单线程:push()。
其实说到push和create的对比,我我的理解以下:
public class FluxWithPush { public static void main(String[] args) throws InterruptedException { TestProcessor<String> testProcessor = new TestProcessor<>() { private TestListener<String> testListener; @Override public void register(TestListener<String> testListener) { this.testListener = testListener; } @Override public TestListener<String> get() { return this.testListener; } }; Flux<String> flux = Flux.push(stringFluxSink -> testProcessor.register(new TestListener<>() { @Override public void onChunk(List<String> list) { for (String s : list) { stringFluxSink.next(s); } } @Override public void onComplete() { stringFluxSink.complete(); } })); flux.subscribe(System.out::println); Runnable1<String> runnable = new Runnable1<>() { private TestListener<String> testListener; @Override public void set(TestListener<String> testListener) { this.testListener = testListener; } @Override public void run() { List<String> list = new ArrayList<>(10); for (int i = 0; i < 10; ++i) { list.add(UUID.randomUUID().toString()); } testListener.onChunk(list); } }; TestListener<String> testListener = testProcessor.get(); runnable.set(testListener); new Thread(runnable).start(); Thread.sleep(15); testListener.onComplete(); } public interface TestListener<T> { void onChunk(List<T> list); void onComplete(); } public interface TestProcessor<T> { void register(TestListener<T> testListener); TestListener<T> get(); } public interface Runnable1<T> extends Runnable { void set(TestListener<T> testListener); } }
同create同样,push也支持负压调节。可是我没写出来,我试过的Demo都是直接请求Long.MAX_VALUE,其实就是经过sink.onRequest(LongConsumer)方法调用来实现负压控制的。原理在这,想深究的请自行探索,鄙人不才,花费一下午没实现。
在Flux的实例方法里,handle相似filter和map的操做。
public class FluxWithHandle { public static void main(String[] args) { Flux<String> stringFlux = Flux.push(stringFluxSink -> { for (int i = 0; i < 10; ++ i) { stringFluxSink.next(UUID.randomUUID().toString().substring(0, 5)); } }); // 获取全部包含'a'的串 Flux<String> flux = stringFlux.handle((str, sink) -> { String s = f(str); if (s != null) { sink.next(s); } }); flux.subscribe(System.out::println); } private static String f(String str) { return str.contains("a") ? str : null; } }
通常来讲,响应式框架都不支持并发,P.s. create那个是生产者并发,它自己不是并发的。因此也没有可用的并发库,须要开发者本身实现。
同时,每个操做通常都是在上一个操做所在的线程里运行,它们不会拥有本身的线程,而最顶的操做则是和subscribe()在同一个线程。好比Flux.create(...).handle(...).subscribe(...)都在主线程运行的。
在响应式框架里,Scheduler决定了操做在哪一个线程被怎么执行,它的做用相似于ExecutorService。不过功能稍微多点。若是你想实现一些并发操做,那么能够考虑使用Schedulers提供的静态方法,来看看有哪些可用的:
package com.learn.reactor.flux; import reactor.core.scheduler.Schedulers; /** * @author Mr.M */ public class FluxWithSchedulers { public static void main(String[] args) throws InterruptedException { // Schedulers.immediate(): 直接在当前线程提交Runnable任务,并当即执行。 System.out.println("当前线程:" + Thread.currentThread().getName()); System.out.println("zxcv"); Schedulers.immediate().schedule(() -> { System.out.println("当前线程是:" + Thread.currentThread().getName()); System.out.println("qwer"); }); System.out.println("asdf"); // 确保异步任务能够打印出来 Thread.sleep(1000); } }
经过上面看得出,immediate()其实就是在执行位置插入须要执行的Runnable来实现的。和直接把代码写在这里没什么区别。
package com.learn.reactor.flux; import reactor.core.scheduler.Schedulers; /** * @author Mr.M */ public class FluxWithSchedulers { public static void main(String[] args) throws InterruptedException { // 若是你想让每次调用都是一个新的线程的话,可使用Schedulers.newSingle(),它能够保证每次执行的操做都使用的是一个新的线程。 Schedulers.single().schedule(() -> { System.out.println("当前线程是:" + Thread.currentThread().getName()); System.out.println("bnmp"); }); Schedulers.single().schedule(() -> { System.out.println("当前线程是:" + Thread.currentThread().getName()); System.out.println("ghjk"); }); Schedulers.newSingle("线程1").schedule(() -> { System.out.println("当前线程是:" + Thread.currentThread().getName()); System.out.println("1234"); }); Schedulers.newSingle("线程1").schedule(() -> { System.out.println("当前线程是:" + Thread.currentThread().getName()); System.out.println("5678"); }); Schedulers.newSingle("线程2").schedule(() -> { System.out.println("当前线程是:" + Thread.currentThread().getName()); System.out.println("0100"); }); Thread.sleep(1000); } }
Schedulers.single(),它的做用是为当前操做开辟一个新的线程,可是记住,全部使用这个方法的操做都共用一个线程;
无界通常意味着不可管理,由于它可能会致使负压问题和过多的线程被建立。因此立刻就要提到它的替代方法。
package com.learn.reactor.flux; import reactor.core.scheduler.Schedulers; /** * @author Mr.M */ public class FluxWithSchedulers { public static void main(String[] args) throws InterruptedException { Schedulers.boundedElastic().schedule(() -> { System.out.println("当前线程是:" + Thread.currentThread().getName()); System.out.println("1478"); }); Schedulers.boundedElastic().schedule(() -> { System.out.println("当前线程是:" + Thread.currentThread().getName()); System.out.println("2589"); }); Schedulers.boundedElastic().schedule(() -> { System.out.println("当前线程是:" + Thread.currentThread().getName()); System.out.println("0363"); }); Thread.sleep(1000); } }
Schedulers.boundedElastic()是一个更好的选择,由于它能够在须要的时候建立工做线程池,并复用空闲的池;同时,某些池若是空闲时间超过一个限定的数值就会被抛弃。
同时,它还有一个容量限制,通常10倍于CPU核心数,这是它后备线程池的最大容量。最多提交10万条任务,而后会被装进任务队列,等到有可用时再调度,若是是延时调度,那么延时开始时间是在有线程可用时才开始计算。
因而可知Schedulers.boundedElastic()对于阻塞的I/O操做是一个不错的选择,由于它可让每个操做都有本身的线程。可是记得,太多的线程会让系统备受压力。
package com.learn.reactor.flux; import reactor.core.scheduler.Schedulers; /** * @author Mr.M */ public class FluxWithSchedulers { public static void main(String[] args) throws InterruptedException { Schedulers.parallel().schedule(() -> { System.out.println("当前线程是:" + Thread.currentThread().getName()); System.out.println("6541"); }); Schedulers.parallel().schedule(() -> { System.out.println("当前线程是:" + Thread.currentThread().getName()); System.out.println("9874"); }); Thread.sleep(1000); } }
最后,Schedulers.parallel()提供了并行的能力,它会建立数量等于CPU核心数的线程来实现这一功能。
顺带一提,还能够经过ExecutorService建立新的Scheduler。固然,Schedulers的一堆newXXX方法也能够。
有一点很重要,就是boundedElastic()方法能够适用于传统阻塞式代码,可是single()和parallel()都不行,若是你非要这么作那就会抛异常。自定义Schedulers能够经过设置ThreadFactory属性来设置接收的线程是不是被NonBlocking接口修饰的Thread实例。
Flux的某些方法会使用默认的Scheduler,好比Flux.interval()方法就默认使用Schedulers.parallel()方法,固然能够经过设置Scheduler来更改这种默认。
在响应式链中,有两种方式能够切换执行上下文,分别是publishOn()和subscribeOn()方法,前者在流式链中的位置很重要。在Reactor中,能够以任意形式添加任意数量的订阅者来知足你的需求,可是,只有在设置了订阅方法后,才能激活这条订阅链上的所有对象。只有这样,请求才会上溯到发布者,进而产生源序列。
publishOn()就和普通操做同样,添加在操做链的中间,它会影响在它下面的全部操做的执行上下文。看个例子:
public class FluxWithPublishOnSubscribeOn { public static void main(String[] args) throws InterruptedException { // 建立一个并行线程 Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); final Flux<String> flux = Flux .range(1, 2) // map确定是跑在T上的。 .map(i -> 10 + i) // 此时的执行上下文被切换到了并行线程 .publishOn(s) // 这个map仍是跑在并行线程上的,由于publishOn()的后面的操做都被切换到了另外一个执行上下文中。 .map(i -> "value " + i); // 假设这个new出来的线程名为T new Thread(() -> flux.subscribe(System.out::println)); Thread.sleep(1000); } }
public class FluxWithPublishOnSubscribeOn { public static void main(String[] args) throws InterruptedException { // 依旧是建立一个并行线程 Scheduler ss = Schedulers.newParallel("parallel-scheduler", 4); final Flux<String> fluxflux = Flux .range(1, 2) // 不过这里的map就已经在ss里跑了 .map(i -> 10 + i) // 这里切换,可是切换的是整个链 .subscribeOn(s) // 这里的map也运行在ss上 .map(i -> "value " + i); // 这是一个匿名线程TT new Thread(() -> fluxflux.subscribe(System.out::println)); Thread.sleep(1000); } }
subscribeOn()方法会把订阅以后的整个订阅链都切换到新的执行上下文中。不管在subscribeOn()哪里,均可以把最前面的订阅以后的订阅序列进行切换,固然了,若是后面还有publishOn(),publishOn()会进行新的切换。