技术点html
Reactivereact
Reactive是一种编程方式,由不一样的方式来实现git
阻塞的弊端和并行的复杂github
在Reactor官方的网站上,指出了现有编程的一些不足https://projectreactor.io/docs/core/release/reference/index.html#_blocking_can_be_wasteful编程
Reactor认为阻塞多是浪费的浏览器
概括安全
阻塞的弊端并发
由如下场景来讲明app
public class DataLoader { public final void load() { long startTime = System.currentTimeMillis(); doLoad(); long costTime = System.currentTimeMillis() - startTime; System.out.println("load()总耗时:" + costTime + "毫秒"); } protected void doLoad() { loadConfigurations(); loadUsers(); loadOrders(); } protected final void loadConfigurations() { loadMock("loadConfigurations()",1); } protected final void loadUsers() { loadMock("loadUsers",2); } protected final void loadOrders() { loadMock("loadOrders()",3); } private void loadMock(String source,int seconds) { try { long startTime = System.currentTimeMillis(); long milliseconds = TimeUnit.SECONDS.toMillis(seconds); Thread.sleep(milliseconds); long costTime = System.currentTimeMillis() - startTime; System.out.printf("[线程: %s] %s 耗时: %d 毫秒\n", Thread.currentThread().getName(),source,costTime ); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { new DataLoader().load(); } }
运行结果框架
[线程: main] loadConfigurations() 耗时: 1001 毫秒
[线程: main] loadUsers 耗时: 2001 毫秒
[线程: main] loadOrders() 耗时: 3003 毫秒
load()总耗时:6025毫秒
由结果可知,咱们在依次执行loadConfigurations()、loadUsers()、loadOrders()中,loadUsers()被loadConfigurations()阻塞了,loadOrders() 被loadUsers()阻塞了,它们都是main的主线程中的执行。因为加载过程串行执行的关系,致使消耗实现线性累加。串行执行即Blocking模式。
并行的复杂
由如下场景来讲明
public class ParalleDataLoader extends DataLoader { protected void doLoad() { ExecutorService executorService = Executors.newFixedThreadPool(3); //CompletionService是一个接口,ExecutorCompletionService为其实现类 //ExecutorCompletionService在构造函数中会建立一个BlockingQueue // (使用的基于链表的无界队列LinkedBlockingQueue), // 该BlockingQueue的做用是保存Executor执行的结果。 // 当计算完成时,调用FutureTask的done方法。 // 当提交一个任务到ExecutorCompletionService时, // 首先将任务包装成QueueingFuture,它是FutureTask的一个子类, // 而后改写FutureTask的done方法,以后把Executor执行的计算结果放入BlockingQueue中。 CompletionService completionService = new ExecutorCompletionService(executorService); completionService.submit(super::loadConfigurations,null); completionService.submit(super::loadUsers,null); completionService.submit(super::loadOrders,null); int count = 0; while (count < 3) { if (completionService.poll() != null) { count++; } } executorService.shutdown(); } public static void main(String[] args) { new ParalleDataLoader().load(); } }
这里大概解释一下ExecutorCompletionService,它的构造器会初始化一个线程池以及一个BlockingQueue
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
提交线程的时候,会初始化一个FutureTask,并放入QueueingFuture中,交给线程池去执行。
public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; }
private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); }
咱们看一下QueueingFuture的继承图
由图可知,不管QueueingFuture,FutureTask,RunnableFuture其实都是一个Runnable。而在线程执行完毕后会执行一个done()方法,将结果放入BlockingQueue中。
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
private final BlockingQueue<Future<V>> completionQueue;
BlockingQueue是在ExecutorCompletionService被初始化了的,有关BlockingQueue的介绍能够参考从BlockingQueue到无锁Disruptor的性能提高
最后咱们用到了completionService.poll()
public Future<V> poll() { return completionQueue.poll(); }
将Future结果从BlockingQueue队列中弹出。固然咱们示例中并无什么结果须要弹出。
如今咱们回到示例代码,运行结果
[线程: pool-1-thread-1] loadConfigurations() 耗时: 1002 毫秒
[线程: pool-1-thread-2] loadUsers 耗时: 2002 毫秒
[线程: pool-1-thread-3] loadOrders() 耗时: 3003 毫秒
load()总耗时:3059毫秒
由结果可知,程序改造为并行加载后,性能和资源利用率获得提高,消耗时间取最大者。但因为以上三个方法之间没有数据依赖关系,因此执行方式由串行调整为并行后,可以达到性能提高的效果。若是方法之间存在依赖关系时,那么提高效果是否还会如此明显,而且若是确保它们的执行循序。问题如(线程安全性,原子性,可见性),由此问题能够参考Fork/Join框架原理和使用探秘 ,在这篇博客中就能够看到为了保证线程安全性,性能已经不如单线程。
Reactor认为异步不必定可以救赎
概括
Callback Hell
咱们来看这样一段代码
public class JavaGUI { public static void main(String[] args) { final JFrame jFrame = new JFrame("GUI 示例"); jFrame.setBounds(500,300,400,300); LayoutManager layoutManager = new BorderLayout(400,300); jFrame.setLayout(layoutManager); jFrame.addMouseListener(new MouseAdapter() { @Override public void mouseClicked(MouseEvent e) { System.out.printf("[线程 : %s] 鼠标点击,坐标(X : %d,Y : %d)\n", currentThreadName(),e.getX(),e.getY()); } }); jFrame.addWindowListener(new WindowAdapter() { @Override public void windowClosing(WindowEvent e) { System.out.printf("[线程 : %s] 清除 jFrame...\n",currentThreadName()); jFrame.dispose(); } @Override public void windowClosed(WindowEvent e) { System.out.printf("[线程 : %s] 退出程序... \n",currentThreadName()); System.exit(0); } }); System.out.println("当前线程:" + currentThreadName()); jFrame.setVisible(true); } private static String currentThreadName() { return Thread.currentThread().getName(); } }
当咱们执行了main方法之后,会打印当前线程,而且显示window窗体。
咱们能够看到打印了当前线程为main的主线程。当咱们在窗体内用鼠标点击的时候会打印以下内容
[线程 : AWT-EventQueue-0] 鼠标点击,坐标(X : 218,Y : 167)
[线程 : AWT-EventQueue-0] 鼠标点击,坐标(X : 130,Y : 120)
由打印的内容可知,咱们鼠标点击并非main的主线程来执行的,说明它是一个异步的Callback,并且是非阻塞的,当咱们点击鼠标产生鼠标事件时,没有任何线程会阻塞该线程的执行。当咱们关闭窗口的时候,会打印以下内容
[线程 : AWT-EventQueue-0] 清除 jFrame...
[线程 : AWT-EventQueue-0] 退出程序...
说明关闭也是由同一个异步线程来执行的。由此能够看出Java GUI以及事件/监听模式基本采用匿名内置类,即回调实现。当监听的维度增多,Callback实现也随之增多。同时,事件/监听者模式(观察者模式)的并发模型可为同步或异步。这里说的同步、异步是线程模型;阻塞、非阻塞是编程模型。在Spring中,于这种GUI回调相似的有Spring Boot的消息事件机制 ,这里面也有同步,异步,阻塞,非阻塞的说明。
Future阻塞问题
咱们来修改一下ParalleDataLoader的代码造成一个Future的阻塞
public class FutureBlockingDataLoader extends DataLoader { protected void doLoad() { ExecutorService executorService = Executors.newFixedThreadPool(3); CompletionService completionService = new ExecutorCompletionService(executorService); runComletely(completionService.submit(super::loadConfigurations,null)); runComletely(completionService.submit(super::loadUsers,null)); runComletely(completionService.submit(super::loadOrders,null)); executorService.shutdown(); } private void runComletely(Future<?> future) { try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } public static void main(String[] args) { new FutureBlockingDataLoader().load(); } }
运行结果
[线程: pool-1-thread-1] loadConfigurations() 耗时: 1000 毫秒
[线程: pool-1-thread-2] loadUsers 耗时: 2002 毫秒
[线程: pool-1-thread-3] loadOrders() 耗时: 3001 毫秒
load()总耗时:6073毫秒
由结果可知,future.get()成为future阻塞的源泉。该方法不得不等待任务执行完成,换言之,若是多个任务提交后,返回多个Future逐一调用get()方法时,将会依次blocking,任务的执行从并行变成串行。
Future链式问题
因为Future没法异步执行结果链式处理,尽管FutureBlockingDataLoader可以解决方法数据依赖以及顺序执行的问题,不过它将并行执行带回了阻塞(串行)执行。因此,它不是一个理想实现。不过CompletableFuture能够帮助提高Future限制。
public class ChainDataLoader extends DataLoader { protected void doLoad() { CompletableFuture .runAsync(super::loadConfigurations) .thenRun(super::loadUsers) .thenRun(super::loadOrders) .whenComplete((result,throwable) -> System.out.println("[线程 :" + Thread.currentThread().getName() + "] 加载完成") ).join(); } public static void main(String[] args) { new ChainDataLoader().load(); } }
运行结果
[线程: ForkJoinPool.commonPool-worker-9] loadConfigurations() 耗时: 1004 毫秒
[线程: ForkJoinPool.commonPool-worker-9] loadUsers 耗时: 2004 毫秒
[线程: ForkJoinPool.commonPool-worker-9] loadOrders() 耗时: 3001 毫秒
[线程 :ForkJoinPool.commonPool-worker-9] 加载完成
load()总耗时:6079毫秒
由结果可知,当异步执行时,它并非由3个线程去执行,而是由同一个线程进行链式执行的,之因此加入join,是为了让主线程等待返回。它跟第一个DataLoader的不一样在于,DataLoader是所有由主线程去阻塞执行的,而这里若是不使用join()则确定为非阻塞的,只不过join()会阻塞,这个是线程相关的常识,具体能够参考线程,JVM锁整理 。也就是说,若是去掉join(),因为CompletableFuture都是守护线程,主线程执行完,它是不会执行的,如今咱们把代码稍做修改以下。
public class ChainDataLoader extends DataLoader { protected void doLoad() { CompletableFuture .runAsync(super::loadConfigurations) .thenRun(super::loadUsers) .thenRun(super::loadOrders) .whenComplete((result,throwable) -> System.out.println("[线程 :" + Thread.currentThread().getName() + "] 加载完成") ); System.out.println("[线程 :" + Thread.currentThread().getName() +"】后续执行"); } public static void main(String[] args) { new ChainDataLoader().load(); } }
运行结果
[线程 :main】后续执行
load()总耗时:60毫秒
证实CompletableFuture还未启动,并未执行。但若是咱们把new ChainDataLoader().load();这段代码放入Controller中
@RestController public class TestController { @GetMapping("/future") public void findfuture() { new ChainDataLoader().load(); } }
经过浏览器访问
能够看到后台打印
[线程 :reactor-http-nio-2】后续执行
load()总耗时:3毫秒
[线程: ForkJoinPool.commonPool-worker-9] loadConfigurations() 耗时: 1003 毫秒
[线程: ForkJoinPool.commonPool-worker-9] loadUsers 耗时: 2004 毫秒
[线程: ForkJoinPool.commonPool-worker-9] loadOrders() 耗时: 3005 毫秒
[线程 :ForkJoinPool.commonPool-worker-9] 加载完成
证实异步线程是非阻塞而且执行了的。
这里咱们能够看到CompletableFuture属于异步操做,若是强制等待结束的话,又回到了阻塞编程的方式,而且让咱们明白到非阻塞不必定提高性能,由于即使是非阻塞,在异步线程中,它同样要使用6秒才能完成,相比于ParalleDataLoader的并行执行,只须要3秒完成来讲,非阻塞的好处是让主方法线程及时完成,让主方法线程池能够及时释放。不过同理,在ParalleDataLoader中若是不进行completionService.poll()的阻塞操做,主线程一样会率先返回,因为线程池中的线程并不是守护线程,它在主线程完成后会继续执行。
public class ParalleDataLoader extends DataLoader { protected void doLoad() { ExecutorService executorService = Executors.newFixedThreadPool(3); CompletionService completionService = new ExecutorCompletionService(executorService); completionService.submit(super::loadConfigurations,null); completionService.submit(super::loadUsers,null); completionService.submit(super::loadOrders,null); // int count = 0; // while (count < 3) { // if (completionService.poll() != null) { // count++; // } // } executorService.shutdown(); } public static void main(String[] args) { new ParalleDataLoader().load(); } }
运行结果
load()总耗时:59毫秒
[线程: pool-1-thread-1] loadConfigurations() 耗时: 1004 毫秒
[线程: pool-1-thread-2] loadUsers 耗时: 2004 毫秒
[线程: pool-1-thread-3] loadOrders() 耗时: 3002 毫秒
一样咱们把new ParalleDataLoader().load()放入Controller中
@RestController public class TestController { @GetMapping("/future") public void findfuture() { new ParalleDataLoader().load(); } }
经过浏览器访问,后台打印
load()总耗时:1毫秒
[线程: pool-1-thread-1] loadConfigurations() 耗时: 1000 毫秒
[线程: pool-1-thread-2] loadUsers 耗时: 2004 毫秒
[线程: pool-1-thread-3] loadOrders() 耗时: 3005 毫秒
这里一样为异步非阻塞,而且并发了3个线程,异步线程总耗时是3秒。可是这样会形成异步线程池的线程数并发量比较大。
Reactive Stream JVM认为异步系统和资源消费须要特殊处理,在Reactor的github的官网上,有这样一段描述https://github.com/reactive-streams/reactive-streams-jvm
Reactive的理解,须要从不少方便
Reactive Programming定义
The Reactive Mainifesto认为:官网https://www.reactivemanifesto.org/
侧重点
WebFlux的线程观察
public class FluxTest { public static void main(String[] args) { Flux.just("a","b","c") .subscribe(FluxTest::println); System.out.println("你好"); } private static void println(Object object) { String threadName = Thread.currentThread().getName(); System.out.println("[线程: " + threadName + "] " + object); } }
运行结果
[线程: main] a
[线程: main] b
[线程: main] c
你好
根据结果,咱们能够看到这并非一个异步的,而是一个同步非阻塞的主线程执行。如今咱们来修改一下代码
public class FluxTest { public static void main(String[] args) { Flux.just("a","b","c") .publishOn(Schedulers.elastic()) .subscribe(FluxTest::println); System.out.println("你好"); } private static void println(Object object) { String threadName = Thread.currentThread().getName(); System.out.println("[线程: " + threadName + "] " + object); } }
运行结果
你好 [线程: elastic-2] a [线程: elastic-2] b [线程: elastic-2] c