在以前的文章《Sidekiq任务调度流程分析》中,咱们一块儿仔细分析了 Sidekiq 是如何基于多线程完成队列任务处理以及调度的。咱们在以前的分析里,看到了不论是 Sidekiq::Scheduled::Poller
仍是 Sidekiq::Processor
的核心代码里,都会有一个由 @done
实例变量控制的循环体:
<!-- More -->html
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73 def start @thread ||= safe_thread("scheduler") do initial_wait while !@done # 这是 poller 的循环控制 enqueue wait end Sidekiq.logger.info("Scheduler exiting...") end end
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77 def run begin while !@done # 这是咱们常说的 worker 循环控制 process_one end @mgr.processor_stopped(self) rescue Sidekiq::Shutdown @mgr.processor_stopped(self) rescue Exception => ex @mgr.processor_died(self, ex) end end
也就是说,这些 @done
实例变量决定了 poller
线程跟 worker
线程是否循环执行?一旦 @done
被改成 true
,那循环体就再也不执行,线程天然也就是退出了。因而,单从这些代码,咱们能够判定, Sidekiq 就是经过设置 @done
的值来通知一个线程安全退出(graceful exit)的。咱们也知道,生产环境中,咱们是经过发送信号的方式来告诉 sidekiq 退出或者进入静默(quiet)状态的,那么,这里的 @done
是怎么跟信号处理联系起来的呢?这些就是今天这篇文章的重点了!git
今天的分析所参考的 sidekiq 的源码对应版本是 4.2.3;github
今天所讨论的内容,将主要围绕系统信号处理进行分析,无关细节将不赘述,若有须要,请自行翻阅 sidekiq 源码;数组
今天的文章跟上篇的《Sidekiq任务调度流程分析》紧密相关,上篇文章介绍的启动过程跟任务调度会帮助这篇文章的理解,若是尚未阅读上篇文章的,建议先阅读后再来阅读这一篇信号处理的文章。安全
Sidekiq 信号处理机制;ruby
为何重启 Sidekiq 时,USR1
信号(即进入 quiet
模式)须要尽量早,而进程的退出重启须要尽量晚。多线程
由于前一篇文章着眼于任务调度,因此略过了其余无关细节,包括信号处理,这篇文章则将镜头对准信号处理,因此让咱们从头再来一遍,只是这一次,咱们只关心与信号处理有关的代码。async
依旧是从 cli.rb
文件开始,它是 Sidekiq 核心代码的生命起点,由于 Sidekiq 命令行启动后,它是第一个被执行的代码,Sidekiq 启动过程当中调用了 Sidekiq::CLI#run
方法:ide
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L49-L106 def run boot_system print_banner self_read, self_write = IO.pipe %w(INT TERM USR1 USR2 TTIN).each do |sig| begin trap sig do self_write.puts(sig) end rescue ArgumentError puts "Signal #{sig} not supported" end end # ... other codes begin launcher.run while readable_io = IO.select([self_read]) signal = readable_io.first[0].gets.strip handle_signal(signal) end rescue Interrupt logger.info 'Shutting down' launcher.stop # Explicitly exit so busy Processor threads can't block # process shutdown. logger.info "Bye!" exit(0) end
以上的代码就是整个 Sidekiq 最顶层的信号处理的核心代码了,让咱们慢慢分析!
首先,self_read, self_write = IO.pipe
建立了一个模拟管道的 IO 对象,而且同时返回这个 管道的一个写端以及一个读端,经过这两端,就能够实现对管道的读写了。须要注意的是,IO.pipe
建立的读端在读的时候不会自动生成 EOF
符,因此这就要求读时,写端是关闭的,而写时,读端是关闭的,一句话说,就是这样的管道不容许读写端同时打开。关于 IO.pipe
还有挺多细节跟须要注意的点,若是还须要了解,请阅读官方文档。学习
上面说的管道本质上只是一个 IO 对象而已,暂时不用纠结太多,让咱们接着往下读:
%w(INT TERM USR1 USR2 TTIN).each do |sig| begin trap sig do self_write.puts(sig) end rescue ArgumentError puts "Signal #{sig} not supported" end end
这段代码就比较有意思了,最外层遍历了一个系统信号的数组,而后逐个信号进行监听(trap,或者叫捕捉?)。让咱们聚焦在 trap
方法的调用跟其 block 上,查阅 Ruby 文档,发现 trap
是 Signal
模块下的一个方法,Signal
主要是处理与系统信号有关的任务,而后 trap
的做用是:
Specifies the handling of signals. The first parameter is a signal name (a string such as “SIGALRM”, “SIGUSR1”, and so on) or a signal number...
因此,前面的那段代码的意思就很容易理解了,Sidekiq 注册了对 INT
、TERM
、USR1
、USR2
以及TTIN
等系统信号的处理,而在进程收到这些信号时,就会执行 self_write.puts(sig)
,也就是将收到的信号经过以前介绍的管道写端 self_write
记录下来。什么?只记录下来,那还得处理啊?!
稍安勿躁,让咱们接着往下分析 Sidekiq::CLI#run
方法末尾的代码:
begin launcher.run while readable_io = IO.select([self_read]) signal = readable_io.first[0].gets.strip handle_signal(signal) end rescue Interrupt logger.info 'Shutting down' launcher.stop # Explicitly exit so busy Processor threads can't block # process shutdown. logger.info "Bye!" exit(0) end
看到没有,这里有个循环,循环控制条件里,readable_io = IO.select([self_read])
是从前面的管道的读端 self_read
阻塞地等待信号的到达。对于 IO.select
,Ruby 官方文档介绍以下:
Calls select(2) system call. It monitors given arrays of IO objects, waits until one or more of IO objects are ready for reading, are ready for writing, and have pending exceptions respectively, and returns an array that contains arrays of those IO objects.
因此这里就是说 Sidekiq 主线程首先负责执行完其余初始化工做,最后阻塞在信号等待以及处理。在其等到新的信号以后,进入上面代码展现的循环体:
signal = readable_io.first[0].gets.strip handle_signal(signal)
这里语法细节先不深究,咱们看下这两行代码第一行是从前面说的管道中读取信号,而且将信号传递给 handle_signal
方法,让咱们接着往下看 handle_signal
方法的定义:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L125-L153 def handle_signal(sig) Sidekiq.logger.debug "Got #{sig} signal" case sig when 'INT' # Handle Ctrl-C in JRuby like MRI # http://jira.codehaus.org/browse/JRUBY-4637 raise Interrupt when 'TERM' # Heroku sends TERM and then waits 10 seconds for process to exit. raise Interrupt when 'USR1' Sidekiq.logger.info "Received USR1, no longer accepting new work" launcher.quiet when 'USR2' if Sidekiq.options[:logfile] Sidekiq.logger.info "Received USR2, reopening log file" Sidekiq::Logging.reopen_logs end when 'TTIN' Thread.list.each do |thread| Sidekiq.logger.warn "Thread TID-#{thread.object_id.to_s(36)} #{thread['label']}" if thread.backtrace Sidekiq.logger.warn thread.backtrace.join("\n") else Sidekiq.logger.warn "<no backtrace available>" end end end end
这里的代码挺长,可是一点都不难理解,我简单解释下就够了。当进程:
收到 TERM
或者 INT
信号时,直接抛出 Interrupt
中断;
收到 USR1
信号时,则通知 launcher
执行 .quiet
方法,Sidekiq 在这里进入 Quiet 模式(怎么进入?);
收到 USR2
信号时,从新打开日志;
收到 TTIN
信号时,打印全部线程当前正在执行的代码列表。
到此,一个信号从收到被存下,到被取出处理的大体过程就是这样的,至于具体的处理方式,咱们下个章节详细展开。如今有一点须要补充的是,上面讲当 Sidekiq 收到 TERM
或者 INT
信号时,都会抛出 Interrupt
中断异常,那这个异常又是如何处理的呢?咱们回过头去看刚才最开始的 Sidekiq::CLI#run
方法末尾的代码:
begin launcher.run while readable_io = IO.select([self_read]) signal = readable_io.first[0].gets.strip handle_signal(signal) end rescue Interrupt logger.info 'Shutting down' launcher.stop # Explicitly exit so busy Processor threads can't block # process shutdown. logger.info "Bye!" exit(0) end
原来是 run
方法在处理信号时,声明了 rescue Interrupt
,捕捉了 Interrupt
中断异常,而且在异常处理时打印必要日志,同时执行 launcher.stop
通知各个线程中止工做,最后调用 exit
方法强制退出进程,到此,一个 Sidekiq 进程就完全退出了。
可是问题又来了,信号处理的大体过程我是知道了,可是具体的 launcher.quiet
跟 launcher.stop
都干了些什么呢?
老规矩,先上代码:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L32-L36 def quiet @done = true @manager.quiet @poller.terminate end
代码只有短短三行。 Launcher 对象首先设置本身的实例变量 @done
的值为 true
,接着执行 @manager.quiet
以及 @poller.terminate
。看方法命名上理解,应该是 Luancher 对象又将 quiet 的消息传递给了 @manager
即 Sidekiq::Manager
对象,同时通知 @poller
即 Sidekiq::Scheduled::Poller
对象结束工做。那究竟是不是真的这样呢?让咱们继续深挖!
让咱们来看看 Sidekiq::Manager#quiet
方法的代码
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L51-L58 def quiet return if @done @done = true logger.info { "Terminating quiet workers" } @workers.each { |x| x.terminate } fire_event(:quiet, true) end
上面的代码也很短,首先将 Sidekiq::Manager
对象自身的 @done
实例变量的值设置为 true
,接着对其所管理的每个 worker,都发出一个 terminate
消息。让咱们接着往下看 worker 对象(Sidekiq::Processor
对象)的 #terminate
方法定义:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L42-L46 def terminate(wait=false) @done = true return if !@thread @thread.value if wait end
这里的代码依然保持了精短的特色!跟上一层逻辑同样,worker 在处理 terminate
时,一样设置本身的 @done
实例变量为 true
后返回,可是,若是其参数 wait
为 true
,则会保持主线程等待,直到 @thread
线程退出(@thread.value
至关于执行 @thread.join
而且返回线程的返回值,可参考 Ruby 文档)。
那么,这里就要问了,worker 设置 @done
为 true 是要干吗?这里好像也没有作什么特别的事啊?!勿急,还记得上篇文章介绍 worker 运行时的核心代码吗?
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77 def run begin while !@done process_one end @mgr.processor_stopped(self) rescue Sidekiq::Shutdown @mgr.processor_stopped(self) rescue Exception => ex @mgr.processor_died(self, ex) end end
看到了吧,@done
变量但是一个重要的开关,当 @done
为 false
时,worker 一直周而复始地从队列中取任务而且老老实实干活;而当 @done
为 true
时,worker 在处理完当前的任务以后,便再也不执行新的任务,执行 @msg.processor_stopped(self)
通知 worker 管理器本身已经退出工做,最终 #run
方法返回。因为 #run
方法是在独立线程里执行的,因此当 #run
方法返回时,其所在的线程天然也就退出了。
那关于 worker 的 quiet 模式进入过程就是这么简单,经过一个共享变量 @done
便实现了对工做线程的控制。
前面说到 Sidekiq::Launcher#quiet
执行时,先将消息传递给了 worker 管理器,随后执行了 @poller.terminate
,那咱们来看看 #terminate
方法的定义:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L53-L61 def terminate @done = true if @thread t = @thread @thread = nil @sleeper << 0 t.value end end
又是如此简短的代码。poller 退出的逻辑跟 worker 退出的逻辑很是一致,都是一样先设置本身的 @done
实例变量的值为 true
,接着等待线程 @thread
退出,最后 poller 返回。
那么,poller 的 @done
是否是也是用来控制线程退出呢?答案是确定的!
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73 def start @thread ||= safe_thread("scheduler") do initial_wait while !@done enqueue wait end Sidekiq.logger.info("Scheduler exiting...") end end
还记得上面这段代码吗? poller 在每次将定时任务压回任务队列以后,等待必定时间,而后从新检查 @done
的值,若是为 true
,则 poller 直接返回退出,由于 #start
方法里的循环体在新线程中执行,当循环结束时,线程天然也退出了。
当 Sidekiq 收到 USR1
系统信号时,Sidekiq 主线程向 @launcher
发送 quiet
消息,@launcher
又将消息传递给 @manager
,同时向 @poller
发出 terminate
消息;
@manager
在收到 quiet
消息时,逐一对运行中的 worker 发送 terminate
消息,worker 收到消息后,设置本身的 @done
为 true
,标识再也不处理新任务,当前任务处理完成后退出线程;
@poller
在收到 terminate
消息后,也是设置本身的 @done
为 true
,在本次任务执行完毕后,线程也退出;
Sidekiq 进入 quiet 模式以后,全部未处理任务以及新任务都再也不处理,直到 sidekiq 的下一次重启。
前面介绍的是 Sidekiq 进入 quiet 模式的过程,那 Sidekiq 的中止过程又是怎样的呢?
让咱们从 Sidekiq::Launcher#stop
方法开始寻找答案:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L41-L56 def stop deadline = Time.now + @options[:timeout] @done = true @manager.quiet @poller.terminate @manager.stop(deadline) # Requeue everything in case there was a worker who grabbed work while stopped # This call is a no-op in Sidekiq but necessary for Sidekiq Pro. strategy = (@options[:fetch] || Sidekiq::BasicFetch) strategy.bulk_requeue([], @options) clear_heartbeat end
首先,Sidekiq::Launcher
对象设定了一个强制退出的 deadline
,时间是以当前时间加上配置的 timeout
,这个时间默认是 8 秒。
接着,设定对象自己的 @done
变量的值为 true
,而后分别对 @manager
和 @poller
发送 quiet
和 terminate
消息,这个过程就是咱们上面说的 Sidekiq::Launcher#quiet
的过程,因此,这里的代码主要是 Sidekiq 要确保退出前已经通知各个线程准备退出。
接下来的代码就比较重要了,咱们先看这一行:
@manager.stop(deadline)
在通知完 @manager
进入 quiet 模式以后,launcher 向 @manager
发送了 stop
消息,而且同时传递了 deadline
参数。让咱们接着继续往下看:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L61-L83 PAUSE_TIME = STDOUT.tty? ? 0.1 : 0.5 def stop(deadline) quiet fire_event(:shutdown, true) # some of the shutdown events can be async, # we don't have any way to know when they're done but # give them a little time to take effect sleep PAUSE_TIME return if @workers.empty? logger.info { "Pausing to allow workers to finish..." } remaining = deadline - Time.now while remaining > PAUSE_TIME return if @workers.empty? sleep PAUSE_TIME remaining = deadline - Time.now end return if @workers.empty? hard_shutdown end
上面的代码,manager 首先调用了自身的 quiet
方法(这里就真的画蛇添足了,由于外层的 launcher 已经调用过一次了),而后 manager 执行 sleep
系统调用进入休眠,持续时间为 0.5 秒,休眠结束后检查全部 worker 是否已经都退出,若是退出,则直接返回,任务提早结束;若是仍有 worker 未退出,则检查当前时间是否接近强制退出的 deadline,若是不是,则重复“检查全部 worker 退出 - 休眠” 的过程,直到 deadline 来临,或者 worker 线程都已经所有退出。若是最后到达 deadline,仍有 worker 线程未退出,则最后执行 hard_shutdown
。
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L108-L135 def hard_shutdown cleanup = nil @plock.synchronize do cleanup = @workers.dup end if cleanup.size > 0 jobs = cleanup.map {|p| p.job }.compact # ... other codes strategy = (@options[:fetch] || Sidekiq::BasicFetch) strategy.bulk_requeue(jobs, @options) end cleanup.each do |processor| processor.kill end end
这里 hard_shutdown
方法在执行时,首先克隆了当前仍未退出的 @workers
列表,接着获取每一个 worker 当前正在处理的任务,将这些正在执行中的任务数据经过 strategy.bulk_requeue(jobs, @options)
从新写回队列,而最后对每个 worker 发送 kill
消息:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L48-L58 def kill(wait=false) @done = true return if !@thread @thread.raise ::Sidekiq::Shutdown @thread.value if wait end
worker 在收到 kill
消息时,首先设置本身的 @done
为 true
,最后向 worker 所关联的线程抛出 ::Sidekiq::Shutdown
异常。让咱们看看 worker 的线程又是如何处理异常的:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77 def run begin while !@done process_one end @mgr.processor_stopped(self) rescue Sidekiq::Shutdown @mgr.processor_stopped(self) rescue Exception => ex @mgr.processor_died(self, ex) end end
又回到 worker 的 run
方法这里,能够看到,run
方法捕捉了 Sidekiq::Shutdown
异常,而且在处理异常时,只是执行 @mgr.processor_stopped(self)
,通知 manager 本身已经退出,因为已经跳出正常流程,worker 的 run
方法返回,线程也所以得以退出。至此,worker 也都正常退出了。
launcher 在执行退出时,首先按照 quiet 的流程先通知各个线程准备退出;
接着 launcher 向 manager 下达 stop
指令,而且给出最后期限(deadline
);
manager 在给定的限时内,尽量等待全部 worker 执行完本身退出,对于到达限时仍未退出的 worker,manager 备份了每一个 worker 的当前任务,从新加入队列,确保任务至少完整执行一次,而后经过向线程抛出异常的方式,迫使 worker 的线程被动退出。
Sidekiq 简单高效利用了系统信号,而且有比较清晰明了的信号处理过程;
Sidekiq 在信号处理的过程当中,各个组件协调颇有条理,消息逐级传递,并且对被强制中止的任务也有备份方案;
咱们能够从 Sidekiq 的系统信号处理机制上借鉴很多东西,好比经常使用系统信号的分类处理等;
对于多线程的控制,经过共享变量以及异常的方式作到 graceful
以及 hard
两种方式的退出处理。
还有不少,一百我的心中有一百个哈姆莱特,一样一份代码,不一样的人学习阅读,确定收获不一样,你能够在评论区留下你的感悟,跟看到这篇文章的人一块儿分享!
为了尽量确保全部 Sidekiq 的任务可以正常主动退出,因此在部署脚本中,都会尽量早地让 Sidekiq 进入 quiet 模式,可是 Sidekiq 的 quiet 是不可逆的,因此一旦部署脚本中途失败,Sidekiq 得不到重启,将会一直保持 quiet 状态,若是长时间未重启,任务就会积压。因此,通常我都会在部署脚本中,额外捕捉部署脚本失败异常,而后主动执行 sidekiq 的重启。若是你的部署脚本中有涉及 Sidekiq 的,必定要注意检查部署失败是否会影响 Sidekiq 的状态
虽然 Sidekiq 在强制退出当前长时间未退出的任务时,会将 job 的数据写回队列,等待重启后从新执行,那么这里就有个细节须要注意了,就是你的 job 必须是幂等的,不然就不能容许从新执行了。因此,请注意,若是你有须要长时间运行的 job,请注意检查其幂等性。
好了,今天就写到这吧!仍然挺长一篇,啰嗦了。感谢看到这里!