1. 为何要写这篇文章数据库
几年前 NoSQL 开始流行的时候,像其余团队同样,咱们的团队也热衷于使人兴奋的新东西,而且计划替换一个应用程序的数据库。 可是,当深刻实现细节时,咱们想起了一位智者曾经说过的话:“细节决定成败”。最终咱们意识到 NoSQL 不是解决全部问题的银弹,而 NoSQL vs RDMS 的答案是:“视状况而定”。 编程
相似地,去年RxJava 和 Spring Reactor 这样的并发库加入了让人充满激情的语句,如异步非阻塞方法等。为了不再犯一样的错误,咱们尝试评估诸如 ExecutorService、 RxJava、Disruptor 和 Akka 这些并发框架彼此之间的差别,以及如何肯定各自框架的正确用法。数组
本文中用到的术语在这里有更详细的描述。服务器
2. 分析并发框架的示例用例数据结构
3. 快速更新线程配置多线程
在开始比较并发框架的以前,让咱们快速复习一下如何配置最佳线程数以提升并行任务的性能。 这个理论适用于全部框架,而且在全部框架中使用相同的线程配置来度量性能。架构
对于内存任务,线程的数量大约等于具备最佳性能的内核的数量,尽管它能够根据各自处理器中的超线程特性进行一些更改。
例如,在8核机器中,若是对应用程序的每一个请求都必须在内存中并行执行4个任务,那么这台机器上的负载应该保持为 @2 req/sec,在 ThreadPool 中保持8个线程。
对于 I/O 任务,ExecutorService 中配置的线程数应该取决于外部服务的延迟。
与内存中的任务不一样,I/O 任务中涉及的线程将被阻塞,并处于等待状态,直到外部服务响应或超时。 所以,当涉及 I/O 任务线程被阻塞时,应该增长线程的数量,以处理来自并发请求的额外负载。
I/O 任务的线程数应该以保守的方式增长,由于处于活动状态的许多线程带来了上下文切换的成本,这将影响应用程序的性能。 为了不这种状况,应该根据 I/O 任务中涉及的线程的等待时间按比例增长此机器的线程的确切数量以及负载。
4. 性能测试结果并发
性能测试配置 GCP -> 处理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架构:x86_64;CPU 内核:8个(注意: 这些结果仅对该配置有意义,并不表示一个框架比另外一个框架更好)。框架
5. 使用执行器服务并行化 IO 任务dom
5.1 什么时候使用?
若是一个应用程序部署在多个节点上,而且每一个节点的 req/sec 小于可用的核心数量,那么 ExecutorService 可用于并行化任务,更快地执行代码。
5.2 何时适用?
若是一个应用程序部署在多个节点上,而且每一个节点的 req/sec 远远高于可用的核心数量,那么使用 ExecutorService 进一步并行化只会使状况变得更糟。
当外部服务延迟增长到 400ms 时,性能测试结果以下(请求速率 @50 req/sec,8核)。
5.3 全部任务按顺序执行示例
// I/O 任务:调用外部服务
String posts = JsonService.getPosts();
String comments = JsonService.getComments();
String albums = JsonService.getAlbums();
String photos = JsonService.getPhotos();
// 合并来自外部服务的响应
// (内存中的任务将做为此操做的一部分执行)
int userId = new Random().nextInt(10) + 1;
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
// 构建最终响应并将其发送回客户端
String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
return response;
5.4 I/O 任务与 ExecutorService 并行执行代码示例
// 添加 I/O 任务
List<Callable<String>> ioCallableTasks = new ArrayList<>();
ioCallableTasks.add(JsonService::getPosts);
ioCallableTasks.add(JsonService::getComments);
ioCallableTasks.add(JsonService::getAlbums);
ioCallableTasks.add(JsonService::getPhotos);
// 调用全部并行任务
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
List<Future<String>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks);
// 获取 I/O 操做(阻塞调用)结果
String posts = futuresOfIOTasks.get(0).get();
String comments = futuresOfIOTasks.get(1).get();
String albums = futuresOfIOTasks.get(2).get();
String photos = futuresOfIOTasks.get(3).get();
// 合并响应(内存中的任务是此操做的一部分)
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
// 构建最终响应并将其发送回客户端
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
6. 使用执行器服务并行化 IO 任务(CompletableFuture)
与上述状况相似:处理传入请求的 HTTP 线程被阻塞,而 CompletableFuture 用于处理并行任务
6.1 什么时候使用?
若是没有 AsyncResponse,性能与 ExecutorService 相同。 若是多个 API 调用必须异步而且连接起来,那么这种方法更好(相似 Node 中的 Promises)。
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
// I/O 任务
CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
ioExecutorService);
CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
ioExecutorService);
CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
ioExecutorService);
CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();
// 从 I/O 任务(阻塞调用)得到响应
String posts = postsFuture.get();
String comments = commentsFuture.get();
String albums = albumsFuture.get();
String photos = photosFuture.get();
// 合并响应(内存中的任务将是此操做的一部分)
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
// 构建最终响应并将其发送回客户端
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
7. 使用 ExecutorService 并行处理全部任务
使用 ExecutorService 并行处理全部任务,并使用 @suspended AsyncResponse response 以非阻塞方式发送响应。
HTTP 线程处理传入请求的链接,并将处理传递给 Executor Pool,当全部任务完成后,另外一个 HTTP 线程将把响应发送回客户端(异步非阻塞)。
性能降低缘由:
在同步通讯中,尽管 I/O 任务中涉及的线程被阻塞,可是只要进程有额外的线程来承担并发请求负载,它仍然处于运行状态。
所以,以非阻塞方式保持线程所带来的好处很是少,并且在此模式中处理请求所涉及的成本彷佛很高。
一般,对这里讨论采用的例子使用异步非阻塞方法会下降应用程序的性能。
7.1 什么时候使用?
若是用例相似于服务器端聊天应用程序,在客户端响应以前,线程不须要保持链接,那么异步、非阻塞方法比同步通讯更受欢迎。在这些用例中,系统资源能够经过异步、非阻塞方法获得更好的利用,而不只仅是等待。
// 为异步执行提交并行任务
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
ioExecutorService);
CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
ioExecutorService);
CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
ioExecutorService);
// 当 /posts API 返回响应时,它将与来自 /comments API 的响应结合在一块儿
// 做为这个操做的一部分,将执行内存中的一些任务
CompletableFuture<String> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments),
ioExecutorService);
// 当 /albums API 返回响应时,它将与来自 /photos API 的响应结合在一块儿
// 做为这个操做的一部分,将执行内存中的一些任务
CompletableFuture<String> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos),
ioExecutorService);
// 构建最终响应并恢复 http 链接,把响应发送回客户端
postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> {
LOG.info("Building Async Response in Thread " + Thread.currentThread().getName());
String response = s1 + s2;
asyncHttpResponse.resume(response);
}, ioExecutorService);
8. RxJava
这与上面的状况相似,惟一的区别是 RxJava 提供了更好的 DSL 能够进行流式编程,下面的例子中没有体现这一点。
性能优于 CompletableFuture 处理并行任务。
8.1 什么时候使用?
若是编码的场景适合异步非阻塞方式,那么能够首选 RxJava 或任何响应式开发库。 还具备诸如 back-pressure 之类的附加功能,能够在生产者和消费者之间平衡负载。
int userId = new Random().nextInt(10) + 1;
ExecutorService executor = CustomThreads.getExecutorService(8);
// I/O 任务
Observable<String> postsObservable = Observable.just(userId).map(o -> JsonService.getPosts())
.subscribeOn(Schedulers.from(executor));
Observable<String> commentsObservable = Observable.just(userId).map(o -> JsonService.getComments())
.subscribeOn(Schedulers.from(executor));
Observable<String> albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums())
.subscribeOn(Schedulers.from(executor));
Observable<String> photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos())
.subscribeOn(Schedulers.from(executor));
// 合并来自 /posts 和 /comments API 的响应
// 做为这个操做的一部分,将执行内存中的一些任务
Observable<String> postsAndCommentsObservable = Observable
.zip(postsObservable, commentsObservable,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments))
.subscribeOn(Schedulers.from(executor));
// 合并来自 /albums 和 /photos API 的响应
// 做为这个操做的一部分,将执行内存中的一些任务
Observable<String> albumsAndPhotosObservable = Observable
.zip(albumsObservable, photosObservable,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos))
.subscribeOn(Schedulers.from(executor));
// 构建最终响应
Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2)
.subscribeOn(Schedulers.from(executor))
.subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));
9. Disruptor
[Queue vs RingBuffer]
在本例中,HTTP 线程将被阻塞,直到 disruptor 完成任务,而且使用 countdowlatch 将 HTTP 线程与 ExecutorService 中的线程同步。
这个框架的主要特色是在没有任何锁的状况下处理线程间通讯。在 ExecutorService 中,生产者和消费者之间的数据将经过 Queue传递,在生产者和消费者之间的数据传输过程当中涉及到一个锁。 Disruptor 框架经过一个名为 Ring Buffer 的数据结构(它是循环数组队列的扩展版本)来处理这种生产者-消费者通讯,而且不须要任何锁。
这个库不适用于咱们在这里讨论的这种用例。仅出于好奇而添加。
9.1 什么时候使用?
Disruptor 框架在下列场合性能更好:与事件驱动的体系结构一块儿使用,或主要关注内存任务的单个生产者和多个消费者。
static {
int userId = new Random().nextInt(10) + 1; // 示例 Event-Handler; count down latch 用于使线程与 http 线程同步 EventHandler<Event> postsApiHandler = (event, sequence, endOfBatch) -> { event.posts = JsonService.getPosts(); event.countDownLatch.countDown(); }; // 配置 Disputor 用于处理事件 DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler) .handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2) .thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2) .handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2); DISRUPTOR.start();
}
// 对于每一个请求,在 RingBuffer 中发布一个事件:
Event event = null;
RingBuffer<Event> ringBuffer = DISRUPTOR.getRingBuffer();
long sequence = ringBuffer.next();
CountDownLatch countDownLatch = new CountDownLatch(6);
try {
event = ringBuffer.get(sequence); event.countDownLatch = countDownLatch; event.startTime = System.currentTimeMillis();
} finally {
ringBuffer.publish(sequence);
}
try {
event.countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
10. Akka
Akka 库的主要优点在于它拥有构建分布式系统的本地支持。
它运行在一个叫作 Actor System 的系统上。这个系统抽象了线程的概念,Actor System 中的 Actor 经过异步消息进行通讯,这相似于生产者和消费者之间的通讯。
这种额外的抽象级别有助于 Actor System 提供诸如容错、位置透明等特性。
使用正确的 Actor-to-Thread 策略,能够对该框架进行优化,使其性能优于上表所示的结果。 虽然它不能在单个节点上与传统方法的性能匹敌,可是因为其构建分布式和弹性系统的能力,仍然是首选。
10.1 示例代码
// 来自 controller :
Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());
// handler :
public Receive createReceive() {
return receiveBuilder().match(Request.class, request -> { Event event = request.event; // Ideally, immutable data structures should be used here. request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf()); request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf()); request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf()); request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf()); }).match(Event.class, e -> { if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) { int userId = new Random().nextInt(10) + 1; String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts, e.comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums, e.photos); String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser; e.response = response; e.countDownLatch.countDown(); } }).build();
}
11. 总结
根据机器的负载决定 Executor 框架的配置,并检查是否能够根据应用程序中并行任务的数量进行负载平衡。对于大多数传统应用程序来讲,使用响应式开发库或任何异步库都会下降性能。只有当用例相似于服务器端聊天应用程序时,这个模式才有用,其中线程在客户机响应以前不须要保留链接。Disruptor 框架在与事件驱动的架构模式一块儿使用时性能很好; 可是当 Disruptor 模式与传统架构混合使用时,就咱们在这里讨论的用例而言,它并不符合标准。 这里须要注意的是,Akka 和 Disruptor 库值得单独写一篇文章,介绍如何使用它们来实现事件驱动的架构模式。