从crawler4j源码中看wait与notify

这是我参与更文挑战的第1天,活动详情查看: 更文挑战java

引言

crawler4j 是一个开源的 Java 爬虫框架,且拥有4k多个 star ,相信其源码值得我去研究,因此才写下这篇文章。若有错误欢迎联系我指正!git

其实本文的重点不在于研究 crawler4j 源码中的各类逻辑、细节等,主要仍是以 crawler4j 这个例子来看 Java 中 waitnotify 的使用,看看热门开源项目里是如何使用如何编码的。github

想快速了解的话,你能够直接看核心逻辑部分,也能够直接看究极简单版wait/notify使用markdown

正文

crawler4j 中最重要的两个类莫过于 CrawlControllerWebCrawler 了,一个是用于设置与开启爬虫,而另外一个则是爬虫的核心实现类。这里讨论的代码基本都在 CrawlController 类中。session

熟悉的同窗都知道,在开启 controller 时通常有两个用法,以下:框架

// 用法1:阻塞式,当爬虫线程都结束后才会执行这行之后的代码
controller.start(factory, numberOfCrawlers);
复制代码
// 用法2:非阻塞式,在 start 之后, waitUntilFinish 之前的代码都会马上执行,在 waitUntilFinish 处阻塞
controller.startNonBlocking(factory, numberOfCrawlers);
// 这中间的代码都会异步执行
controller.waitUntilFinish();
复制代码

这个的源码部分就是本文重点要讨论的 waitnotify 的使用。异步

两个重要变量

首先,在 CrawlController 中定义了两个这个功能须要的重要变量:ide

/** * Is the crawling of this session finished? */
protected boolean finished;

protected final Object waitingLock = new Object();

复制代码
  • finished 用于判断这次爬取是否已结束
  • waitingLock 则是用于加锁

阻塞式 start 方法

为了只关注重要内容,其余部分代码我以注释的形式带过。oop

咱们调用 start 方法的入口在这里post

/** * Start the crawling session and wait for it to finish. * * @param crawlerFactory * factory to create crawlers on demand for each thread * @param numberOfCrawlers * the number of concurrent threads that will be contributing in * this crawling session. * @param <T> Your class extending WebCrawler */
public <T extends WebCrawler> void start(WebCrawlerFactory<T> crawlerFactory, int numberOfCrawlers) {
    this.start(crawlerFactory, numberOfCrawlers, true);
}
复制代码

它会去调用另外一个有更多参数的 start 方法,多的参数就是 isBlocking ,这个参数表示是否须要阻塞,具体做用在下面这个 start 方法的注释中给出

protected <T extends WebCrawler> void start(final WebCrawlerFactory<T> crawlerFactory, final int numberOfCrawlers, boolean isBlocking) {

    // 根据提供的工厂类 crawlerFactory 构造指定数量的线程并使它们开始运行

    // 建立一个监控线程 monitorThread 以下
    Thread monitorThread = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                synchronized (waitingLock) {

                    while (true) {
                        // 设置的监控循环周期
                        sleep(config.getThreadMonitoringDelaySeconds());
                        boolean someoneIsWorking = false;
                        
                        // 第一部分: 
                        // 观察每一个 爬虫线程 是否正常运行,若没有正常运行则采起相应措施
                        // 第一部分的代码省略,有兴趣能够去 github 看
                    
                        // 第二部分: 
                        // 查看是否还有正在工做的线程,若没有则准备退出并关闭资源
                        // 这个部分也是咱们常常看到的 "It looks like no thread is working, waiting for ..." 等 打印日志的所在源码部分
                        // 在关闭时会调用 notifyAll
                        if (!someoneIsWorking && shutOnEmpty) {
                            // 再次确保无线程工做且队列中无 URL 等待爬取

                            // 释放资源

                            waitingLock.notifyAll();

                            // 释放资源
                        }
                    }
                }
            } catch (Throwable e) {
                if (config.isHaltOnError()) {
                    // 发生了某个错误
                    setError(e);
                    synchronized (waitingLock) {
                        // 释放资源

                        waitingLock.notifyAll();

                        // 释放资源
                    }
                } else {
                    logger.error("Unexpected Error", e);
                }
            }
        }

    });

    monitorThread.start();

    // 若是须要阻塞,那么就调用 waitUntilFinish 这个方法,代码执行到这就会阻塞住
    if (isBlocking) {
        waitUntilFinish();
    }
}
复制代码

从代码中能够看到,阻塞的地方在最后几行,也就是监控线程开启后的 waitUntilFinish 方法。

监控线程在监控到线程都运行完后,调用 waitingLock.notifyAll() 从而使这里的阻塞结束,那么这是如何作到的呢,咱们再来看 waitUntilFinish 方法。

waitUntilFinish 方法如何使 start 阻塞

这个方法的源码很短,我直接放出来。

/** * Wait until this crawling session finishes. */
public void waitUntilFinish() {
    while (!finished) {
        synchronized (waitingLock) {
            if (config.isHaltOnError()) {
                Throwable t = getError();
                if (t != null && config.isHaltOnError()) {
                    if (t instanceof RuntimeException) {
                        throw (RuntimeException)t;
                    } else if (t instanceof Error) {
                        throw (Error)t;
                    } else {
                        throw new RuntimeException("error on monitor thread", t);
                    }
                }
            }
            if (finished) {
                return;
            }
            try {
                // 主动让出并等待锁资源
                waitingLock.wait();
            } catch (InterruptedException e) {
                logger.error("Error occurred", e);
            }
        }
    }
}
复制代码

首先,在 start 方法和 waitUntilFinish 方法中都有 synchronized 来修饰关键的代码块,而且争夺的都是同一个锁 waitingLock。这意味着一方执行,就会有另外一方被阻塞。咱们但愿的是 waitUntilFinish 一直被阻塞,直到爬虫线程都执行完(也就是 start 方法中对应的 synchronized 方法块里的内容)后,再让 waitUntilFinish 方法结束。这也就是源码中对这部分的处理,同时也是 wait 和 notify 使用的思想。

核心逻辑

再来理一遍源码这块的逻辑:

  1. monitorThread 的 run 方法中使用 synchronized 获取锁 waitingLock,并循环检查是否全部爬虫线程、爬虫任务都执行完毕。
  2. waitUntilFinish 使用 synchronized 获取锁 waitingLock,并根据变量 isFinished 检查爬取过程是否结束,若结束则直接返回;若未结束,则调用 wait 方法让出资源给 monitorThread 的 run 方法。
  3. 即便 waitUntilFinish 在调用 wait 方法后又获取到了锁 waitingLock ,它也会根据爬取是否结束 isFinished 来判断是否要再次进入循环调用 wait 方法。
  4. monitorThread 在检查到全部爬虫线程、爬虫任务都执行完毕后,调用 notifyAll 方法(和 notify 同样,只是对全部竞争锁资源的线程都发送通知)来让 waitUntilFinish 继续从 wait 处执行下去
  5. waitUntilFinish 得到锁资源,并从调用 wait 方法后的代码继续向下执行,在循环判断 isFinished 时发现爬取过程结束了,则直接返回,整个过程结束

其中也还有不少细节没有说起,如延时的设置、循环监控周期、资源的释放等等,因为不是本文关注的重点内容,能够本身参照源码理解一下。

透过现象看本质

不难看出其实本质就是某个线程调用 wait 方法主动让出锁给另外一个线程,而后另外一个线程在执行完任务后调用 notify/notifyAll 来通知它执行完了以让它开始抢占锁

其中还有一些细节:

  • 调用 notify 后,调用 wait 的线程并不会立刻得到锁资源,而是等调用 notify 的那个线程释放锁资源后它才能得到,也就是说即便线程调用了 notify 方法,可能也要等到他退出 synchronized 代码块后,其余线程才能得到锁资源
  • 调用 wait 释放锁,又从新得到锁后,代码会从 wait 方法下面的那一行继续向下执行,而不会去回到 synchronized 代码块开始的地方执行,这也是为何源码中要使用 while 循环去重复获取锁资源。由于若是没有这层循环而该线程在释放锁后从新获取锁时其实爬取过程还没结束(也就是 isFinished 是 False),那 waitUntilFinish 就会直接结束
  • wait 其实能够设置超时时长 wait(long timeout),在 timeout 时间后唤醒本身,这就至关于 timeout 时间后有人来通知他能够去抢锁资源了

究极简易版实现

为了加深理解,本身动手实现一下 crawler4j 这个机制的究极简易版以下(注意只是实现 wait/notify 机制):

package thread_practice;

public class WaitNotify {

    private final Object waitingLock = new Object();
    private boolean isFinished = false;

    public void start() {
        synchronized (waitingLock) {
            isFinished = false;
            System.out.println("doing sth...");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("done.");
            isFinished = true;
            waitingLock.notifyAll();
        }
    }

    public void waitUntilFinish() {
        synchronized (waitingLock) {
            if (isFinished) return;

            try {
                waitingLock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        WaitNotify wn = new WaitNotify();
        new Thread(() -> wn.start()).start();
        wn.waitUntilFinish();
        System.out.println("continue another thing...");
    }
}
复制代码

执行程序5秒以内:

执行程序5秒以后:

能够看到主线程确实是阻塞在了 wn.waitUntilFinish() 这个地方,在5秒以后才继续执行下去。 其逻辑和前面几节个人解释同样,是只提取了核心部分的简化版。

总结

本文结合 crawler4j 中实际使用的例子对实际场景中如何使用 wait 与 notify 进行了介绍与讨论,也根据 crawler4j 中的场景实现了一个简易版功能。线程之间的通讯离不开 wait 与 notify ,固然也不止 wait 与 notify ,我会在之后对这方面作更深刻的研究。

本文有什么错误欢迎联系我指正。

扩展

这里只是单个线程通知单个线程任务执行完毕,若是是多个线程通知单个线程的场景怎么处理呢?

  • 若是是多个线程都执行完了,才通知某个线程,那能够参照 crawler4j 的方式,使用一个监控线程去循环检查全部线程是否执行完
  • 若是是多个线程中的某个执行完了就要通知,如何实现?
相关文章
相关标签/搜索