做者:唐尤华
https://dzone.com/articles/a-...
几年前 NoSQL 开始流行的时候,像其余团队同样,咱们的团队也热衷于使人兴奋的新东西,而且计划替换一个应用程序的数据库。 可是,当深刻实现细节时,咱们想起了一位智者曾经说过的话:“细节决定成败”。最终咱们意识到 NoSQL 不是解决全部问题的银弹,而 NoSQL vs RDMS 的答案是:“视状况而定”。html
相似地,去年RxJava 和 Spring Reactor 这样的并发库加入了让人充满激情的语句,如异步非阻塞方法等。为了不再犯一样的错误,咱们尝试评估诸如 ExecutorService、 RxJava、Disruptor 和 Akka 这些并发框架彼此之间的差别,以及如何肯定各自框架的正确用法。java
本文中用到的术语在这里有更详细的描述。面试
在开始比较并发框架的以前,让咱们快速复习一下如何配置最佳线程数以提升并行任务的性能。 这个理论适用于全部框架,而且在全部框架中使用相同的线程配置来度量性能。数据库
参考: http://baddotrobot.com/blog/2...编程
性能测试配置 GCP -> 处理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架构:x86_64;CPU 内核:8个(注意: 这些结果仅对该配置有意义,并不表示一个框架比另外一个框架更好)。后端
若是一个应用程序部署在多个节点上,而且每一个节点的 req/sec 小于可用的核心数量,那么 ExecutorService 可用于并行化任务,更快地执行代码。数组
若是一个应用程序部署在多个节点上,而且每一个节点的 req/sec 远远高于可用的核心数量,那么使用 ExecutorService 进一步并行化只会使状况变得更糟。服务器
当外部服务延迟增长到 400ms 时,性能测试结果以下(请求速率 @50 req/sec,8核)。数据结构
5.3 全部任务按顺序执行示例多线程
// I/O 任务:调用外部服务 String posts = JsonService.getPosts(); String comments = JsonService.getComments(); String albums = JsonService.getAlbums(); String photos = JsonService.getPhotos(); // 合并来自外部服务的响应 // (内存中的任务将做为此操做的一部分执行) int userId = new Random().nextInt(10) + 1; String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos); // 构建最终响应并将其发送回客户端 String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser; return response;
// 添加 I/O 任务 List<Callable<String>> ioCallableTasks = new ArrayList<>(); ioCallableTasks.add(JsonService::getPosts); ioCallableTasks.add(JsonService::getComments); ioCallableTasks.add(JsonService::getAlbums); ioCallableTasks.add(JsonService::getPhotos); // 调用全部并行任务 ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); List<Future<String>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks); // 获取 I/O 操做(阻塞调用)结果 String posts = futuresOfIOTasks.get(0).get(); String comments = futuresOfIOTasks.get(1).get(); String albums = futuresOfIOTasks.get(2).get(); String photos = futuresOfIOTasks.get(3).get(); // 合并响应(内存中的任务是此操做的一部分) String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos); // 构建最终响应并将其发送回客户端 return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
与上述状况相似:处理传入请求的 HTTP 线程被阻塞,而 CompletableFuture 用于处理并行任务
若是没有 AsyncResponse,性能与 ExecutorService 相同。 若是多个 API 调用必须异步而且连接起来,那么这种方法更好(相似 Node 中的 Promises)。
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); // I/O 任务 CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments, ioExecutorService); CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums, ioExecutorService); CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos, ioExecutorService); CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get(); // 从 I/O 任务(阻塞调用)得到响应 String posts = postsFuture.get(); String comments = commentsFuture.get(); String albums = albumsFuture.get(); String photos = photosFuture.get(); // 合并响应(内存中的任务将是此操做的一部分) String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos); // 构建最终响应并将其发送回客户端 return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
使用 ExecutorService 并行处理全部任务,并使用 @suspended AsyncResponse response 以非阻塞方式发送响应。
图片来自 http://tutorials.jenkov.com/j...
若是用例相似于服务器端聊天应用程序,在客户端响应以前,线程不须要保持链接,那么异步、非阻塞方法比同步通讯更受欢迎。在这些用例中,系统资源能够经过异步、非阻塞方法获得更好的利用,而不只仅是等待。
// 为异步执行提交并行任务 ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments, ioExecutorService); CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums, ioExecutorService); CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos, ioExecutorService); // 当 /posts API 返回响应时,它将与来自 /comments API 的响应结合在一块儿 // 做为这个操做的一部分,将执行内存中的一些任务 CompletableFuture<String> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture, (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments), ioExecutorService); // 当 /albums API 返回响应时,它将与来自 /photos API 的响应结合在一块儿 // 做为这个操做的一部分,将执行内存中的一些任务 CompletableFuture<String> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture, (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos), ioExecutorService); // 构建最终响应并恢复 http 链接,把响应发送回客户端 postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> { LOG.info("Building Async Response in Thread " + Thread.currentThread().getName()); String response = s1 + s2; asyncHttpResponse.resume(response); }, ioExecutorService);
若是编码的场景适合异步非阻塞方式,那么能够首选 RxJava 或任何响应式开发库。 还具备诸如 back-pressure 之类的附加功能,能够在生产者和消费者之间平衡负载。
int userId = new Random().nextInt(10) + 1; ExecutorService executor = CustomThreads.getExecutorService(8); // I/O 任务 Observable<String> postsObservable = Observable.just(userId).map(o -> JsonService.getPosts()) .subscribeOn(Schedulers.from(executor)); Observable<String> commentsObservable = Observable.just(userId).map(o -> JsonService.getComments()) .subscribeOn(Schedulers.from(executor)); Observable<String> albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums()) .subscribeOn(Schedulers.from(executor)); Observable<String> photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos()) .subscribeOn(Schedulers.from(executor)); // 合并来自 /posts 和 /comments API 的响应 // 做为这个操做的一部分,将执行内存中的一些任务 Observable<String> postsAndCommentsObservable = Observable .zip(postsObservable, commentsObservable, (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments)) .subscribeOn(Schedulers.from(executor)); // 合并来自 /albums 和 /photos API 的响应 // 做为这个操做的一部分,将执行内存中的一些任务 Observable<String> albumsAndPhotosObservable = Observable .zip(albumsObservable, photosObservable, (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos)) .subscribeOn(Schedulers.from(executor)); // 构建最终响应 Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2) .subscribeOn(Schedulers.from(executor)) .subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));
[Queue vs RingBuffer]
图片1: http://tutorials.jenkov.com/j...
图片2: https://www.baeldung.com/lmax...
Disruptor 框架在下列场合性能更好:与事件驱动的体系结构一块儿使用,或主要关注内存任务的单个生产者和多个消费者。
static { int userId = new Random().nextInt(10) + 1; // 示例 Event-Handler; count down latch 用于使线程与 http 线程同步 EventHandler<Event> postsApiHandler = (event, sequence, endOfBatch) -> { event.posts = JsonService.getPosts(); event.countDownLatch.countDown(); }; // 配置 Disputor 用于处理事件 DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler) .handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2) .thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2) .handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2); DISRUPTOR.start(); } // 对于每一个请求,在 RingBuffer 中发布一个事件: Event event = null; RingBuffer<Event> ringBuffer = DISRUPTOR.getRingBuffer(); long sequence = ringBuffer.next(); CountDownLatch countDownLatch = new CountDownLatch(6); try { event = ringBuffer.get(sequence); event.countDownLatch = countDownLatch; event.startTime = System.currentTimeMillis(); } finally { ringBuffer.publish(sequence); } try { event.countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); }
图片来自: https://blog.codecentric.de/e...
// 来自 controller : Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender()); // handler : public Receive createReceive() { return receiveBuilder().match(Request.class, request -> { Event event = request.event; // Ideally, immutable data structures should be used here. request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf()); request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf()); request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf()); request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf()); }).match(Event.class, e -> { if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) { int userId = new Random().nextInt(10) + 1; String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts, e.comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums, e.photos); String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser; e.response = response; e.countDownLatch.countDown(); } }).build(); }
推荐去个人博客阅读更多:
2.Spring MVC、Spring Boot、Spring Cloud 系列教程
3.Maven、Git、Eclipse、Intellij IDEA 系列工具教程
以为不错,别忘了点赞+转发哦!