不少时候服务都有平滑退出的需求,例如RPC服务在中止以后须要从注册服务摘除节点、从消息队列已经消费的消息须要正常处理完成等。通常地咱们但愿能让服务在退出前能执行完当前正在执行的任务,这个时候就须要咱们在JVM关闭的时候运行一些清理现场的代码。java
JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,容许用户注册一个JVM关闭的钩子。这个钩子能够在如下几种场景被调用:ide
通常地发布系统会经过kill命令来中止服务。这个时候服务能够接收到关闭信号并执行钩子程序进行清理工做。测试
假设如下场景,有个生产者往内部队列发消息,有个消费者读取队列消息并执行。当咱们中止服务的时候,但愿队列的消息都能正常处理完成,代码示例以下:spa
/** * 服务关闭测试 */
public class ShutDownTest {
private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);
private static AtomicLong taskId = new AtomicLong(0);
// 生产任务
private static class ProduceTask implements Runnable {
private AtomicBoolean stopped = new AtomicBoolean(false);
@Override
public void run() {
while (!stopped.get()) {
long element = taskId.incrementAndGet();
queue.add(element);
System.out.println("add element : " + element);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
}
}
}
public void setStopped() {
stopped.compareAndSet(false, true);
System.out.println("stop producer.");
}
}
// 消费任务
private static class ConsumeTask implements Runnable {
private AtomicBoolean stopped = new AtomicBoolean(false);
@Override
public void run() {
while (!stopped.get() || queue.size() > 0) {
try {
long element = queue.take();
System.out.println("consume element : " + element);
doWork();
} catch (InterruptedException e) {
}
}
}
private void doWork() {
try {
// 消费速度比生产速度稍慢,模拟积压状况
Thread.sleep(60);
} catch (InterruptedException e) {
}
}
public void setStopped() {
stopped.compareAndSet(false, true);
System.out.println("stop consumer.");
}
}
public static void main(String[] args) {
final ProduceTask producerTask = new ProduceTask();
final Thread producerThread = new Thread(producerTask);
final ConsumeTask consumeTask = new ConsumeTask();
Thread consumeThread = new Thread(consumeTask);
// 先启动消费
consumeThread.start();
// 再启动生产
producerThread.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("try close...");
// 先关闭生产
producerTask.setStopped();
// 再关闭消费
consumeTask.setStopped();
try {
System.out.println("close wait...");
Thread.sleep(5000);
} catch (InterruptedException e) {
}
System.out.println("close finished...");
}));
}
}
复制代码
执行结果以下所示,能够看到服务关闭的时候钩子程序执行成功,在等待消息处理完成后才退出。线程
在使用ShutdownHook的时候,咱们每每控制不了钩子的执行顺序。Java.Runtime.addShutdownHook是对外公开的API接口。在前述场景里面,倘若是独立注册钩子,在更复杂的项目里面是否是就没办法保证执行的顺序呢?曾在实际场景中遇到过这样的问题,从kafka队列消费消息,交给内部线程池去处理,咱们自定义了线程池的拒绝策略为一直等待(为了保证消息确实处理),而后就会偶尔出现服务没法关闭的问题。缘由正是线程池先被关闭,kafka队列却还在消费消息,致使消费线程一直在等待。code
Java同时提供了signal信号机制,咱们的服务也能够接收到关闭信号。cdn
使用Signal机制有如下缘由:blog
这里核心的缘由仍是但愿能彻底保证服务关闭的顺序,避免出现问题。咱们在服务内部按顺序维护关闭任务,上述代码调整后以下所示:接口
public class TermHelper {
private static AtomicBoolean signalTriggered = new AtomicBoolean(false);
private static AtomicBoolean stopping = new AtomicBoolean(false);
private static AtomicBoolean registeredHolder = new AtomicBoolean(false);
private static Deque<Runnable> terms = new ConcurrentLinkedDeque<>();
private static void tryRegisterOnlyOnce() {
boolean previousRegistered = registeredHolder.getAndSet(true);
if (!previousRegistered) {
registerTermSignal();
}
}
private static void registerTermSignal() {
Signal.handle(new Signal("TERM"), signal -> {
boolean previous = signalTriggered.getAndSet(true);
if (previous) {
System.out.println("Term has been triggered.");
return;
}
termAndExit();
});
}
public static void addTerm(Runnable runnable) {
tryRegisterOnlyOnce();
terms.addLast(runnable);
}
public static void addFirstTerm(Runnable runnable) {
tryRegisterOnlyOnce();
terms.addFirst(runnable);
}
private static void termAndExit() {
try {
Thread current = Thread.currentThread();
current.setName(current.getName() + "(退出线程)");
System.out.println("do term cleanup....");
doTerm();
System.out.println("exit success.");
System.exit(0);
} catch (Throwable e) {
e.printStackTrace();
System.exit(1);
}
}
public static void doTerm() {
boolean previousStopping = stopping.getAndSet(true);
if (previousStopping) {
System.out.println("Term routine already running, wait until done!");
return;
}
for (Runnable runnable : terms) {
try {
System.out.println("execute term runnable : " + runnable);
runnable.run();
} catch (Throwable e) {
e.printStackTrace();
}
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
复制代码
TermHelper内部使用队列维护关闭任务,在服务关闭的时候串行执行相关任务,保证其顺序。咱们也能够在此基础上维护关闭任务的优先级,实现按优先级高低依次执行关闭任务。队列
public class ShutDownTest {
private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);
private static AtomicLong taskId = new AtomicLong(0);
// 生产任务
private static class ProduceTask implements Runnable {
private AtomicBoolean stopped = new AtomicBoolean(false);
@Override
public void run() {
while (!stopped.get()) {
long element = taskId.incrementAndGet();
queue.add(element);
System.out.println("add element : " + element);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
}
}
}
public void setStopped() {
stopped.compareAndSet(false, true);
System.out.println("stop producer.");
}
}
// 消费任务
private static class ConsumeTask implements Runnable {
private AtomicBoolean stopped = new AtomicBoolean(false);
@Override
public void run() {
while (!stopped.get() || queue.size() > 0) {
try {
long element = queue.take();
System.out.println("consume element : " + element);
doWork();
} catch (InterruptedException e) {
}
}
}
private void doWork() {
try {
// 消费速度比生产速度稍慢,模拟积压状况
Thread.sleep(60);
} catch (InterruptedException e) {
}
}
public void setStopped() {
stopped.compareAndSet(false, true);
System.out.println("stop consumer.");
}
}
public static void main(String[] args) {
final ProduceTask producerTask = new ProduceTask();
final Thread producerThread = new Thread(producerTask);
final ConsumeTask consumeTask = new ConsumeTask();
Thread consumeThread = new Thread(consumeTask);
// 先启动消费
consumeThread.start();
// 再启动生产
producerThread.start();
TermHelper.addFirstTerm(() -> {
// 关闭生产
producerTask.setStopped();
});
TermHelper.addTerm(() -> {
// 再关闭消费
consumeTask.setStopped();
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("shut down hook...");
}));
}
}
复制代码
执行结果以下所示。须要注意的是咱们只注册了TERM信号,因此须要经过kill -TERM的方式关闭服务。从图中能够看到咱们测试的生产者和消费者都正常退出了,内部的消息最后也处理完成。
若须要平滑中止服务,咱们通常能够经过ShutdownHook和Signal来实现。ShutdownHook通常比较难保证关闭任务的执行顺序,这个时候能够考虑使用Signal机制来彻底托管咱们关闭服务的执行顺序。