Kafka技术内幕样章 层级时间轮

本文首发地址:http://zqhxuyuan.github.io/2016/05/13/2016-05-13-Kafka-Book-Sample-TimingWheel/java

3.4.4 定时器Timer

那么Kafka的Timer定时器是如何存储DelayedOperation,又是如何在有任务超时的时候能准确地轮询出来。在Java中有多种方案能够作到任务的延迟执行,好比java.util.Timer和TimerTask的调度,或者DelayedQueue和实现Delayed接口的线程。但这些对于Kafka这种动辄成千上万个请求的分布式系统而言都过于重量级,因此Kafka的Timer专门设计了TimingWheel这个数据结构来存储大量的处理请求,不过它的底层仍是基于DelayedQueue实现的。 git

被放到延迟队列的每一个元素必须实现Delayed接口,原本能够直接将DelayedOperation放入队列中(任务失效的时候是一个一个弹出),不过由于DelayedOperation数量级太大了,能够将多个DelayedOperation组成一个TimerTaskList链表(在同一个列表中的全部任务的失效时间都很相近,但不必定都相等),以TimerTaskList做为队列的元素,因此失效时间会被设置到TimerTaskList上,当失效的时候,整个列表中的全部任务都会一块儿失效。github

1. 定时任务链表和条目

2. TimingWheel时间轮

Purgatory将任务添加到Timer定时器,而且会在Reaper线程中调用advanceClock不断地移动内部的时钟,使得超时的任务能够被取出来执行。任务加入到TimingWheel中须要首先被包装成TimerTaskEntry,而后TimingWheel会根据TimerTaskEntry的失效时间加入到某个TimerTaskList中(TimingWheel的某个bucket)。当TimerTaskList由于超时被轮询出来并不必定表明里面全部的TimerTaskEntry必定就超时,因此对于没有超时的TimerTaskEntry须要从新加入到TimingWheel新的TimerTaskList中,对于超时的TimerTaskEntry则当即执行任务。不过timingWheel.add添加任务时并不须要先判断有没有超时而后再作决定,而是无论三七二十一,先尝试加入TimerTaskEntry,若是添加成功,那很好;若是没有添加成功,说明这个任务要么已经被取消了,要么超时了。 apache

添加不成功有两种状况,1)被其余线程完成后任务会被取消,这样保证了只有最早完成的那个线程只会调用一次完成的方法,其余线程就再也不须要执行这个任务了。2)任务超时了,但尚未被其余线程完成即尚未被取消,当前线程就应该当即执行任务。数据结构

class Timer(taskExecutor:ExecutorService, tickMs:Long=1,wheelSize:Int=20, 
      startMs: Long = System.currentTimeMillis) {
  val delayQueue = new DelayQueue[TimerTaskList]() //延迟队列,按照失效时间排序
  val taskCounter = new AtomicInteger(0) //内存级别的原子共享变量,全部时间轮同一个计数器
  val timingWheel=new TimingWheel(tickMs,wheelSize,startMs,taskCounter,delayQueue)

  def add(timerTask: TimerTask) = { //1.DelayedOperation是一个TimerTask
    addTimerTaskEntry(new TimerTaskEntry(timerTask)) //2.被包装成定时任务条目
  }
  val reinsert=(entry:TimerTaskEntry) => addTimerTaskEntry(entry)//高阶函数

  //add和reinsert都会将TimerTaskEntry加入到时间轮,后者使用已有的TimerTaskEntry
  def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry) {
    val addSuccess = timingWheel.add(timerTaskEntry) //3.添加到时间轮中
    if (!addSuccess) { //添加不成功,要么被取消,要么超时了
      if (!timerTaskEntry.cancelled) //尚未被取消,那就是超时了
        taskExecutor.submit(timerTaskEntry.timerTask) //执行条目里的定时任务
    }
  }

  def advanceClock(timeoutMs: Long): Boolean = { //timeout是轮询的最长等待时间
    var bucket=delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)//没有到超时不会被轮询出
    if (bucket != null) { //从延迟队列轮询出存储的TimerTaskList
      while (bucket != null) { //一次可能会轮询出多个元素,当并不必定是延迟队列全部元素
        timingWheel.advanceClock(bucket.getExpiration())
        bucket.flush(reinsert) //从新插入,函数的entry参数只有真正调用flush方法才能知道
        bucket=delayQueue.poll() //当即再轮询一次(不等待),直到poll出来没有东西了才中止
      }
    }
  }
}

Timer使用TimingWheel时间轮来管理延迟的等待超时的请求,TimingWheel时间轮是一个存储定时任务的数据结构:tickMs表示指针每隔1ms tick一次,wheelSize=20表示走完一圈要tick 20次,因此走完一圈总共要花费20*1ms=20ms,若是tickMs=1000ms,wheelSize=60,就和时钟里秒针的滴答彻底同样了。taskCounter表示请求数量,若是请求完成则计数器的值会减小,delayQueue是延迟队列用来存储定时任务。能够把Timer看作是定时器线程即模拟秒针每秒钟走一次这个动做,而TimingWheel则负责在秒针tick一次以后将超时的任务完成掉。图3-73举例现实世界的时钟和闹钟(指定时间点)/计时器(多长时间后),假设设置了一个计时器任务要在30秒后离开电脑休息一下,当启动计时器时,时间一秒钟一秒钟地流逝,要执行的任务也渐渐临近,剩余的时间愈来愈少,当计时器中止时,时间已经走了30秒了,以前设置的任务就应该被执行。TimingWheel的工做原理和计时器是相似的,它容许在不一样时刻加入不一样计时器,并且对同一个时间点,也容许多个计时器同时触发执行,好比有多个任务都要在30秒后同时执行。 分布式

3-73 mi timer
图3-73 现实世界的时钟/计时器示例函数

private[timer] class TimingWheel(tickMs:Long,wheelSize:Int,startMs:Long, 
    taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
  val interval = tickMs * wheelSize
  val buckets = Array.tabulate[TimerTaskList](wheelSize) {
      _ => new TimerTaskList(taskCounter) } //每一个List共享taskCount计数器
  var currentTime = startMs - (startMs % tickMs) 
  @volatile var overflowWheel: TimingWheel = null

  def addOverflowWheel(): Unit = {
    if (overflowWheel == null) { //建立父级别的时间轮
      overflowWheel = new TimingWheel(
        tickMs = interval, //低级别的整个范围做为父级别一个tick
        wheelSize = wheelSize, //bucket的数量不变
        startMs = currentTime, //当前时间经过advanceClock会更新
        taskCounter = taskCounter, queue //全局惟一的计数器和延迟队列
      )
    }
  }

  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
    val expiration = timerTaskEntry.expirationMs
    if (timerTaskEntry.cancelled) { //被其余线程取消了(执行任务时会取消)
      false
    } else if (expiration < currentTime + tickMs) { //已经超时了
      false
    } else if (expiration < currentTime + interval) {
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry) //把任务根据失效时间点放到对应的bucket中
      //设置bucket的失效时间点,而后把bucket加入队列中
      if (bucket.setExpiration(virtualId * tickMs)) queue.offer(bucket)
      true
    } else {
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTaskEntry)
    }
  }

  def advanceClock(timeMs: Long): Unit = {
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)
      if (overflowWheel != null) 
        overflowWheel.advanceClock(currentTime)
    }
  }
}

举例外部的Purgatory添加任务到定时器中,而后经过Reaper线程的advanceClock移动时钟的调用顺序,假设当前时间currentTime是0s,时间轮的tickMs=1000ms=1s,时间轮大小=8,整个时间轮的范围interval=1s*8=8s,Reaper线程循环时轮询一次队列最长timeoutMs=200ms,添加的四个任务的超时时间分别是[A=0s,B=1s,C=1s,D=3s]。添加任务时,任务A=0s<currentTime+tickMs=0s+1s=1s,因此任务A添加失败,执行executor.submit,表示任务A已经超时了;任务B/C/D知足1s=currentTime+tickMs<=expiration<currentTime+interval=8s,因此能够成功加入队列,不会执行executor.submit,表示它们都尚未到执行的时间点。B/C会加入bucket=1%8=1,D加入的bucket=3%8=3。而后Reaper线程开始调用advanceClock,当前时间=0s,在timeout=200ms内并不会从队列中轮询出来任何元素,由于即便200ms过去了,当前时间是200ms,而队列中最早能够被弹出来的都是1s,因此Reaper线程继续调用了屡次轮询方法:200ms,400ms,600ms,800ms。 性能

在800ms时间点调用轮询时,timeoutMs=200ms,这时候能够终于把bucket1轮询出来(由于800ms时间点+200ms时间间隔=1000ms=1s,而恰好队列中存在延迟时间=1s的bucket),这个bucket1中有两个任务B/C,它们的失效时间都是1s,并且bucket级别的expiration=1s(加入任务时同时肯定bucket的失效时间),首先经过advanceClock更新currentTime=1s。spa

bucket.flush会尝试将任务B/C从新加入队列中,因为此时currentTime已经被更新为1s,B/C的超时时间=1s<currentTime+tickMs=1s+1s=2s,因此此时添加到队列会失败,因而和最开始的A相似执行executor.submit,也表示任务B/C在当前时间点=800ms时刻的轮询过程当中超时了。因为Reaper线程还有其余工做因此即便每次轮询的timeout=200ms,也不必定说每次发生轮询的时间点就是[200m,400ms,600ms]这么恰好,假设轮询的时间点是900ms,那么不用等timeout=200ms只过了100ms就能够把失效时间=1s的bucket轮询出来。图3-74示例了Timer调用TimingWheel添加任务和时钟移动的调用过程,这里为了简单起见,尚未考虑overflowWheel的场景。 线程

3-74 timer and wheel
图3-74 Timer和TimingWheel的调用示例(没有二级时间轮)

失效时间和bucket选择

任务的失效时间是一个肯定的时间点,因此无论当前时间是什么,即便失效时间相同的两个任务在不一样时间点加入队列,它们也会被放入同一个bucket中。固然根据任务的失效时间选择不一样的bucket还跟tickMs以及时间轮的大小有关,时间轮的expiration范围expiration=[currentTime+tickMs,currentTime+interval)。图3-75中示例了在相同时间轮大小下三种不一样的tickMs,当tickMs=1时,每一个bucket中任务的失效时间只有一个值,当tickMs=2时有两种可能,当tickMs=8时就有8种可能,好比失效时间在120-127范围内的任务都会被分配到bucket7中。即便是相同失效时间若是tickMs不一样也会被放入不一样的bucket,好比任务失效时间=103在tickMs=1时分配到bucket7,在tickMs=2时分配到bucket3。

3-75 tickMs
图3-75 相同时间轮,不一样tickMs

由tickMs和时间轮的大小决定了这个时间轮全部任务失效时间的一个范围,若是超过这个范围,则不容许加入,图3-76中当前时间等于100时,时间轮的范围=[101..107],当前时间等于101时,范围=[102..108]以此类推。当前时间所在的bucket其实是没有任务的,由于任务的失效时间若是和当前时间相等说明任务已经失效了,不该该放入队列中。

3-76 wheel tick
图3-76 时钟tick影响了时间轮的取值范围

任务的失效时间和当前时间相等指的是彻底相等,好比tickMs=1s,时间轮大小=60,当前时间等于12:00:00,某个任务的超时时间是12:01:00。时钟tick时每隔一秒走动一次:[12:00:01,..12:00:59,12:01:00],在12:01:00这一刻任务就超时了,不是12:01:00到下一次tick=12:01:01的一半12:01:00.500,也不是过了12:01:00后的下一次tick=12:01:01才超时,当刚恰好进入12:01:00.000时任务就超时了!好比你定了一个12:01:00的任务运行,你固然但愿在那个时间点分绝不差地精准地执行任务,多一秒少一秒都不行!

层级时间轮

只有一个时间轮虽然在时间移动时能够重用旧的bucket来保存失效时间更日后的任务,可是因为时间轮所容许的范围就那么大,超过这个范围的失效时间就没法很好地存储了。仍是以tickMs=1s,时间轮大小=60为例,若是当前时间是12:00:00,你没法设置12:01:01的任务,更谈不上12:02:00以及失效时间更加日后的任务了。在《嵌入式系统的实时概念》第十一章提到使用一个外部的event flow buffer来暂时存储超过interval的事件,不过更好的方式是使用层级的时间轮。层级时间轮中假设时间轮大小都不变,可是tickMs则是不断递增,假设Level0的tickMs=1s,则Level1的tickMs=1s*60=60s,Level2的tickMs=60s*60=3600s以此类推。每一层的tickMs表示的是在当前时间轮中移动一格的粒度/单位,Level0=1s,Level1=60s,Level2=3600s。能够用钟表的秒钟,分针,时针的移动来理解这三个时间轮:秒针走动一格须要花费一秒,分针走动一格花费60秒,时针走动一格花费3600秒。并且更高层级的tickMs等于低一层的整个时间轮范围,好比Level0的interval=1s*60=60s,恰好做为Level1的tickMs,Level1的interval=60s*60=3600s,也做为Level2的tickMs。也就是说Level1的一格等于Level0走完一圈,Level2的一格等于Level1走完一圈。

那么为何tickMs的单位不一样,假设有几个任务的失效时间分别是[20s,60s,70s,120s,3600s],若是全部时间轮的tickMs都是1s,总共须要3600/60=60个时间轮!既然每一个时间轮的tickMs都相等,跟直接用一个大小等于3600的时间轮是没有任何区别的。而若是使用层级时间轮,总共只须要3个时间轮,20s在Level0的第20个单元格,60s和70s在Level1的第一个单元格内,120s在Level1的第二个单元格上,3600在Level2的第一个单元格上(思考下为何要把60s这种恰好等于当前时间轮范围的任务放在下一个时间轮,而不是当前时间轮上)。

confluent有篇博客详细介绍了时间轮的改进和性能对比,图3-77中当前指针指向任务①所在的bucket,则任务①已经超时了,当发生一次tick以后,任务①已经完全从队列中移除了,tick一次以后当前指针指向了任务②所在的bucket,由于任务②也已经超时了,也就是说tick指针指向哪里,那里就已经超时了,在当前指针所指向的bucket里的任务都应该被取出来执行。

3-77 tick and expire
图3-77 时间轮tick到当前bucket,这个bucket的任务都超时
摘自:http://www.confluent.io/blog/apache-kafka-purgatory-hierarchical-timing-wheels

图3-78有两个时间轮分别是Level0和Level1,在Time0时加入了⑦⑧⑨三个任务,任务⑦在Level0的7-8之间,任务⑧⑨在Level1的8-16之间。这里你可能会认为⑧和⑨应该紧接着Level0的7-8的下一个应该放在Level1的0-8,这样才叫作无缝衔接嘛。不过若是把⑧⑨放在Level1的0-8之间,1)自己就不符合取值范围,由于⑧⑨在0-8以外,而放在8-16之间正好知足⑧⑨的取值范围。2)Level1当前指向了0-8表示这个区间的全部任务都已经超时,若是⑧⑨放在这里,那么它们就都会超时,而此时连任务⑦都还没超时,⑧⑨怎么可能超时呢。3)Level1的0-8这一格子对应了Level0的全部格子,因此Level1指向0-8表示任务在0-8之间的正在超时,不过具体0-8之间的任务则仍是以Level0为准,这就比如在Time8时,Level1指向了8-16,表示8-16之间的任务正在超时,可是具体8-16之间的任务也是以Level0为准。

在Time0以后发生一次Tick后,Level0的指针指向1-2,而Level2的指针没有变化,而Level0的0消失,1添加了9。图3-78中当前时间=Time7时,Level0的指针指向了7-8之间,Level2的指针仍是没有变化,任务⑦超时。再次发生tick以后,指针移动到8-9(这里已经不是0-1了),这时候Level1的指针终于移动了一格从原先的0-8移动到8-16(想象下秒针走了一圈60s,分针才终于挪动了一格)。而Level1原先在8-16之间存在任务⑧⑨,那么是否是说这两个任务同时失效了呢?实际上外界真实时钟走动的粒度只和第一个时间轮Level0的tickMs相等,Level1走动一格只表示当前这一格的全部任务在Level0走完一圈后都会失效,就比如Level1指向0-8时表示Level0中任务时间在0-8之间只有Level0走完一圈才会所有失效。所以须要把Level1的任务⑧⑨从Level1中解除出来,放到更细粒度的Level0中才能真正决定任务何时真正失效。因此在Timer8时,Level1的任务⑧⑨被一一放回Level0的各个bucket中,原先在Level1中挤在同一个单元格里的多个任务被分散在Level0的各个单元格中,这样原先在Level1的各个任务如今就会参照Level0中的tickMs(也就是真实的tickMs)。

3-78 overflow timer
图3-78 层级时间轮的收敛和发散

能够这么理解,在Time0到Time7之间,任务⑧⑨在Level1中蓄势待发,可是由于Level0尚未走完一圈,Level1的指针不会移动,只有Level0走完一圈后,Level1才会移动一次,并把Level1一格的任务按照Level0的tickMs粒度从新划分。Level0表明的永远是真实的时钟移动,超时的任务必定是在Level0中被选中的,在其余Level中的任务在接近超时的时候只会源源不断地进入到Level0中。能够认为除了Level0,其余Level都是虚拟出来的时间轮,这些更高级的时间轮由于tickMs粒度比较大,能够存储数据量更大的任务,可是不具有执行超时任务的能力,当高级别的时间轮发生一次tick后,须要把tick指向的全部任务移动到低级别的时间轮中,从而有机会被放到Level0中真正地执行。

相关文章
相关标签/搜索