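This article takes a look at the processors in Reactive Streams.
A processor is both a Publisher and a Subscriber. Project Reactor ships a number of processor implementations, which fall roughly into the following categories: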
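direct (DirectProcessor and UnicastProcessor)
synchronous (EmitterProcessor and ReplayProcessor)
asynchronous (TopicProcessor and WorkQueueProcessor)

DirectProcessor does not support backpressure. If the publisher emits N items and any one subscriber has requested fewer than N, an IllegalStateException is raised (in the output below it shows up as reactor.core.Exceptions$OverflowException, a subclass of IllegalStateException).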
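To make the "no backpressure" rule concrete, here is a minimal sketch built on the JDK's java.util.concurrent.Flow API (Java 9+) rather than on Reactor. The class DirectSketch and its demo helper are hypothetical names invented for this illustration. It is not Reactor's DirectProcessor, only a single-threaded model of the same idea: the processor is both a Subscriber and a Publisher, it keeps no buffer, and a subscriber with no outstanding demand gets an IllegalStateException through onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; NOT Reactor's DirectProcessor.
final class DirectSketch<T> implements Flow.Processor<T, T> {

    // One entry per downstream subscriber, tracking its outstanding demand.
    private final class Inner implements Flow.Subscription {
        final Flow.Subscriber<? super T> actual;
        final AtomicLong requested = new AtomicLong();
        volatile boolean done;

        Inner(Flow.Subscriber<? super T> actual) { this.actual = actual; }

        @Override public void request(long n) {
            // Cap at Long.MAX_VALUE instead of overflowing; validation of n <= 0 is omitted.
            requested.accumulateAndGet(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
        }

        @Override public void cancel() { done = true; }
    }

    private final List<Inner> inners = new CopyOnWriteArrayList<>();

    // Publisher side: register the subscriber and hand it a subscription.
    @Override public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Inner inner = new Inner(subscriber);
        inners.add(inner);
        subscriber.onSubscribe(inner);
    }

    // Subscriber side: nothing is buffered, so upstream demand is unbounded.
    @Override public void onSubscribe(Flow.Subscription upstream) {
        upstream.request(Long.MAX_VALUE);
    }

    @Override public void onNext(T item) {
        for (Inner inner : inners) {
            if (inner.done) continue;
            if (inner.requested.get() > 0) {
                inner.requested.decrementAndGet();
                inner.actual.onNext(item);
            } else {
                // No demand and no buffer: fail this subscriber.
                inner.done = true;
                inner.actual.onError(
                        new IllegalStateException("Can't deliver value due to lack of requests"));
            }
        }
    }

    @Override public void onError(Throwable t) {
        for (Inner inner : inners) {
            if (!inner.done) { inner.done = true; inner.actual.onError(t); }
        }
    }

    @Override public void onComplete() {
        for (Inner inner : inners) {
            if (!inner.done) { inner.done = true; inner.actual.onComplete(); }
        }
    }

    // Emits 1..count to one subscriber that requests `demand` items up front.
    static List<String> demo(long demand, int count) {
        List<String> events = new ArrayList<>();
        DirectSketch<Integer> processor = new DirectSketch<>();
        processor.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { if (demand > 0) s.request(demand); }
            @Override public void onNext(Integer i) { events.add("next:" + i); }
            @Override public void onError(Throwable t) { events.add("error:" + t.getClass().getSimpleName()); }
            @Override public void onComplete() { events.add("complete"); }
        });
        for (int i = 1; i <= count; i++) processor.onNext(i);
        processor.onComplete();
        return events;
    }
}
```

Calling DirectSketch.demo(2, 3) records two onNext events followed by an error, and demo(0, 3) records only the error. The second case corresponds to the Reactor test in this section, where s.request(2) is commented out.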
    @Test
    public void testDirectProcessor(){
        DirectProcessor<Integer> directProcessor = DirectProcessor.create();
        Flux<Integer> flux = directProcessor
                .filter(e -> e % 2 == 0)
                .map(e -> e +1);
        flux.subscribe(new Subscriber<Integer>() {
            private Subscription s;
            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
                // s.request(2);
            }

            @Override
            public void onNext(Integer integer) {
                LOGGER.info("subscribe:{}",integer);
            }

            @Override
            public void onError(Throwable t) {
                LOGGER.error(t.getMessage(),t);
            }

            @Override
            public void onComplete() {

            }
        });
        IntStream.range(1,20)
                .forEach(e -> {
                    directProcessor.onNext(e);
                });
        directProcessor.onComplete();
        directProcessor.blockLast();
    }
The output is as follows:
    16:00:11.201 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    16:00:11.216 [main] ERROR com.example.demo.ProcessorTest - Can't deliver value due to lack of requests
    reactor.core.Exceptions$OverflowException: Can't deliver value due to lack of requests
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
        at reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:304)
        at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:106)
        at com.example.demo.ProcessorTest.lambda$testDirectProcessor$5(ProcessorTest.java:82)
        at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
        at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
        at com.example.demo.ProcessorTest.testDirectProcessor(ProcessorTest.java:81)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
    @Test
    public void testUnicastProcessor() throws InterruptedException {
        UnicastProcessor<Integer> unicastProcessor = UnicastProcessor.create(Queues.<Integer>get(8).get());
        Flux<Integer> flux = unicastProcessor
                .map(e -> e)
                .doOnError(e -> {
                    LOGGER.error(e.getMessage(),e);
                });

        IntStream.rangeClosed(1,12)
                .forEach(e -> {
                    LOGGER.info("emit:{}",e);
                    unicastProcessor.onNext(e);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        LOGGER.info("begin to sleep 7 seconds");
        TimeUnit.SECONDS.sleep(7);
        // UnicastProcessor allows only a single Subscriber
        flux.subscribe(e -> {
            LOGGER.info("flux subscriber:{}",e);
        });

        unicastProcessor.onComplete();
        TimeUnit.SECONDS.sleep(10);
        // unicastProcessor.blockLast(); // blockLast is itself a subscriber
    }
Sample output:
    16:31:04.970 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    16:31:04.977 [main] INFO com.example.demo.ProcessorTest - emit:1
    16:31:05.990 [main] INFO com.example.demo.ProcessorTest - emit:2
    16:31:06.991 [main] INFO com.example.demo.ProcessorTest - emit:3
    16:31:07.994 [main] INFO com.example.demo.ProcessorTest - emit:4
    16:31:08.998 [main] INFO com.example.demo.ProcessorTest - emit:5
    16:31:10.002 [main] INFO com.example.demo.ProcessorTest - emit:6
    16:31:11.007 [main] INFO com.example.demo.ProcessorTest - emit:7
    16:31:12.010 [main] INFO com.example.demo.ProcessorTest - emit:8
    16:31:13.014 [main] INFO com.example.demo.ProcessorTest - emit:9
    16:31:14.029 [main] INFO com.example.demo.ProcessorTest - emit:10
    16:31:14.030 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 10
    16:31:15.034 [main] INFO com.example.demo.ProcessorTest - emit:11
    16:31:15.034 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 11
    16:31:16.038 [main] INFO com.example.demo.ProcessorTest - emit:12
    16:31:16.038 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 12
    16:31:17.043 [main] INFO com.example.demo.ProcessorTest - begin to sleep 7 seconds
    16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:1
    16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:2
    16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:3
    16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:4
    16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:5
    16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:6
    16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:7
    16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:8
    16:31:24.058 [main] ERROR com.example.demo.ProcessorTest - The receiver is overrun by more signals than expected (bounded queue...)
    reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:202)
        at reactor.core.publisher.UnicastProcessor.onNext(UnicastProcessor.java:330)
        at com.example.demo.ProcessorTest.lambda$testUnicastProcessor$8(ProcessorTest.java:108)
        at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
        at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
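One plausible reading of this log is that the bounded queue holds the first 8 items, the 9th fails to enqueue and puts the processor into an error state, and everything after that is dropped. The sketch below models that reading with a plain ArrayBlockingQueue. BoundedBufferSketch and its emit helper are hypothetical names, and this is a simplified model rather than UnicastProcessor's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical model of a bounded buffer in front of a single late subscriber.
final class BoundedBufferSketch {
    final Queue<Integer> queue;                       // items waiting for the subscriber
    final List<Integer> dropped = new ArrayList<>();  // items arriving after termination
    IllegalStateException error;                      // set once, on the first overflow
    boolean done;

    BoundedBufferSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void onNext(int value) {
        if (done) {
            // Already terminated: the value can only be dropped.
            dropped.add(value);
            return;
        }
        if (!queue.offer(value)) {
            // The queue is full: record the overflow and terminate.
            error = new IllegalStateException("bounded queue is full, cannot accept " + value);
            done = true;
        }
    }

    // Emits 1..count into a buffer of the given capacity with no subscriber draining it.
    static BoundedBufferSketch emit(int capacity, int count) {
        BoundedBufferSketch buffer = new BoundedBufferSketch(capacity);
        for (int i = 1; i <= count; i++) {
            buffer.onNext(i);
        }
        return buffer;
    }
}
```

With emit(8, 12), the queue ends up holding 1 to 8, the error is set when 9 arrives, and 10, 11 and 12 land in dropped. That lines up with the onNextDropped entries and with the late subscriber receiving 1 to 8 followed by the error.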
    @Test
    public void testEmitterProcessor() throws InterruptedException {
        int bufferSize = 3; // values below 8 are reset to 8
        FluxProcessor<Integer, Integer> processor = EmitterProcessor.create(bufferSize);
        Flux<Integer> flux1 = processor.map(e -> e);
        Flux<Integer> flux2 = processor.map(e -> e*10);

        IntStream.rangeClosed(1,8).forEach(e -> {
            LOGGER.info("emit:{}",e);
            processor.onNext(e); // blocks here if the unconsumed published data exceeds bufferSize
        });

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });

        IntStream.rangeClosed(9,10).forEach(e -> {
            LOGGER.info("emit:{}",e);
            processor.onNext(e);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });

        // this subscription is added later, so it only consumes data published after it
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });

        processor.onNext(11);
        processor.onNext(12);

        processor.onComplete();
        processor.blockLast();
    }
Sample output:
    17:27:01.008 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    17:27:01.044 [main] INFO com.example.demo.ProcessorTest - emit:1
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:2
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:3
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:4
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:5
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:6
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:7
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:8
    17:27:01.086 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
    17:27:01.086 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:2
    17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:3
    17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
    17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:5
    17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:6
    17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:7
    17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:8
    17:27:01.088 [main] INFO com.example.demo.ProcessorTest - emit:9
    17:27:01.088 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:9
    17:27:02.091 [main] INFO com.example.demo.ProcessorTest - emit:10
    17:27:02.092 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:10
    17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:11
    17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:110
    17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:12
    17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:120
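ReplayProcessor can cache data produced through a sink, or data received from a publisher it subscribes to, and then replay it to later subscribers. It comes in the following four configurations:
Cache only the last element
Cache the last N elements
Timestamp every element and cache only those whose age is within the given TTL
Timestamp every element and cache at most N elements whose age is within the given TTL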
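The size-bounded configuration can be sketched with nothing more than a Deque, assuming a single thread and no completion or error handling. ReplaySketch and lateSubscriberSees are hypothetical names for this illustration, not part of Reactor. The time-based configurations would additionally store a timestamp per element and evict entries older than the TTL.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of "cache the last N elements and replay them to late subscribers".
final class ReplaySketch<T> {
    private final int historySize;
    private final Deque<T> history = new ArrayDeque<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ReplaySketch(int historySize) {
        if (historySize < 1) {
            throw new IllegalArgumentException("historySize must be at least 1");
        }
        this.historySize = historySize;
    }

    void onNext(T item) {
        // Evict the oldest element once the history is full.
        if (history.size() == historySize) {
            history.removeFirst();
        }
        history.addLast(item);
        // Current subscribers see the element live.
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }

    void subscribe(Consumer<T> subscriber) {
        // A late subscriber first receives the cached history, oldest first.
        for (T item : history) {
            subscriber.accept(item);
        }
        subscribers.add(subscriber);
    }

    // Publishes 1..count, then subscribes and returns what the late subscriber received.
    static List<Integer> lateSubscriberSees(int historySize, int count) {
        ReplaySketch<Integer> replay = new ReplaySketch<>(historySize);
        for (int i = 1; i <= count; i++) {
            replay.onNext(i);
        }
        List<Integer> seen = new ArrayList<>();
        replay.subscribe(seen::add);
        return seen;
    }
}
```

ReplaySketch.lateSubscriberSees(3, 5) returns [3, 4, 5], and a history size of 1 models the "last element only" configuration.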
Example:
    @Test
    public void testReplayProcessor() throws InterruptedException {
        ReplayProcessor<Integer> replayProcessor = ReplayProcessor.create(3);
        Flux<Integer> flux1 = replayProcessor
                .map(e -> e);
        Flux<Integer> flux2 = replayProcessor
                .map(e -> e);

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });

        IntStream.rangeClosed(1,5)
                .forEach(e -> {
                    replayProcessor.onNext(e);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });

        LOGGER.info("finish publish data");
        TimeUnit.SECONDS.sleep(3);

        LOGGER.info("begin to subscribe flux2");
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });

        replayProcessor.onComplete();
        replayProcessor.blockLast();
    }
The output is as follows:
    15:13:39.415 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    15:13:39.438 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
    15:13:40.445 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:2
    15:13:41.449 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:3
    15:13:42.454 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
    15:13:43.459 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:5
    15:13:44.463 [main] INFO com.example.demo.ProcessorTest - finish publish data
    15:13:47.466 [main] INFO com.example.demo.ProcessorTest - begin to subscribe flux2
    15:13:47.467 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:3
    15:13:47.467 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:4
    15:13:47.468 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:5
    @Test
    public void testTopicProcessor() throws InterruptedException {
        TopicProcessor<Integer> topicProcessor = TopicProcessor.<Integer>builder()
                .share(true)
                // .executor(Executors.newSingleThreadExecutor())
                .build();
        Flux<Integer> flux1 = topicProcessor
                .map(e -> e);
        Flux<Integer> flux2 = topicProcessor
                .map(e -> e);
        Flux<Integer> flux3 = topicProcessor
                .map(e -> e);

        AtomicInteger count = new AtomicInteger(0);
        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
            count.incrementAndGet();
        });
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });
        flux3.subscribe(e -> {
            LOGGER.info("flux3 subscriber:{}",e);
        });

        IntStream.rangeClosed(1,100)
                .parallel()
                .forEach(e -> {
                    // LOGGER.info("emit:{}",e);
                    topicProcessor.onNext(e);
                });

        topicProcessor.onComplete();
        topicProcessor.blockLast();

        TimeUnit.SECONDS.sleep(10);
        System.out.println(count.get());
    }
Two things are worth noting here.
Under the hood, share sets the multiproducers property of EventLoopProcessor:
reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/EventLoopProcessor.java
    EventLoopProcessor(
            int bufferSize,
            @Nullable ThreadFactory threadFactory,
            @Nullable ExecutorService executor,
            ExecutorService requestExecutor,
            boolean autoCancel,
            boolean multiproducers,
            Supplier<Slot<IN>> factory,
            WaitStrategy strategy) {

        if (!Queues.isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2 : " + bufferSize);
        }

        if (bufferSize < 1){
            throw new IllegalArgumentException("bufferSize must be strictly positive, " +
                    "was: "+bufferSize);
        }

        this.autoCancel = autoCancel;

        contextClassLoader = new EventLoopContext(multiproducers);

        this.name = defaultName(threadFactory, getClass());

        this.requestTaskExecutor = Objects.requireNonNull(requestExecutor, "requestTaskExecutor");

        if (executor == null) {
            this.executor = Executors.newCachedThreadPool(threadFactory);
        }
        else {
            this.executor = executor;
        }

        if (multiproducers) {
            this.ringBuffer = RingBuffer.createMultiProducer(factory,
                    bufferSize,
                    strategy,
                    this);
        }
        else {
            this.ringBuffer = RingBuffer.createSingleProducer(factory,
                    bufferSize,
                    strategy,
                    this);
        }
    }
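If share is true, the ring buffer is created with createMultiProducer.
In practice this means that if multiple threads call the processor's onNext method without share enabled, there is a concurrency problem and data can be lost. In the code above, for example, if share(true) is commented out the final value of count is not necessarily 100, whereas with share set to true the final count is guaranteed to be 100.
The other point concerns the executor: if executor(Executors.newSingleThreadExecutor()) is set, the subscribers of flux1, flux2 and flux3 run sequentially rather than concurrently.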
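The general reason a multi-producer buffer needs extra care is that each producer has to claim its slot atomically. The sketch below shows only that idea, using an AtomicInteger cursor. MultiProducerSketch is a hypothetical name, and this is not how Reactor's RingBuffer is implemented. If the cursor were a plain int incremented with cursor++, two threads could claim the same slot and one value would overwrite the other, which is the kind of loss described above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.stream.IntStream;

// Hypothetical illustration of atomic slot claiming with several producer threads.
final class MultiProducerSketch {

    // Publishes 1..count from a parallel stream and returns how many slots were filled.
    static int publishInParallel(int count) {
        AtomicIntegerArray slots = new AtomicIntegerArray(count); // all slots start at 0
        AtomicInteger cursor = new AtomicInteger();

        IntStream.rangeClosed(1, count)
                .parallel()
                .forEach(value -> {
                    // getAndIncrement hands every caller a distinct slot index.
                    int slot = cursor.getAndIncrement();
                    slots.set(slot, value);
                });

        int filled = 0;
        for (int i = 0; i < count; i++) {
            if (slots.get(i) != 0) {
                filled++;
            }
        }
        return filled;
    }

    public static void main(String[] args) {
        System.out.println(publishInParallel(100));
    }
}
```

Because every index is handed out exactly once, publishInParallel(100) fills all 100 slots no matter how the threads interleave.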
    @Test
    public void testWorkQueueProcessor(){
        WorkQueueProcessor<Integer> workQueueProcessor = WorkQueueProcessor.create();
        Flux<Integer> flux1 = workQueueProcessor
                .map(e -> e);
        Flux<Integer> flux2 = workQueueProcessor
                .map(e -> e);
        Flux<Integer> flux3 = workQueueProcessor
                .map(e -> e);

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });
        flux3.subscribe(e -> {
            LOGGER.info("flux3 subscriber:{}",e);
        });

        IntStream.range(1,20)
                .forEach(e -> {
                    workQueueProcessor.onNext(e);
                });

        workQueueProcessor.onComplete();
        workQueueProcessor.blockLast();
    }
Sample output:
    21:56:58.203 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    21:56:58.214 [main] DEBUG reactor.core.publisher.UnsafeSupport - Starting UnsafeSupport init in Java 1.8
    21:56:58.215 [main] DEBUG reactor.core.publisher.UnsafeSupport - Unsafe is available
    21:56:58.228 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
    21:56:58.228 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:3
    21:56:58.228 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:2
    21:56:58.229 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
    21:56:58.229 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:5
    21:56:58.229 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:6
    21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:7
    21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:8
    21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:9
    21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:10
    21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:11
    21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:12
    21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:13
    21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:14
    21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:15
    21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:17
    21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:16
    21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:19
    21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:18
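As the output shows, the subscribers of a WorkQueueProcessor behave like Kafka consumers that belong to the same group: the messages they each consume add up to exactly the messages the publisher emitted, unlike the broadcast-style delivery of TopicProcessor.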
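The difference between the two delivery styles can be modelled with plain JDK collections. DeliverySketch and its methods are hypothetical names for this illustration, and the code only models the delivery semantics, not Reactor's ring-buffer machinery: broadcast hands every item to every subscriber, while workQueue lets worker threads compete for items on one shared queue so that each item goes to exactly one of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of broadcast delivery versus work-sharing delivery.
final class DeliverySketch {

    // Broadcast: every subscriber receives every item.
    static List<List<Integer>> broadcast(int subscribers, int count) {
        List<List<Integer>> received = new ArrayList<>();
        for (int s = 0; s < subscribers; s++) {
            received.add(new ArrayList<>());
        }
        for (int i = 1; i <= count; i++) {
            for (List<Integer> inbox : received) {
                inbox.add(i);
            }
        }
        return received;
    }

    // Work queue: workers poll one shared queue, so each item is taken exactly once.
    static List<List<Integer>> workQueue(int workers, int count) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= count; i++) {
            queue.add(i);
        }
        List<List<Integer>> received = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            List<Integer> mine = new ArrayList<>(); // written by a single worker only
            received.add(mine);
            pool.execute(() -> {
                Integer item;
                while ((item = queue.poll()) != null) {
                    mine.add(item);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    // Total number of items delivered across all subscribers.
    static int total(List<List<Integer>> received) {
        int sum = 0;
        for (List<Integer> inbox : received) {
            sum += inbox.size();
        }
        return sum;
    }
}
```

For 3 subscribers and the 19 items used in the test above, total(workQueue(3, 19)) is 19 while total(broadcast(3, 19)) is 57. How the 19 items are split between the workers depends on thread scheduling, just as the per-subscriber split in the log above is not fixed.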