Reactor深度探索

技术点html

  • 反应器模式(Reactor) 同步非阻塞,每一个事情能够分为几个步骤,每一个步骤能够相应去作,每一个步骤是不会相互影响的,可是作起来是串行的。有关Netty的具体实现,能够参考Netty整理
  • Proactor模式 异步非阻塞,每一个事情同时作,或者说是异步的去作,
  • 观察者模式(Observer) JDK的实现能够参考使用JDK的观察者接口进行消息推送  观察者模式是一个推的模式
  • 迭代器模式(Iterator) 是一种拉的模式,数据准备好后,进行一个循环拉取。
  • Java并发模型

Reactivereact

Reactive是一种编程方式,由不一样的方式来实现git

  • RxJava : Reactive Extensions
  • Reactor : Spring WebFlux Reactive类库
  • Flow API : Java 9 Flow API实现

阻塞的弊端和并行的复杂github

在Reactor官方的网站上,指出了现有编程的一些不足https://projectreactor.io/docs/core/release/reference/index.html#_blocking_can_be_wasteful编程

Reactor认为阻塞多是浪费的浏览器

概括安全

  • 阻塞致使性能瓶颈和浪费资源
  • 增长线程可能会引发资源竞争和并发问题(可见性问题,原子性问题)
  • 并行的方式不是银弹(不能解决全部问题)

阻塞的弊端并发

由如下场景来讲明app

public class DataLoader {
    public final void load() {
        long startTime = System.currentTimeMillis();
        doLoad();
        long costTime = System.currentTimeMillis() - startTime;
        System.out.println("load()总耗时:" + costTime + "毫秒");
    }

    protected void doLoad() {
        loadConfigurations();
        loadUsers();
        loadOrders();
    }

    protected final void loadConfigurations() {
        loadMock("loadConfigurations()",1);
    }

    protected final void loadUsers() {
        loadMock("loadUsers",2);
    }

    protected final void loadOrders() {
        loadMock("loadOrders()",3);
    }

    private void loadMock(String source,int seconds) {
        try {
            long startTime = System.currentTimeMillis();
            long milliseconds = TimeUnit.SECONDS.toMillis(seconds);
            Thread.sleep(milliseconds);
            long costTime = System.currentTimeMillis() - startTime;
            System.out.printf("[线程: %s] %s 耗时: %d 毫秒\n",
                    Thread.currentThread().getName(),source,costTime );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new DataLoader().load();
    }
}

运行结果框架

[线程: main] loadConfigurations() 耗时: 1001 毫秒
[线程: main] loadUsers 耗时: 2001 毫秒
[线程: main] loadOrders() 耗时: 3003 毫秒
load()总耗时:6025毫秒

由结果可知,咱们在依次执行loadConfigurations()、loadUsers()、loadOrders()中,loadUsers()被loadConfigurations()阻塞了,loadOrders() 被loadUsers()阻塞了,它们都是main的主线程中的执行。因为加载过程串行执行的关系,致使消耗实现线性累加。串行执行即Blocking模式。

并行的复杂

由如下场景来讲明

public class ParalleDataLoader extends DataLoader {
    protected void doLoad() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        //CompletionService是一个接口,ExecutorCompletionService为其实现类
        //ExecutorCompletionService在构造函数中会建立一个BlockingQueue
        // (使用的基于链表的无界队列LinkedBlockingQueue),
        // 该BlockingQueue的做用是保存Executor执行的结果。
        // 当计算完成时,调用FutureTask的done方法。
        // 当提交一个任务到ExecutorCompletionService时,
        // 首先将任务包装成QueueingFuture,它是FutureTask的一个子类,
        // 而后改写FutureTask的done方法,以后把Executor执行的计算结果放入BlockingQueue中。
        CompletionService completionService = new ExecutorCompletionService(executorService);
        completionService.submit(super::loadConfigurations,null);
        completionService.submit(super::loadUsers,null);
        completionService.submit(super::loadOrders,null);

        int count = 0;
        while (count < 3) {
            if (completionService.poll() != null) {
                count++;
            }
        }
        executorService.shutdown();
    }

    public static void main(String[] args) {
        new ParalleDataLoader().load();
    }
}

这里大概解释一下ExecutorCompletionService,它的构造器会初始化一个线程池以及一个BlockingQueue

public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

提交线程的时候,会初始化一个FutureTask,并放入QueueingFuture中,交给线程池去执行。

public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture(f));
    return f;
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
    if (aes == null)
        return new FutureTask<V>(task, result);
    else
        return aes.newTaskFor(task, result);
}

咱们看一下QueueingFuture的继承图

由图可知,不管QueueingFuture,FutureTask,RunnableFuture其实都是一个Runnable。而在线程执行完毕后会执行一个done()方法,将结果放入BlockingQueue中。

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}
private final BlockingQueue<Future<V>> completionQueue;

BlockingQueue是在ExecutorCompletionService被初始化了的,有关BlockingQueue的介绍能够参考从BlockingQueue到无锁Disruptor的性能提高

最后咱们用到了completionService.poll()

public Future<V> poll() {
    return completionQueue.poll();
}

将Future结果从BlockingQueue队列中弹出。固然咱们示例中并无什么结果须要弹出。

如今咱们回到示例代码,运行结果

[线程: pool-1-thread-1] loadConfigurations() 耗时: 1002 毫秒
[线程: pool-1-thread-2] loadUsers 耗时: 2002 毫秒
[线程: pool-1-thread-3] loadOrders() 耗时: 3003 毫秒
load()总耗时:3059毫秒

由结果可知,程序改造为并行加载后,性能和资源利用率获得提高,消耗时间取最大者。但因为以上三个方法之间没有数据依赖关系,因此执行方式由串行调整为并行后,可以达到性能提高的效果。若是方法之间存在依赖关系时,那么提高效果是否还会如此明显,而且若是确保它们的执行循序。问题如(线程安全性,原子性,可见性),由此问题能够参考Fork/Join框架原理和使用探秘 ,在这篇博客中就能够看到为了保证线程安全性,性能已经不如单线程。

Reactor认为异步不必定可以救赎

概括

  • Callbacks是解决非阻塞的方案,而后它们之间很难组合,而且快速地将代码引导至"Callback Hell"的不归路
  • Futures相对于Callbacks好一点,不过仍是没法组合,不过ComletableFuture可以提高这方面的不足。好比在上面的示例中,若是loadUsers要传递数据到loadOrders中也是极其困难的。

Callback Hell

咱们来看这样一段代码

public class JavaGUI {
    public static void main(String[] args) {
        final JFrame jFrame = new JFrame("GUI 示例");
        jFrame.setBounds(500,300,400,300);
        LayoutManager layoutManager = new BorderLayout(400,300);
        jFrame.setLayout(layoutManager);
        jFrame.addMouseListener(new MouseAdapter() {
            @Override
            public void mouseClicked(MouseEvent e) {
                System.out.printf("[线程 : %s] 鼠标点击,坐标(X : %d,Y : %d)\n",
                        currentThreadName(),e.getX(),e.getY());
            }
        });
        jFrame.addWindowListener(new WindowAdapter() {
            @Override
            public void windowClosing(WindowEvent e) {
                System.out.printf("[线程 : %s] 清除 jFrame...\n",currentThreadName());
                jFrame.dispose();
            }

            @Override
            public void windowClosed(WindowEvent e) {
                System.out.printf("[线程 : %s] 退出程序... \n",currentThreadName());
                System.exit(0);
            }
        });
        System.out.println("当前线程:" + currentThreadName());
        jFrame.setVisible(true);
    }

    private static String currentThreadName() {
        return Thread.currentThread().getName();
    }
}

当咱们执行了main方法之后,会打印当前线程,而且显示window窗体。

咱们能够看到打印了当前线程为main的主线程。当咱们在窗体内用鼠标点击的时候会打印以下内容

[线程 : AWT-EventQueue-0] 鼠标点击,坐标(X : 218,Y : 167)
[线程 : AWT-EventQueue-0] 鼠标点击,坐标(X : 130,Y : 120)


由打印的内容可知,咱们鼠标点击并非main的主线程来执行的,说明它是一个异步的Callback,并且是非阻塞的,当咱们点击鼠标产生鼠标事件时,没有任何线程会阻塞该线程的执行。当咱们关闭窗口的时候,会打印以下内容

[线程 : AWT-EventQueue-0] 清除 jFrame...
[线程 : AWT-EventQueue-0] 退出程序...

说明关闭也是由同一个异步线程来执行的。由此能够看出Java GUI以及事件/监听模式基本采用匿名内置类,即回调实现。当监听的维度增多,Callback实现也随之增多。同时,事件/监听者模式(观察者模式)的并发模型可为同步或异步。这里说的同步、异步是线程模型;阻塞、非阻塞是编程模型。在Spring中,于这种GUI回调相似的有Spring Boot的消息事件机制 ,这里面也有同步,异步,阻塞,非阻塞的说明。

Future阻塞问题

咱们来修改一下ParalleDataLoader的代码造成一个Future的阻塞

public class FutureBlockingDataLoader extends DataLoader {
    protected void doLoad() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletionService completionService = new ExecutorCompletionService(executorService);
        runComletely(completionService.submit(super::loadConfigurations,null));
        runComletely(completionService.submit(super::loadUsers,null));
        runComletely(completionService.submit(super::loadOrders,null));
        executorService.shutdown();
    }

    private void runComletely(Future<?> future) {
        try {
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new FutureBlockingDataLoader().load();
    }
}

运行结果

[线程: pool-1-thread-1] loadConfigurations() 耗时: 1000 毫秒
[线程: pool-1-thread-2] loadUsers 耗时: 2002 毫秒
[线程: pool-1-thread-3] loadOrders() 耗时: 3001 毫秒
load()总耗时:6073毫秒

由结果可知,future.get()成为future阻塞的源泉。该方法不得不等待任务执行完成,换言之,若是多个任务提交后,返回多个Future逐一调用get()方法时,将会依次blocking,任务的执行从并行变成串行。

Future链式问题

因为Future没法异步执行结果链式处理,尽管FutureBlockingDataLoader可以解决方法数据依赖以及顺序执行的问题,不过它将并行执行带回了阻塞(串行)执行。因此,它不是一个理想实现。不过CompletableFuture能够帮助提高Future限制。

public class ChainDataLoader extends DataLoader {
    protected void doLoad() {
        CompletableFuture
                .runAsync(super::loadConfigurations)
                .thenRun(super::loadUsers)
                .thenRun(super::loadOrders)
                .whenComplete((result,throwable) ->
                    System.out.println("[线程 :" + Thread.currentThread().getName() + "] 加载完成")
                ).join();
    }

    public static void main(String[] args) {
        new ChainDataLoader().load();
    }
}

运行结果

[线程: ForkJoinPool.commonPool-worker-9] loadConfigurations() 耗时: 1004 毫秒
[线程: ForkJoinPool.commonPool-worker-9] loadUsers 耗时: 2004 毫秒
[线程: ForkJoinPool.commonPool-worker-9] loadOrders() 耗时: 3001 毫秒
[线程 :ForkJoinPool.commonPool-worker-9] 加载完成
load()总耗时:6079毫秒

由结果可知,当异步执行时,它并非由3个线程去执行,而是由同一个线程进行链式执行的,之因此加入join,是为了让主线程等待返回。它跟第一个DataLoader的不一样在于,DataLoader是所有由主线程去阻塞执行的,而这里若是不使用join()则确定为非阻塞的,只不过join()会阻塞,这个是线程相关的常识,具体能够参考线程,JVM锁整理 。也就是说,若是去掉join(),因为CompletableFuture都是守护线程,主线程执行完,它是不会执行的,如今咱们把代码稍做修改以下。

public class ChainDataLoader extends DataLoader {
    protected void doLoad() {
        CompletableFuture
                .runAsync(super::loadConfigurations)
                .thenRun(super::loadUsers)
                .thenRun(super::loadOrders)
                .whenComplete((result,throwable) ->
                    System.out.println("[线程 :" + Thread.currentThread().getName() + "] 加载完成")
                );
        System.out.println("[线程 :" + Thread.currentThread().getName() +"】后续执行");
    }

    public static void main(String[] args) {
        new ChainDataLoader().load();
    }
}

运行结果

[线程 :main】后续执行
load()总耗时:60毫秒

证实CompletableFuture还未启动,并未执行。但若是咱们把new ChainDataLoader().load();这段代码放入Controller中

@RestController
public class TestController {
    @GetMapping("/future")
    public void findfuture() {
        new ChainDataLoader().load();
    }
}

经过浏览器访问

能够看到后台打印

[线程 :reactor-http-nio-2】后续执行
load()总耗时:3毫秒
[线程: ForkJoinPool.commonPool-worker-9] loadConfigurations() 耗时: 1003 毫秒
[线程: ForkJoinPool.commonPool-worker-9] loadUsers 耗时: 2004 毫秒
[线程: ForkJoinPool.commonPool-worker-9] loadOrders() 耗时: 3005 毫秒
[线程 :ForkJoinPool.commonPool-worker-9] 加载完成

证实异步线程是非阻塞而且执行了的。

这里咱们能够看到CompletableFuture属于异步操做,若是强制等待结束的话,又回到了阻塞编程的方式,而且让咱们明白到非阻塞不必定提高性能,由于即使是非阻塞,在异步线程中,它同样要使用6秒才能完成,相比于ParalleDataLoader的并行执行,只须要3秒完成来讲,非阻塞的好处是让主方法线程及时完成,让主方法线程池能够及时释放。不过同理,在ParalleDataLoader中若是不进行completionService.poll()的阻塞操做,主线程一样会率先返回,因为线程池中的线程并不是守护线程,它在主线程完成后会继续执行。

public class ParalleDataLoader extends DataLoader {
    protected void doLoad() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletionService completionService = new ExecutorCompletionService(executorService);
        completionService.submit(super::loadConfigurations,null);
        completionService.submit(super::loadUsers,null);
        completionService.submit(super::loadOrders,null);

//        int count = 0;
//        while (count < 3) {
//            if (completionService.poll() != null) {
//                count++;
//            }
//        }
        executorService.shutdown();
    }

    public static void main(String[] args) {
        new ParalleDataLoader().load();
    }
}

运行结果

load()总耗时:59毫秒
[线程: pool-1-thread-1] loadConfigurations() 耗时: 1004 毫秒
[线程: pool-1-thread-2] loadUsers 耗时: 2004 毫秒
[线程: pool-1-thread-3] loadOrders() 耗时: 3002 毫秒

一样咱们把new ParalleDataLoader().load()放入Controller中

@RestController
public class TestController {
    @GetMapping("/future")
    public void findfuture() {
        new ParalleDataLoader().load();
    }
}

经过浏览器访问,后台打印

load()总耗时:1毫秒
[线程: pool-1-thread-1] loadConfigurations() 耗时: 1000 毫秒
[线程: pool-1-thread-2] loadUsers 耗时: 2004 毫秒
[线程: pool-1-thread-3] loadOrders() 耗时: 3005 毫秒

这里一样为异步非阻塞,而且并发了3个线程,异步线程总耗时是3秒。可是这样会形成异步线程池的线程数并发量比较大。

Reactive Stream JVM认为异步系统和资源消费须要特殊处理,在Reactor的github的官网上,有这样一段描述https://github.com/reactive-streams/reactive-streams-jvm

  • 流式数据容量难以预判
  • 异步编程复杂
  • 数据源和消费端之间资源消费难以平衡

Reactive的理解,须要从不少方便

  • 维基百科
  • The Reactive Mainifesto : Resactive组织
  • Spring Framework
  • ReactiveX : RxJava
  • Reactor : WebFlux底层
  • @andrestaltz :著名做者

Reactive Programming定义

The Reactive Mainifesto认为:官网https://www.reactivemanifesto.org/

  • 响应性 (Responsive)
  • 适应性强的 (Resilient)
  • 弹性的 (Elastic)
  • 消息驱动的 (Message Driven)

侧重点

  • 面向Reactive系统
  • Reactive系统原则

WebFlux的线程观察

public class FluxTest {
    public static void main(String[] args) {
        Flux.just("a","b","c")
                .subscribe(FluxTest::println);
        System.out.println("你好");
    }

    private static void println(Object object) {
        String threadName = Thread.currentThread().getName();
        System.out.println("[线程: " + threadName + "] " + object);
    }
}

运行结果

[线程: main] a
[线程: main] b
[线程: main] c
你好

根据结果,咱们能够看到这并非一个异步的,而是一个同步非阻塞的主线程执行。如今咱们来修改一下代码

public class FluxTest {
    public static void main(String[] args) {
        Flux.just("a","b","c")
                .publishOn(Schedulers.elastic())
                .subscribe(FluxTest::println);
        System.out.println("你好");
    }

    private static void println(Object object) {
        String threadName = Thread.currentThread().getName();
        System.out.println("[线程: " + threadName + "] " + object);
    }
}

运行结果

你好 [线程: elastic-2] a [线程: elastic-2] b [线程: elastic-2] c