Sidekiq 信号处理源码分析

引言

在以前的文章《Sidekiq任务调度流程分析》中,咱们一块儿仔细分析了 Sidekiq 是如何基于多线程完成队列任务处理以及调度的。咱们在以前的分析里,看到了不论是 Sidekiq::Scheduled::Poller 仍是 Sidekiq::Processor 的核心代码里,都会有一个由 @done 实例变量控制的循环体:
<!-- More -->html

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73
def start
  @thread ||= safe_thread("scheduler") do
    initial_wait

    while !@done           # 这是 poller 的循环控制
      enqueue
      wait
    end
    Sidekiq.logger.info("Scheduler exiting...")
  end
end
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
  begin
    while !@done           # 这是咱们常说的 worker 循环控制
      process_one
    end
    @mgr.processor_stopped(self)
  rescue Sidekiq::Shutdown
    @mgr.processor_stopped(self)
  rescue Exception => ex
    @mgr.processor_died(self, ex)
  end
end

也就是说,这些 @done 实例变量决定了 poller 线程跟 worker 线程是否循环执行?一旦 @done 被改成 true,那循环体就再也不执行,线程天然也就是退出了。因而,单从这些代码,咱们能够判定, Sidekiq 就是经过设置 @done 的值来通知一个线程安全退出(graceful exit)的。咱们也知道,生产环境中,咱们是经过发送信号的方式来告诉 sidekiq 退出或者进入静默(quiet)状态的,那么,这里的 @done 是怎么跟信号处理联系起来的呢?这些就是今天这篇文章的重点了!git

注意

  1. 今天的分析所参考的 sidekiq 的源码对应版本是 4.2.3;github

  2. 今天所讨论的内容,将主要围绕系统信号处理进行分析,无关细节将不赘述,若有须要,请自行翻阅 sidekiq 源码;数组

  3. 今天的文章跟上篇的《Sidekiq任务调度流程分析》紧密相关,上篇文章介绍的启动过程跟任务调度会帮助这篇文章的理解,若是尚未阅读上篇文章的,建议先阅读后再来阅读这一篇信号处理的文章。安全

你将了解到什么?

  1. Sidekiq 信号处理机制;ruby

  2. 为何重启 Sidekiq 时,USR1 信号(即进入 quiet 模式)须要尽量早,而进程的退出重启须要尽量晚。多线程

从头再来

由于前一篇文章着眼于任务调度,因此略过了其余无关细节,包括信号处理,这篇文章则将镜头对准信号处理,因此让咱们从头再来一遍,只是这一次,咱们只关心与信号处理有关的代码。async

依旧是从 cli.rb 文件开始,它是 Sidekiq 核心代码的生命起点,由于 Sidekiq 命令行启动后,它是第一个被执行的代码,Sidekiq 启动过程当中调用了 Sidekiq::CLI#run 方法:ide

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L49-L106
def run
  boot_system
  print_banner

  self_read, self_write = IO.pipe

  %w(INT TERM USR1 USR2 TTIN).each do |sig|
    begin
      trap sig do
        self_write.puts(sig)
      end
    rescue ArgumentError
      puts "Signal #{sig} not supported"
    end
  end

  # ... other codes

  begin
    launcher.run

    while readable_io = IO.select([self_read])
      signal = readable_io.first[0].gets.strip
      handle_signal(signal)
    end
  rescue Interrupt
    logger.info 'Shutting down'
    launcher.stop
    # Explicitly exit so busy Processor threads can't block
    # process shutdown.
    logger.info "Bye!"
    exit(0)
  end

以上的代码就是整个 Sidekiq 最顶层的信号处理的核心代码了,让咱们慢慢分析!
首先,self_read, self_write = IO.pipe 建立了一个模拟管道的 IO 对象,而且同时返回这个 管道的一个写端以及一个读端,经过这两端,就能够实现对管道的读写了。须要注意的是,IO.pipe 建立的读端在读的时候不会自动生成 EOF 符,因此这就要求读时,写端是关闭的,而写时,读端是关闭的,一句话说,就是这样的管道不容许读写端同时打开。关于 IO.pipe 还有挺多细节跟须要注意的点,若是还须要了解,请阅读官方文档学习

上面说的管道本质上只是一个 IO 对象而已,暂时不用纠结太多,让咱们接着往下读:

%w(INT TERM USR1 USR2 TTIN).each do |sig|
  begin
    trap sig do
      self_write.puts(sig)
    end
  rescue ArgumentError
    puts "Signal #{sig} not supported"
  end
end

这段代码就比较有意思了,最外层遍历了一个系统信号的数组,而后逐个信号进行监听(trap,或者叫捕捉?)。让咱们聚焦在 trap 方法的调用跟其 block 上,查阅 Ruby 文档,发现 trapSignal 模块下的一个方法,Signal 主要是处理与系统信号有关的任务,而后 trap 的做用是:

Specifies the handling of signals. The first parameter is a signal name (a string such as “SIGALRM”, “SIGUSR1”, and so on) or a signal number...

因此,前面的那段代码的意思就很容易理解了,Sidekiq 注册了对 INTTERMUSR1USR2以及TTIN等系统信号的处理,而在进程收到这些信号时,就会执行 self_write.puts(sig),也就是将收到的信号经过以前介绍的管道写端 self_write 记录下来。什么?只记录下来,那还得处理啊?!

稍安勿躁,让咱们接着往下分析 Sidekiq::CLI#run 方法末尾的代码:

begin
  launcher.run

  while readable_io = IO.select([self_read])
    signal = readable_io.first[0].gets.strip
    handle_signal(signal)
  end
rescue Interrupt
  logger.info 'Shutting down'
  launcher.stop
  # Explicitly exit so busy Processor threads can't block
  # process shutdown.
  logger.info "Bye!"
  exit(0)
end

看到没有,这里有个循环,循环控制条件里,readable_io = IO.select([self_read]) 是从前面的管道的读端 self_read 阻塞地等待信号的到达。对于 IO.selectRuby 官方文档介绍以下:

Calls select(2) system call. It monitors given arrays of IO objects, waits until one or more of IO objects are ready for reading, are ready for writing, and have pending exceptions respectively, and returns an array that contains arrays of those IO objects.

因此这里就是说 Sidekiq 主线程首先负责执行完其余初始化工做,最后阻塞在信号等待以及处理。在其等到新的信号以后,进入上面代码展现的循环体:

signal = readable_io.first[0].gets.strip
handle_signal(signal)

这里语法细节先不深究,咱们看下这两行代码第一行是从前面说的管道中读取信号,而且将信号传递给 handle_signal 方法,让咱们接着往下看 handle_signal 方法的定义:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L125-L153
def handle_signal(sig)
  Sidekiq.logger.debug "Got #{sig} signal"
  case sig
  when 'INT'
    # Handle Ctrl-C in JRuby like MRI
    # http://jira.codehaus.org/browse/JRUBY-4637
    raise Interrupt
  when 'TERM'
    # Heroku sends TERM and then waits 10 seconds for process to exit.
    raise Interrupt
  when 'USR1'
    Sidekiq.logger.info "Received USR1, no longer accepting new work"
    launcher.quiet
  when 'USR2'
    if Sidekiq.options[:logfile]
      Sidekiq.logger.info "Received USR2, reopening log file"
      Sidekiq::Logging.reopen_logs
    end
  when 'TTIN'
    Thread.list.each do |thread|
      Sidekiq.logger.warn "Thread TID-#{thread.object_id.to_s(36)} #{thread['label']}"
      if thread.backtrace
        Sidekiq.logger.warn thread.backtrace.join("\n")
      else
        Sidekiq.logger.warn "<no backtrace available>"
      end
    end
  end
end

这里的代码挺长,可是一点都不难理解,我简单解释下就够了。当进程:

  1. 收到 TERM 或者 INT信号时,直接抛出 Interrupt 中断;

  2. 收到 USR1 信号时,则通知 launcher 执行 .quiet 方法,Sidekiq 在这里进入 Quiet 模式(怎么进入?);

  3. 收到 USR2 信号时,从新打开日志;

  4. 收到 TTIN 信号时,打印全部线程当前正在执行的代码列表。

到此,一个信号从收到被存下,到被取出处理的大体过程就是这样的,至于具体的处理方式,咱们下个章节详细展开。如今有一点须要补充的是,上面讲当 Sidekiq 收到 TERM 或者 INT 信号时,都会抛出 Interrupt 中断异常,那这个异常又是如何处理的呢?咱们回过头去看刚才最开始的 Sidekiq::CLI#run 方法末尾的代码:

begin
  launcher.run

  while readable_io = IO.select([self_read])
    signal = readable_io.first[0].gets.strip
    handle_signal(signal)
  end
rescue Interrupt
  logger.info 'Shutting down'
  launcher.stop
  # Explicitly exit so busy Processor threads can't block
  # process shutdown.
  logger.info "Bye!"
  exit(0)
end

原来是 run 方法在处理信号时,声明了 rescue Interrupt,捕捉了 Interrupt 中断异常,而且在异常处理时打印必要日志,同时执行 launcher.stop 通知各个线程中止工做,最后调用 exit 方法强制退出进程,到此,一个 Sidekiq 进程就完全退出了。
可是问题又来了,信号处理的大体过程我是知道了,可是具体的 launcher.quietlauncher.stop 都干了些什么呢?

Sidekiq::Launcher#quiet 源码探索

老规矩,先上代码:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L32-L36
def quiet
  @done = true
  @manager.quiet
  @poller.terminate
end

代码只有短短三行。 Launcher 对象首先设置本身的实例变量 @done 的值为 true,接着执行 @manager.quiet 以及 @poller.terminate。看方法命名上理解,应该是 Luancher 对象又将 quiet 的消息传递给了 @managerSidekiq::Manager 对象,同时通知 @pollerSidekiq::Scheduled::Poller 对象结束工做。那究竟是不是真的这样呢?让咱们继续深挖!

Sidekiq::Manager#quiet

让咱们来看看 Sidekiq::Manager#quiet 方法的代码

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L51-L58
def quiet
  return if @done
  @done = true

  logger.info { "Terminating quiet workers" }
  @workers.each { |x| x.terminate }
  fire_event(:quiet, true)
end

上面的代码也很短,首先将 Sidekiq::Manager 对象自身的 @done 实例变量的值设置为 true,接着对其所管理的每个 worker,都发出一个 terminate 消息。让咱们接着往下看 worker 对象(Sidekiq::Processor 对象)的 #terminate 方法定义:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L42-L46
def terminate(wait=false)
  @done = true
  return if !@thread
  @thread.value if wait
end

这里的代码依然保持了精短的特色!跟上一层逻辑同样,worker 在处理 terminate 时,一样设置本身的 @done 实例变量为 true 后返回,可是,若是其参数 waittrue,则会保持主线程等待,直到 @thread 线程退出(@thread.value 至关于执行 @thread.join而且返回线程的返回值,可参考 Ruby 文档)。

那么,这里就要问了,worker 设置 @done 为 true 是要干吗?这里好像也没有作什么特别的事啊?!勿急,还记得上篇文章介绍 worker 运行时的核心代码吗?

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
  begin
    while !@done
      process_one
    end
    @mgr.processor_stopped(self)
  rescue Sidekiq::Shutdown
    @mgr.processor_stopped(self)
  rescue Exception => ex
    @mgr.processor_died(self, ex)
  end
end

看到了吧,@done 变量但是一个重要的开关,当 @donefalse 时,worker 一直周而复始地从队列中取任务而且老老实实干活;而当 @donetrue 时,worker 在处理完当前的任务以后,便再也不执行新的任务,执行 @msg.processor_stopped(self) 通知 worker 管理器本身已经退出工做,最终 #run 方法返回。因为 #run 方法是在独立线程里执行的,因此当 #run 方法返回时,其所在的线程天然也就退出了。

那关于 worker 的 quiet 模式进入过程就是这么简单,经过一个共享变量 @done 便实现了对工做线程的控制。

Sidekiq::Scheduled::Poller#terminate

前面说到 Sidekiq::Launcher#quiet 执行时,先将消息传递给了 worker 管理器,随后执行了 @poller.terminate,那咱们来看看 #terminate 方法的定义:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L53-L61
def terminate
  @done = true
  if @thread
    t = @thread
    @thread = nil
    @sleeper << 0
    t.value
  end
end

又是如此简短的代码。poller 退出的逻辑跟 worker 退出的逻辑很是一致,都是一样先设置本身的 @done 实例变量的值为 true,接着等待线程 @thread 退出,最后 poller 返回。

那么,poller 的 @done 是否是也是用来控制线程退出呢?答案是确定的!

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73
def start
  @thread ||= safe_thread("scheduler") do
    initial_wait

    while !@done
      enqueue
      wait
    end
    Sidekiq.logger.info("Scheduler exiting...")
  end
end

还记得上面这段代码吗? poller 在每次将定时任务压回任务队列以后,等待必定时间,而后从新检查 @done 的值,若是为 true,则 poller 直接返回退出,由于 #start 方法里的循环体在新线程中执行,当循环结束时,线程天然也退出了。

小结

  1. 当 Sidekiq 收到 USR1 系统信号时,Sidekiq 主线程向 @launcher 发送 quiet 消息,@launcher 又将消息传递给 @manager ,同时向 @poller 发出 terminate 消息;

  2. @manager 在收到 quiet 消息时,逐一对运行中的 worker 发送 terminate 消息,worker 收到消息后,设置本身的 @donetrue,标识再也不处理新任务,当前任务处理完成后退出线程;

  3. @poller 在收到 terminate 消息后,也是设置本身的 @donetrue,在本次任务执行完毕后,线程也退出;

  4. Sidekiq 进入 quiet 模式以后,全部未处理任务以及新任务都再也不处理,直到 sidekiq 的下一次重启。

Sidekiq::Launcher#stop 源码探索

前面介绍的是 Sidekiq 进入 quiet 模式的过程,那 Sidekiq 的中止过程又是怎样的呢?

让咱们从 Sidekiq::Launcher#stop 方法开始寻找答案:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L41-L56
def stop
  deadline = Time.now + @options[:timeout]

  @done = true
  @manager.quiet
  @poller.terminate

  @manager.stop(deadline)

  # Requeue everything in case there was a worker who grabbed work while stopped
  # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
  strategy = (@options[:fetch] || Sidekiq::BasicFetch)
  strategy.bulk_requeue([], @options)

  clear_heartbeat
end

首先,Sidekiq::Launcher 对象设定了一个强制退出的 deadline,时间是以当前时间加上配置的 timeout,这个时间默认是 8 秒

接着,设定对象自己的 @done 变量的值为 true,而后分别对 @manager@poller 发送 quietterminate 消息,这个过程就是咱们上面说的 Sidekiq::Launcher#quiet 的过程,因此,这里的代码主要是 Sidekiq 要确保退出前已经通知各个线程准备退出。

接下来的代码就比较重要了,咱们先看这一行:

@manager.stop(deadline)

在通知完 @manager 进入 quiet 模式以后,launcher 向 @manager 发送了 stop 消息,而且同时传递了 deadline 参数。让咱们接着继续往下看:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L61-L83
PAUSE_TIME = STDOUT.tty? ? 0.1 : 0.5

def stop(deadline)
  quiet
  fire_event(:shutdown, true)

  # some of the shutdown events can be async,
  # we don't have any way to know when they're done but
  # give them a little time to take effect
  sleep PAUSE_TIME
  return if @workers.empty?

  logger.info { "Pausing to allow workers to finish..." }
  remaining = deadline - Time.now
  while remaining > PAUSE_TIME
    return if @workers.empty?
    sleep PAUSE_TIME
    remaining = deadline - Time.now
  end
  return if @workers.empty?

  hard_shutdown
end

上面的代码,manager 首先调用了自身的 quiet 方法(这里就真的画蛇添足了,由于外层的 launcher 已经调用过一次了),而后 manager 执行 sleep 系统调用进入休眠,持续时间为 0.5 秒,休眠结束后检查全部 worker 是否已经都退出,若是退出,则直接返回,任务提早结束;若是仍有 worker 未退出,则检查当前时间是否接近强制退出的 deadline,若是不是,则重复“检查全部 worker 退出 - 休眠” 的过程,直到 deadline 来临,或者 worker 线程都已经所有退出。若是最后到达 deadline,仍有 worker 线程未退出,则最后执行 hard_shutdown

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L108-L135
def hard_shutdown
  cleanup = nil
  @plock.synchronize do
    cleanup = @workers.dup
  end

  if cleanup.size > 0
    jobs = cleanup.map {|p| p.job }.compact

    # ... other codes

    strategy = (@options[:fetch] || Sidekiq::BasicFetch)
    strategy.bulk_requeue(jobs, @options)
  end

  cleanup.each do |processor|
    processor.kill
  end
end

这里 hard_shutdown 方法在执行时,首先克隆了当前仍未退出的 @workers 列表,接着获取每一个 worker 当前正在处理的任务,将这些正在执行中的任务数据经过 strategy.bulk_requeue(jobs, @options) 从新写回队列,而最后对每个 worker 发送 kill 消息:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L48-L58
def kill(wait=false)
  @done = true
  return if !@thread

  @thread.raise ::Sidekiq::Shutdown
  @thread.value if wait
end

worker 在收到 kill 消息时,首先设置本身的 @donetrue,最后向 worker 所关联的线程抛出 ::Sidekiq::Shutdown 异常。让咱们看看 worker 的线程又是如何处理异常的:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
  begin
    while !@done
      process_one
    end
    @mgr.processor_stopped(self)
  rescue Sidekiq::Shutdown
    @mgr.processor_stopped(self)
  rescue Exception => ex
    @mgr.processor_died(self, ex)
  end
end

又回到 worker 的 run 方法这里,能够看到,run 方法捕捉了 Sidekiq::Shutdown 异常,而且在处理异常时,只是执行 @mgr.processor_stopped(self),通知 manager 本身已经退出,因为已经跳出正常流程,worker 的 run 方法返回,线程也所以得以退出。至此,worker 也都正常退出了。

小结

  1. launcher 在执行退出时,首先按照 quiet 的流程先通知各个线程准备退出;

  2. 接着 launcher 向 manager 下达 stop 指令,而且给出最后期限(deadline);

  3. manager 在给定的限时内,尽量等待全部 worker 执行完本身退出,对于到达限时仍未退出的 worker,manager 备份了每一个 worker 的当前任务,从新加入队列,确保任务至少完整执行一次,而后经过向线程抛出异常的方式,迫使 worker 的线程被动退出。

总结

  1. Sidekiq 简单高效利用了系统信号,而且有比较清晰明了的信号处理过程;

  2. Sidekiq 在信号处理的过程当中,各个组件协调颇有条理,消息逐级传递,并且对被强制中止的任务也有备份方案;

  3. 咱们能够从 Sidekiq 的系统信号处理机制上借鉴很多东西,好比经常使用系统信号的分类处理等;

  4. 对于多线程的控制,经过共享变量以及异常的方式作到 graceful 以及 hard 两种方式的退出处理。

  5. 还有不少,一百我的心中有一百个哈姆莱特,一样一份代码,不一样的人学习阅读,确定收获不一样,你能够在评论区留下你的感悟,跟看到这篇文章的人一块儿分享!

问题思考

  1. 为了尽量确保全部 Sidekiq 的任务可以正常主动退出,因此在部署脚本中,都会尽量早地让 Sidekiq 进入 quiet 模式,可是 Sidekiq 的 quiet 是不可逆的,因此一旦部署脚本中途失败,Sidekiq 得不到重启,将会一直保持 quiet 状态,若是长时间未重启,任务就会积压。因此,通常我都会在部署脚本中,额外捕捉部署脚本失败异常,而后主动执行 sidekiq 的重启。若是你的部署脚本中有涉及 Sidekiq 的,必定要注意检查部署失败是否会影响 Sidekiq 的状态

  2. 虽然 Sidekiq 在强制退出当前长时间未退出的任务时,会将 job 的数据写回队列,等待重启后从新执行,那么这里就有个细节须要注意了,就是你的 job 必须是幂等的,不然就不能容许从新执行了。因此,请注意,若是你有须要长时间运行的 job,请注意检查其幂等性

好了,今天就写到这吧!仍然挺长一篇,啰嗦了。感谢看到这里!

相关文章
相关标签/搜索