引子:被誉为“中国大数据第一人”的涂子沛先生在其成名做《数据之巅》里提到,摩尔定律、社交媒体、数据挖掘是大数据的三大成因。IBM的研究称,整我的类文明所得到的所有数据中,有90%是过去两年内产生的。在此背景下,包括NoSQL,Hadoop, Spark, Storm, Kylin在内的大批新技术应运而生。其中以RxJava和Reactor为表明的响应式(Reactive)编程技术针对的就是经典的大数据4V定义(Volume,Variety,Velocity,Value)中的Velocity,即高并发问题,而在即将发布的Spring 5中,也引入了响应式编程的支持。在接下来的几周,我会围绕响应式编程分三期与你分享个人一些学习心得。本篇是第二篇,以Reactor框架为例介绍响应式编程的几个关键特性。java
前情概要:react
In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. - Reactive programming - Wikipediagit
在上述响应式编程(后面简称RP)的定义中,除了异步编程,还包含两个重要的关键词:github
一个容易混淆的概念是响应式设计,虽然它的名字中也包含了“响应式”三个字,但其实和RP彻底是两码事。响应式设计是指网页可以自动调整布局和样式以适配不一样尺寸的屏幕,属于网站设计的范畴,而RP是一种关注系统可响应性,面向数据流的编程思想或者说编程框架。web
从本质上说,RP是一种异步编程框架,和其余框架相比,RP至少包含了如下三个特性:spring
subscribe()
方法以前,从发布端到订阅端,没有任何事会发生。就比如不管多长的水管,只要水龙头不打开,水管里的水就不会流动。为了提升描述能力,RP提供了比Stream丰富的多的多的API,好比buffer()
, merge()
, onErrorMap()
等。了解了RP的这些特性,你可能已经猜测到RP有哪些适用场景了。通常来讲,RP适用于高并发、带延迟操做的场景,好比如下这些状况(的组合):编程
Every coin has two sides.api
和任何框架同样,有优点必然就有劣势。RP的两个比较大的问题是:数组
flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe()
,一旦出错,你将很难定位到具体是哪一个环节出了问题。所幸的是,RP框架通常都会提供一些工具方法来辅助进行调试。为了帮助你理解上面说的一些概念,下面我就经过几个测试用例,演示RP的两个关键特性:提升吞吐量和背压。完整的代码可参见我GitHub上的示例工程。缓存
@Test
public void testImperative() throws InterruptedException {
_runInParallel(CONCURRENT_SIZE, () -> {
ImperativeRestaurantRepository.INSTANCE.insert(load);
});
}
private void _runInParallel(int nThreads, Runnable task) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
for (int i = 0; i < nThreads; i++) {
executorService.submit(task);
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
@Test
public void testReactive() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(CONCURRENT_SIZE);
for (int i = 0; i < CONCURRENT_SIZE; i++) {
ReactiveRestaurantRepository.INSTANCE.insert(load).subscribe(s -> {
}, e -> latch.countDown(), latch::countDown);
}
latch.await();
}复制代码
用例解读:
在演示测试用例以前,先看两张图,帮助你更形象的理解什么是背压。
图片出处:Dataflow and simplified reactive programming
两张图乍一看没啥区别,但实际上是彻底两种不一样的背压策略。第一张图,发布速度(100/s)远大于订阅速度(1/s),但因为背压的关系,发布者严格按照订阅者的请求数量发送数据。第二张图,发布速度(1/s)小于订阅速度(100/s),当订阅者请求100个数据时,发布者会积满所需个数的数据再开始发送。能够看到,经过背压机制,发布者能够根据各个订阅者的能力动态调整发布速度。
@BeforeEach
public void beforeEach() {
// initialize publisher
AtomicInteger count = new AtomicInteger();
timerPublisher = Flux.create(s ->
new Timer().schedule(new TimerTask() {
@Override
public void run() {
s.next(count.getAndIncrement());
if (count.get() == 10) {
s.complete();
}
}
}, 100, 100)
);
}
@Test
public void testNormal() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
timerPublisher
.subscribe(r -> System.out.println("Continuous consuming " + r),
e -> latch.countDown(),
latch::countDown);
latch.await();
}
@Test
public void testBackpressure() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Subscription> timerSubscription = new AtomicReference<>();
Subscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
timerSubscription.set(subscription);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("consuming " + value);
}
@Override
protected void hookOnComplete() {
latch.countDown();
}
@Override
protected void hookOnError(Throwable throwable) {
latch.countDown();
}
};
timerPublisher.onBackpressureDrop().subscribe(subscriber);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
timerSubscription.get().request(1);
}
}, 100, 200);
latch.await();
}复制代码
用例解读:
经过上面的介绍,不难看出RP其实是一种内置了发布者订阅者模型的异步编程框架,包含了线程复用,背压等高级特性,特别适用于高并发、有延迟的场景。
以上就是我对响应式编程的一些简单介绍,欢迎你到个人留言板分享,和你们一块儿过过招。下一篇我将综合前两篇的内容,详解一个完整的Spring 5示例应用,敬请期待。