本系列文章索引《响应式Spring的道法术器》
前情提要 Reactor 3快速上手 | Spring WebFlux快速上手 | 响应式流规范
本文 测试源码 | 实战源码前端
这一小节介绍如何经过定义相应的事件(onNext
、onError
和onComplete
) 建立一个 Flux 或 Mono。Reactor提供了generate
、create
、push
和handle
等方法,全部这些方法都使用 sink(池)来生成数据流。java
sink,顾名思义,就是池子,能够想象一下厨房水池的样子。以下图所示:node
下面介绍到的方法都有一个sink提供给方法使用者,一般至少会暴露三个方法给咱们,next
、error
和complete
。next和error至关于两个下水口,咱们不断将自定义的数据放到next口,Reactor就会帮咱们串成一个Publisher数据流,直到有一个错误数据放到error口,或按了一下complete
按钮,数据流就会终止了。react
generate
是一种同步地,逐个地发出数据的方法。由于它提供的sink是一个SynchronousSink
, 并且其next()
方法在每次回调的时候最多只能被调用一次。git
generate
方法有三种签名:github
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
1)使用SynchronousSink生成数据流web
@Test public void testGenerate1() { final AtomicInteger count = new AtomicInteger(1); // 1 Flux.generate(sink -> { sink.next(count.get() + " : " + new Date()); // 2 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if (count.getAndIncrement() >= 5) { sink.complete(); // 3 } }).subscribe(System.out::println); // 4 }
generate
方法,自定义数据已发完;输出结果为每1秒钟打印一下时间,共打印5次。docker
2)增长一个伴随状态数据库
对于上边的例子来讲,count
用于记录状态,当值达到5以后就中止计数。因为在lambda内部使用,所以必须是final类型的,且不能是原生类型(如int
)或不可变类型(如Integer
)。api
若是使用第二个方法签名,上边的例子能够这样改:
@Test public void testGenerate2() { Flux.generate( () -> 1, // 1 (count, sink) -> { // 2 sink.next(count + " : " + new Date()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if (count >= 5) { sink.complete(); } return count + 1; // 3 }).subscribe(System.out::println); }
BiFunction
,输入为状态和sink;3)完成后处理
第三个方法签名除了状态、sink外,还有一个Consumer
,这个Consumer
在数据流发完后执行。
Flux.generate( () -> 1, (count, sink) -> { sink.next(count + " : " + new Date()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if (count >= 5) { sink.complete(); } return count + 1; }, System.out::println) // 1 .subscribe(System.out::println); }
若是 state 使用了数据库链接或者其余须要进行清理的资源,这个 Consumer lambda 能够用来在最后完成资源清理任务。
create
是一个更高级的建立Flux的方法,其生成数据流的方式既能够是同步的,也能够是异步的,而且还能够每次发出多个元素。
create
用到了FluxSink
,后者一样提供 next,error 和 complete 等方法。 与generate不一样的是,create不须要状态值,另外一方面,它能够在回调中触发多个事件(即便事件是发生在将来的某个时间)。
create 经常使用的场景就是将现有的 API 转为响应式,好比监听器的异步方法。
先编写一个事件源:
public class MyEventSource { private List<MyEventListener> listeners; public MyEventSource() { this.listeners = new ArrayList<>(); } public void register(MyEventListener listener) { // 1 listeners.add(listener); } public void newEvent(MyEvent event) { for (MyEventListener listener : listeners) { listener.onNewEvent(event); // 2 } } public void eventStopped() { for (MyEventListener listener : listeners) { listener.onEventStopped(); // 3 } } @Data @NoArgsConstructor @AllArgsConstructor public static class MyEvent { // 4 private Date timeStemp; private String message; } }
准备一个监听器接口,它能够监听上边第2和3的两种事件:(1)新的MyEvent
到来;(2)事件源中止。以下:
public interface MyEventListener { void onNewEvent(MyEventSource.MyEvent event); void onEventStopped(); }
下面的测试方法逻辑是:建立一个监听器注册到事件源,这个监听器再收到事件回调的时候经过Flux.create
的sink将一系列事件转换成异步的事件流:
@Test public void testCreate() throws InterruptedException { MyEventSource eventSource = new MyEventSource(); // 1 Flux.create(sink -> { eventSource.register(new MyEventListener() { // 2 @Override public void onNewEvent(MyEventSource.MyEvent event) { sink.next(event); // 3 } @Override public void onEventStopped() { sink.complete(); // 4 } }); } ).subscribe(System.out::println); // 5 for (int i = 0; i < 20; i++) { // 6 Random random = new Random(); TimeUnit.MILLISECONDS.sleep(random.nextInt(1000)); eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i)); } eventSource.eventStopped(); // 7 }
运行一下这个测试方法,20个MyEvent
陆续打印出来。
若是将上边的create
方法换成generate
方法,则会报出异常:
java.lang.IllegalStateException: The generator didn't call any of the SynchronousSink method
证实generate
并不支持异步的方式。
create
方法还有一个变体方法push
,适合生成事件流。与 create相似,
push 也能够是异步地, 而且可以使用以上各类回压策略。因此上边的例子能够替换为push
方法。区别在于,push
方法中,调用next
、complete
或error
的必须是同一个线程。
除了next
、complete
或error
方法外,FluxSink
还有onRequest
方法,这个方法能够用来响应下游订阅者的请求事件。从而不只能够像上一个例子那样,上游在数据就绪的时候将其推送到下游,同时下游也能够从上游拉取已经就绪的数据。这是一种推送/拉取混合的模式。好比:
Flux<String> bridge = Flux.create(sink -> { myMessageProcessor.register( new MyMessageListener<String>() { public void onMessage(List<String> messages) { for(String s : messages) { sink.next(s); // 1 } } }); sink.onRequest(n -> { // 2 List<String> messages = myMessageProcessor.request(n); // 3 for(String s : message) { sink.next(s); } }); ... }
Docker提供了一个用来监听事件的命令:docker events
,运行这个命令后,会监听docker daemon的事件并打印出来,执行是持续进行的,就像top
或前边介绍的mongostat
命令同样。Docker的java开发包的DockerClient
也提供了相应的API,这个API是基于回调的,所以咱们就可使用Reactor的create
方法,将这个基于回调的API转换为响应式流,流中的数据就是一个一个的docker事件。以下图所示:
1)测试DockerClient
首先,咱们先启动docker。
而后,咱们继续用第一章的webflux-demo
maven项目模块,在pom.xml
中添加Docker开发相关的依赖:
<!--docker client begin--> <dependency> <groupId>com.github.docker-java</groupId> <artifactId>docker-java</artifactId> <version>3.0.14</version> </dependency> <dependency> <groupId>javax.ws.rs</groupId> <artifactId>javax.ws.rs-api</artifactId> <version>2.1</version> </dependency> <dependency> <groupId>org.glassfish.jersey.inject</groupId> <artifactId>jersey-hk2</artifactId> <version>2.26</version> </dependency> <!--docker client end-->
最后编写测试方法:
public class DockerEventTest { @Test public void dockerEventToFlux() throws InterruptedException { collectDockerEvents().subscribe(System.out::println); // 5 TimeUnit.MINUTES.sleep(1); // 6 } private Flux<Event> collectDockerEvents() { DockerClient docker = DockerClientBuilder.getInstance().build(); // 1 return Flux.create((FluxSink<Event> sink) -> { EventsResultCallback callback = new EventsResultCallback() { // 2 @Override public void onNext(Event event) { // 3 sink.next(event); } }; docker.eventsCmd().exec(callback); // 4 }); } }
tcp://localhost:2375
,2375是docker默认的端口号,能够经过指定的IP和端口链接docker daemon:DockerClientBuilder.getInstance("tcp://192.168.0.123:2375").build()
,不过要注意docker daemon监听接口和防火墙的配置。onNext
,这时候经过FluxSink
的next
方法将Event
对象发出。OK,看一下效果。
为了方便对比,咱们首先在终端运行docker events
命令,而后在另外一个终端进行docker操做,好比本例:
docker run -it -m 200M --memort-swap=200M progrium/stress --vm 1 --vm-bytes 300M
progrium/stress
是一个用于压力测试的容器,经过-m 200M
指定为该容器的运行最多分配200M内存,而后在压力测试的时候,经过--vm-bytes 300M
使其运行时尝试分配300M的内存,此时会出现内存不足(OOM)的错误并致使容器被杀死(single 9)。
如图所示,上方是分别运行两个命令的终端窗口,能够看到docker events
命令打印出了一系列事件,若是是第一个运行progrium/stress
应该回先有一个pull镜像的事件。下方是咱们的测试代码的输出,除了一些日志以外,能够看到这些事件也被输出了。
2)REST API推送到前端
下面,咱们更进一步将Event事件经过REST API推送到浏览器端,看过第1.3.3节的话,对这一起应该是轻车熟路了。
(一)首先定义一个咱们本身的DockerEvent
,这一步不是必须的哈,不过DockerClient
返回的Event
自己字段比较多,一般前端展现的话会转换为dvo,“戏要作足”嘛,哈哈。
DockerEvent.java
@Data @Document(collection = "docker-event") public class DockerEvent { @Indexed private String status; @Id private String id; private String from; private Node node; private EventType type; private String action; private String actorId; private Long time; private Long timeNano; }
(二)而后就是DAO层了,建立一个DockerEventMongoRepository
,增长三个@Tailable
的查询方法,分别用于查询所有、按照状态查询和按类型+名称查询(好比查询某某容器的事件):
DockerEventMongoRepository.java
public interface DockerEventMongoRepository extends ReactiveMongoRepository<DockerEvent, String> { @Tailable Flux<DockerEvent> findBy(); @Tailable Flux<DockerEvent> findByStatus(String status); @Tailable Flux<DockerEvent> findByTypeAndFrom(String type, String from); }
(三)定义一个CommandLineRunner
,用于在应用启动后即开始监听docker事件:
DockerEventsCollector.java
@Slf4j @Component public class DockerEventsCollector implements CommandLineRunner { private DockerEventMongoRepository dockerEventMongoRepository; private MongoTemplate mongo; // 1 public DockerEventsCollector(DockerEventMongoRepository dockerEventMongoRepository, MongoTemplate mongo) { // 1 this.dockerEventMongoRepository = dockerEventMongoRepository; this.mongo= mongo; } @Override public void run(String... args) { mongo.dropCollection(DockerEvent.class); // 2 mongo.createCollection(DockerEvent.class, CollectionOptions.empty().maxDocuments(200).size(100000).capped()); // 2 dockerEventMongoRepository.saveAll(collect()).subscribe(); // 6 } private Flux<DockerEvent> collect() { // 3 DockerClient docker = DockerClientBuilder.getInstance().build(); return Flux.create((FluxSink<Event> sink) -> { EventsResultCallback callback = new EventsResultCallback() { @Override public void onNext(Event event) { sink.next(event); } }; docker.eventsCmd().exec(callback); }) .map(this::trans) // 4 .doOnNext(e -> log.info(e.toString())); // 5 } private DockerEvent trans(Event event) { // 4 DockerEvent dockerEvent = new DockerEvent(); dockerEvent.setAction(event.getAction()); dockerEvent.setActorId(Objects.requireNonNull(event.getActor()).getId()); dockerEvent.setFrom(event.getFrom() == null ? null : event.getFrom().replace("//", "_")); dockerEvent.setId(UUID.randomUUID().toString()); dockerEvent.setNode(event.getNode()); dockerEvent.setStatus(event.getStatus()); dockerEvent.setTime(event.getTime()); dockerEvent.setTimeNano(event.getTimeNano()); dockerEvent.setType(event.getType()); return dockerEvent; } }
MongoTemplate
,Spring 4.3 以后,若是有构造方法,Spring会自动注入,不须要@Autowired
注解了。DockerEvent
建立“capped”的collection,方便测试,若是提早手动建立好的话能够不加这两句。若是在//1处使用的是响应式的ReactiveMongoTemplate
,由于是异步的,因此要用then()
或thenMany()
将后续的全部操做链接起来,如mongo.dropCollection(...).then(mongo.createCollection(...)).thenMany(dockerEventMongoRepository.saveAll(collect()))
,保证能前后依次执行。Event
转换为咱们定义的DockerEvent
,其中DockerEvent.from
字段是事件主体名称,好比容器名,可能有/
,所以进行一个字符替换,不然在URL中会有问题。DockerEvent
保存到MongoDB,用subscribe()
触发执行。(四)Service层没有啥逻辑,咱们直接写Controller:
DockerEventController.java
@Slf4j @RestController @RequestMapping(value = "/docker/events", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) // 1 public class DockerEventController { private DockerEventMongoRepository dockerEventMongoRepository; public DockerEventController(DockerEventMongoRepository dockerEventMongoRepository) { this.dockerEventMongoRepository = dockerEventMongoRepository; } @GetMapping public Flux<DockerEvent> dockerEventStream() { // 2 return dockerEventMongoRepository.findBy(); } @GetMapping("/{type}/{from}") public Flux<DockerEvent> dockerEventStream(@PathVariable("type") String type, @PathVariable("from") String from) { // 3 return dockerEventMongoRepository.findByTypeAndFrom(type, from); } @GetMapping("/{status}") public Flux<DockerEvent> dockerEventStream(@PathVariable String status) { // 4 return dockerEventMongoRepository.findByStatus(status); } }
OK了,启动试一下:
能够看到,右侧的浏览器的小图标一直在旋转,表示持续接收推送中,当在终端中进行docker操做的时候,所产生的事件就马上出如今浏览器中了。若是请求/docker/events/oom
将只推送OOM事件,若是请求/docker/events/container/progrium_stress
将只推送来自容器progrium/stress的事件。
再次提醒,当capped 的 Collection中一条数据都没有的时候,
@Tailable
的API也会马上返回,因此须要等到数据库中有至少一条数据以后(好比先执行如下pull),再在浏览器中请求docker/events
API。