服务如何优雅关闭

背景

不少时候服务都有平滑退出的需求,例如RPC服务在中止以后须要从注册服务摘除节点、从消息队列已经消费的消息须要正常处理完成等。通常地咱们但愿能让服务在退出前能执行完当前正在执行的任务,这个时候就须要咱们在JVM关闭的时候运行一些清理现场的代码。java

方案

ShutdownHook

JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,容许用户注册一个JVM关闭的钩子。这个钩子能够在如下几种场景被调用:ide

  • 程序正常退出;
  • 使用System.exit();
  • 终端使用Ctrl+C触发的终端;
  • 系统关闭;
  • 使用kill pid命令干掉进程;

通常地发布系统会经过kill命令来中止服务。这个时候服务能够接收到关闭信号并执行钩子程序进行清理工做。测试

场景示例

假设如下场景,有个生产者往内部队列发消息,有个消费者读取队列消息并执行。当咱们中止服务的时候,但愿队列的消息都能正常处理完成,代码示例以下:spa

/** * 服务关闭测试 */
public class ShutDownTest {

    private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);

    private static AtomicLong taskId = new AtomicLong(0);

    // 生产任务
    private static class ProduceTask implements Runnable {

        private AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get()) {
                long element = taskId.incrementAndGet();
                queue.add(element);
                System.out.println("add element : " + element);
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                }
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop producer.");
        }
    }

    // 消费任务
    private static class ConsumeTask implements Runnable {

        private AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get() || queue.size() > 0) {
                try {
                    long element = queue.take();
                    System.out.println("consume element : " + element);
                    doWork();
                } catch (InterruptedException e) {
                }
            }
        }

        private void doWork() {
            try {
                // 消费速度比生产速度稍慢,模拟积压状况
                Thread.sleep(60);
            } catch (InterruptedException e) {
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop consumer.");
        }
    }

    public static void main(String[] args) {
        final ProduceTask producerTask = new ProduceTask();
        final Thread producerThread = new Thread(producerTask);

        final ConsumeTask consumeTask = new ConsumeTask();
        Thread consumeThread = new Thread(consumeTask);

        // 先启动消费
        consumeThread.start();
        // 再启动生产
        producerThread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("try close...");
            // 先关闭生产
            producerTask.setStopped();
            // 再关闭消费
            consumeTask.setStopped();
            try {
                System.out.println("close wait...");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
            }
            System.out.println("close finished...");
        }));
    }
}
复制代码

执行结果以下所示,能够看到服务关闭的时候钩子程序执行成功,在等待消息处理完成后才退出。线程

钩子示例结果

潜在问题

在使用ShutdownHook的时候,咱们每每控制不了钩子的执行顺序。Java.Runtime.addShutdownHook是对外公开的API接口。在前述场景里面,倘若是独立注册钩子,在更复杂的项目里面是否是就没办法保证执行的顺序呢?曾在实际场景中遇到过这样的问题,从kafka队列消费消息,交给内部线程池去处理,咱们自定义了线程池的拒绝策略为一直等待(为了保证消息确实处理),而后就会偶尔出现服务没法关闭的问题。缘由正是线程池先被关闭,kafka队列却还在消费消息,致使消费线程一直在等待。code

Signal

Java同时提供了signal信号机制,咱们的服务也能够接收到关闭信号。cdn

使用Signal机制有如下缘由:blog

  • ShutdownHook执行顺序没法保障,第三方组件也可能注册,致使业务自定义的退出流程依赖的资源会被提早关闭和清理;
  • Signal是非公开API,第三方组件基本不多使用,咱们能够在内部托管服务关闭的执行顺序;
  • 在完成清理工做后能够执行exit调用,保证资源清理不会影响ShutdownHook的退出清理逻辑;

这里核心的缘由仍是但愿能彻底保证服务关闭的顺序,避免出现问题。咱们在服务内部按顺序维护关闭任务,上述代码调整后以下所示:接口

public class TermHelper {

    private static AtomicBoolean signalTriggered = new AtomicBoolean(false);
    private static AtomicBoolean stopping = new AtomicBoolean(false);
    private static AtomicBoolean registeredHolder = new AtomicBoolean(false);

    private static Deque<Runnable> terms = new ConcurrentLinkedDeque<>();

    private static void tryRegisterOnlyOnce() {
        boolean previousRegistered = registeredHolder.getAndSet(true);
        if (!previousRegistered) {
            registerTermSignal();
        }
    }

    private static void registerTermSignal() {
        Signal.handle(new Signal("TERM"), signal -> {
            boolean previous = signalTriggered.getAndSet(true);
            if (previous) {
                System.out.println("Term has been triggered.");
                return;
            }
            termAndExit();
        });
    }

    public static void addTerm(Runnable runnable) {
        tryRegisterOnlyOnce();
        terms.addLast(runnable);
    }

    public static void addFirstTerm(Runnable runnable) {
        tryRegisterOnlyOnce();
        terms.addFirst(runnable);
    }

    private static void termAndExit() {
        try {
            Thread current = Thread.currentThread();
            current.setName(current.getName() + "(退出线程)");
            System.out.println("do term cleanup....");
            doTerm();
            System.out.println("exit success.");
            System.exit(0);
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public static void doTerm() {
        boolean previousStopping = stopping.getAndSet(true);
        if (previousStopping) {
            System.out.println("Term routine already running, wait until done!");
            return;
        }
        for (Runnable runnable : terms) {
            try {
                System.out.println("execute term runnable : " + runnable);
                runnable.run();
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
复制代码

TermHelper内部使用队列维护关闭任务,在服务关闭的时候串行执行相关任务,保证其顺序。咱们也能够在此基础上维护关闭任务的优先级,实现按优先级高低依次执行关闭任务。队列

public class ShutDownTest {

    private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);

    private static AtomicLong taskId = new AtomicLong(0);

    // 生产任务
    private static class ProduceTask implements Runnable {

        private AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get()) {
                long element = taskId.incrementAndGet();
                queue.add(element);
                System.out.println("add element : " + element);
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                }
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop producer.");
        }
    }

    // 消费任务
    private static class ConsumeTask implements Runnable {

        private AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get() || queue.size() > 0) {
                try {
                    long element = queue.take();
                    System.out.println("consume element : " + element);
                    doWork();
                } catch (InterruptedException e) {
                }
            }
        }

        private void doWork() {
            try {
                // 消费速度比生产速度稍慢,模拟积压状况
                Thread.sleep(60);
            } catch (InterruptedException e) {
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop consumer.");
        }
    }

    public static void main(String[] args) {
        final ProduceTask producerTask = new ProduceTask();
        final Thread producerThread = new Thread(producerTask);

        final ConsumeTask consumeTask = new ConsumeTask();
        Thread consumeThread = new Thread(consumeTask);

        // 先启动消费
        consumeThread.start();
        // 再启动生产
        producerThread.start();

        TermHelper.addFirstTerm(() -> {
            // 关闭生产
            producerTask.setStopped();
        });

        TermHelper.addTerm(() -> {
            // 再关闭消费
            consumeTask.setStopped();
        });

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("shut down hook...");
        }));
    }
}
复制代码

执行结果以下所示。须要注意的是咱们只注册了TERM信号,因此须要经过kill -TERM的方式关闭服务。从图中能够看到咱们测试的生产者和消费者都正常退出了,内部的消息最后也处理完成。

image-20190407212611347

小结

若须要平滑中止服务,咱们通常能够经过ShutdownHook和Signal来实现。ShutdownHook通常比较难保证关闭任务的执行顺序,这个时候能够考虑使用Signal机制来彻底托管咱们关闭服务的执行顺序。

相关文章
相关标签/搜索