Reactor中的Publisher
Reactor中有两种Publisher:Flux和Mono,其中Flux用来表示0N个元素的异步序列,Mono用来表示01个元素的异步序列,相对于Flux而言Mono更简单一些。java
建立Mono
reactor中的mono能够经过一些方法建立,经常使用方法以下:react
- just():能够指定序列中包含的所有元素。
- empty():建立一个不包含任何元素。
- error(Throwable error):建立一个只包含错误消息的序列。
- fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中建立 Mono。
- delay(Duration duration):建立一个 Mono 序列,在指定的延迟时间以后,产生数字 0 做为惟一值。
- ignoreElements(Publisher source):建立一个 Mono 序列,忽略做为源的 Publisher 中的全部元素,只产生结束消息。
- justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中建立 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; import java.util.Date; import java.util.concurrent.CompletableFuture; /** * @author: ffzs * @Date: 2020/8/5 下午3:57 */ @Slf4j public class MonoTest { @Test public void mono() { // 经过just直接赋值 Mono.just("my name is ffzs").subscribe(log::info); // empty 建立空mono Mono.empty().subscribe(); // 延迟生成0 Mono.delay(Duration.ofMillis(2)).map(String::valueOf).subscribe(log::info); // 经过Callable Mono.fromCallable(() -> "callback function").subscribe(log::info); // future Mono.fromFuture(CompletableFuture.completedFuture("from future")).subscribe(log::info); // 经过runnable Mono<Void> runnableMono = Mono.fromRunnable(() -> log.warn(Thread.currentThread().getName())); runnableMono.subscribe(); // 经过使用 Supplier Mono.fromSupplier(() -> new Date().toString()).subscribe(log::info); // flux中 Mono.from(Flux.just("from", "flux")).subscribe(log::info); // 只返回flux第一个 } }
下面是运行结果:
api
使用StepVerifier测试
- 经过
expectNext
执行相似断言的功能
@Test public void StepVerifier () { Mono<String> mono = Mono.just("ffzs").log(); StepVerifier.create(mono).expectNext("ff").verifyComplete(); }
若是next断言不符合实际状况,就会报错:
异步