这是我参与更文挑战的第1天,活动详情查看: 更文挑战java
crawler4j 是一个开源的 Java 爬虫框架,且拥有4k多个 star ,相信其源码值得我去研究,因此才写下这篇文章。若有错误欢迎联系我指正!git
其实本文的重点不在于研究 crawler4j 源码中的各类逻辑、细节等,主要仍是以 crawler4j 这个例子来看 Java 中 wait 与 notify 的使用,看看热门开源项目里是如何使用如何编码的。github
想快速了解的话,你能够直接看核心逻辑部分,也能够直接看究极简单版wait/notify使用markdown
crawler4j 中最重要的两个类莫过于 CrawlController 和 WebCrawler 了,一个是用于设置与开启爬虫,而另外一个则是爬虫的核心实现类。这里讨论的代码基本都在 CrawlController 类中。session
熟悉的同窗都知道,在开启 controller 时通常有两个用法,以下:框架
// 用法1:阻塞式,当爬虫线程都结束后才会执行这行之后的代码
controller.start(factory, numberOfCrawlers);
复制代码
// 用法2:非阻塞式,在 start 之后, waitUntilFinish 之前的代码都会马上执行,在 waitUntilFinish 处阻塞
controller.startNonBlocking(factory, numberOfCrawlers);
// 这中间的代码都会异步执行
controller.waitUntilFinish();
复制代码
这个的源码部分就是本文重点要讨论的 wait 与 notify 的使用。异步
首先,在 CrawlController 中定义了两个这个功能须要的重要变量:ide
/** * Is the crawling of this session finished? */
protected boolean finished;
protected final Object waitingLock = new Object();
复制代码
为了只关注重要内容,其余部分代码我以注释的形式带过。oop
咱们调用 start 方法的入口在这里post
/** * Start the crawling session and wait for it to finish. * * @param crawlerFactory * factory to create crawlers on demand for each thread * @param numberOfCrawlers * the number of concurrent threads that will be contributing in * this crawling session. * @param <T> Your class extending WebCrawler */
public <T extends WebCrawler> void start(WebCrawlerFactory<T> crawlerFactory, int numberOfCrawlers) {
this.start(crawlerFactory, numberOfCrawlers, true);
}
复制代码
它会去调用另外一个有更多参数的 start 方法,多的参数就是 isBlocking
,这个参数表示是否须要阻塞,具体做用在下面这个 start 方法的注释中给出
protected <T extends WebCrawler> void start(final WebCrawlerFactory<T> crawlerFactory, final int numberOfCrawlers, boolean isBlocking) {
// 根据提供的工厂类 crawlerFactory 构造指定数量的线程并使它们开始运行
// 建立一个监控线程 monitorThread 以下
Thread monitorThread = new Thread(new Runnable() {
@Override
public void run() {
try {
synchronized (waitingLock) {
while (true) {
// 设置的监控循环周期
sleep(config.getThreadMonitoringDelaySeconds());
boolean someoneIsWorking = false;
// 第一部分:
// 观察每一个 爬虫线程 是否正常运行,若没有正常运行则采起相应措施
// 第一部分的代码省略,有兴趣能够去 github 看
// 第二部分:
// 查看是否还有正在工做的线程,若没有则准备退出并关闭资源
// 这个部分也是咱们常常看到的 "It looks like no thread is working, waiting for ..." 等 打印日志的所在源码部分
// 在关闭时会调用 notifyAll
if (!someoneIsWorking && shutOnEmpty) {
// 再次确保无线程工做且队列中无 URL 等待爬取
// 释放资源
waitingLock.notifyAll();
// 释放资源
}
}
}
} catch (Throwable e) {
if (config.isHaltOnError()) {
// 发生了某个错误
setError(e);
synchronized (waitingLock) {
// 释放资源
waitingLock.notifyAll();
// 释放资源
}
} else {
logger.error("Unexpected Error", e);
}
}
}
});
monitorThread.start();
// 若是须要阻塞,那么就调用 waitUntilFinish 这个方法,代码执行到这就会阻塞住
if (isBlocking) {
waitUntilFinish();
}
}
复制代码
从代码中能够看到,阻塞的地方在最后几行,也就是监控线程开启后的 waitUntilFinish 方法。
监控线程在监控到线程都运行完后,调用 waitingLock.notifyAll()
从而使这里的阻塞结束,那么这是如何作到的呢,咱们再来看 waitUntilFinish 方法。
这个方法的源码很短,我直接放出来。
/** * Wait until this crawling session finishes. */
public void waitUntilFinish() {
while (!finished) {
synchronized (waitingLock) {
if (config.isHaltOnError()) {
Throwable t = getError();
if (t != null && config.isHaltOnError()) {
if (t instanceof RuntimeException) {
throw (RuntimeException)t;
} else if (t instanceof Error) {
throw (Error)t;
} else {
throw new RuntimeException("error on monitor thread", t);
}
}
}
if (finished) {
return;
}
try {
// 主动让出并等待锁资源
waitingLock.wait();
} catch (InterruptedException e) {
logger.error("Error occurred", e);
}
}
}
}
复制代码
首先,在 start 方法和 waitUntilFinish 方法中都有 synchronized 来修饰关键的代码块,而且争夺的都是同一个锁 waitingLock。这意味着一方执行,就会有另外一方被阻塞。咱们但愿的是 waitUntilFinish 一直被阻塞,直到爬虫线程都执行完(也就是 start 方法中对应的 synchronized 方法块里的内容)后,再让 waitUntilFinish 方法结束。这也就是源码中对这部分的处理,同时也是 wait 和 notify 使用的思想。
再来理一遍源码这块的逻辑:
其中也还有不少细节没有说起,如延时的设置、循环监控周期、资源的释放等等,因为不是本文关注的重点内容,能够本身参照源码理解一下。
不难看出其实本质就是某个线程调用 wait 方法主动让出锁给另外一个线程,而后另外一个线程在执行完任务后调用 notify/notifyAll 来通知它执行完了以让它开始抢占锁。
其中还有一些细节:
为了加深理解,本身动手实现一下 crawler4j 这个机制的究极简易版以下(注意只是实现 wait/notify 机制):
package thread_practice;
public class WaitNotify {
private final Object waitingLock = new Object();
private boolean isFinished = false;
public void start() {
synchronized (waitingLock) {
isFinished = false;
System.out.println("doing sth...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("done.");
isFinished = true;
waitingLock.notifyAll();
}
}
public void waitUntilFinish() {
synchronized (waitingLock) {
if (isFinished) return;
try {
waitingLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
WaitNotify wn = new WaitNotify();
new Thread(() -> wn.start()).start();
wn.waitUntilFinish();
System.out.println("continue another thing...");
}
}
复制代码
执行程序5秒以内:
执行程序5秒以后:
能够看到主线程确实是阻塞在了 wn.waitUntilFinish()
这个地方,在5秒以后才继续执行下去。 其逻辑和前面几节个人解释同样,是只提取了核心部分的简化版。
本文结合 crawler4j 中实际使用的例子对实际场景中如何使用 wait 与 notify 进行了介绍与讨论,也根据 crawler4j 中的场景实现了一个简易版功能。线程之间的通讯离不开 wait 与 notify ,固然也不止 wait 与 notify ,我会在之后对这方面作更深刻的研究。
本文有什么错误欢迎联系我指正。
这里只是单个线程通知单个线程任务执行完毕,若是是多个线程通知单个线程的场景怎么处理呢?