[TOC]java
在技术群讨论到一个有意思的业务需求,能够描述为:git
有一个内部按照优先级进行任务排序的线程池。线程池会优先执行高优先级的任务。随着时间的流逝,线程池内部低优先级的任务的优先级会逐渐晋升变为高优先级,以免被不断新增的高优先级任务阻塞致使饿死。算法
考虑到 JDK 已经为开发者提供了自定义线程池ThreadPoolExecutor
以及优先级队列PriorityBlockingQueue
,二者相结合而且按期调整队列中低优先级任务的优先级再进行resort
将低优先级的任务调整到队列的前头,也能够必定程度上避免被饿死。数组
这种方案的问题在于resort
的消耗比较高,而且还须要从新计算每个任务的优先级。为此,引出咱们下面的设计,但愿使用无锁并发的数据结构存储任务,而且任务支持自动的优先级晋升,保证低优先级的任务最终可以执行而不会被不断增长的高优先级任务饿死。缓存
欢迎加入技术交流群186233599讨论交流,也欢迎关注笔者公众号:风火说。markdown
声明一个数组,按照循环队列的方式使用。每个数组槽位上都挂载一个任务列表。有一个当前指针指向数组中的某一个槽位,该槽位即为当前最高优先级任务插入的槽位。指针数字递增方向优先级依次下降。指针以某种方式沿递增方向移动,由于指针指向的槽位表明最高优先级,所以指针的移动实际上意味着全部槽位的优先级都晋升了。数据结构
那么这里的优先级只能是离散化的整型数字,而且优先级的范围为 0 到 数组长度减 1 。最高优先级为0。多线程
用图形化的方式表达就是以下的状况并发
图中优先级的范围是[0,6],current指针指向的槽位即为最高优先级,current左侧槽位为最低优先级,current右侧槽位为次高优先级。每个槽位上都挂载一个队列,队列中的任务的优先级都相同(后续算法中能够看到会有不一样的优先级混合)。测试
每次取任务时老是从current指针指向的槽位的队列读取任务。当必定时间流逝后,current指针沿着右侧移动一位,此时意味着全部槽位的优先级都被晋升了,除了本来的current指向的槽位,它变为了最低优先级槽位。
因为current指针老是在移动,所以最终会移动到以前低优先级的槽位,此时该槽位下的任务就成了最高优先级任务,被读取执行。这样就避免了在运行过程不断有高优先级任务被加入致使本来的低优先级饿死的状况发生。
根据上面的优先级晋升思路,显然应该有一个数组,其不一样的槽位表明着不一样的优先级。每个槽位上挂载一个 MPMC 类型的队列,用于该优先级下任务的添加和读取。
使用一个当前指针,该指针指向的槽位为最高优先级槽位。
若是只有一个指针,意味着读取任务时,从该指针指向的槽位读取,所以此时指针指向的槽位是最高优先级。而插入任务的时候,须要根据当前指针进行计算。这种模式在优先级晋升时存在并发问题。
当指针从槽位1指向更新到槽位2。此时槽位1可能还存在部分剩余的任务,这部分任务的实际优先级应该是高于槽位2当中的。而若是在这个时候插入最低优先级的任务,可能就会插入到槽位1中。那么槽位1的任务队列实际就混合了最高优先级和最低优先级的任务,没法区分。
为了解决不一样优先级任务在同一个队列中混合的问题,咱们能够在指针移动时,将以前槽位的剩余移动到当前槽位的队列头。这实际上就意味着要求队列是出于双端队列模式。可是由于指针移动和任务移动没法原子化进行,仍是会形成槽位1的队列中最高优先级任务和最低优先级任务混在一块儿的状况。
从实现效果而言,咱们须要的是在指针移动的时候,保证槽位1中剩余的本来高优先级的任务执行完毕后才能去执行槽位2这个“本来的次高优先级,如今的最高优先级“的任务。从效果来看,并不须要必定移动任务,能够经过一种手段,保证槽位1中本来高优先级任务执行完毕后再去执行槽位2的任务便可。
基于这种考量,咱们将一个指针拆分为两个:任务插入指针和任务读取指针。
基于并发读写的考虑,两个指针都是AtomicInteger
类型。两个指针的做用分别为:
任务插入指针和任务读取指针分离的好处在于,任务插入指针的移动意味着不一样槽位优先级的实际晋升。而读取能够依照读取指针指向的槽位上的队列读取任务,直到对应优先级的任务读取完毕后再移动读取指针到下一个槽位。这样一来,保证了按照入列的顺序被公平的处理,也保证了同一个时间单位高优先级的任务优于低优先级任务被处理,也避免了单一指针移动须要的任务拷贝带来的不一样优先级任务污染问题。
插入指针能够按照两种策略移动:
若是选择策略一,须要后台配置一个线程,按照固定时间移动插入指针;若是选择策略二,须要一个全局的AtomicInteger
对象,用于次数断定。
若是选择方案一,可能会存在一种场景,往线程池中投入了大量的同一个优先级的任务,使得某个槽位上的队列长度很长。若是任务处理相对缓存,则任务插入指针可能会被移动屡次。这种移动会使得槽位上队列有了不少不一样优先级的任务。而读取任务时按照优先级逐步去处理,这使得产生了这么多不一样的优先级实际上意义是不大的。
所以采用策略二会更加合适一些。
因为读取任务时是多线程的,所以策略二实现上须要注意的点包括:
AtomicInteger#incrementAndGet
实现任务读取次数累加。若是返回的数字是阈值的倍数,则意味着能够移动任务插入指针。AtomicInteger#incrementAndGet
来移动插入指针。在这里对插入指针移动的并发考量在于,因为读取线程对读取计数使用AtomicInteger#incrementAndGet
方式累加是必然成功,而返回数值是晋升阈值的倍数时必然须要实现插入指针的递增。由于递增的必然性,所以一样使用AtomicInteger#incrementAndGet
方式来实现。
假定系统初始状态,插入和读取指针都指向了槽位1,在槽位1上插入了大量的任务。随着任务的读取,插入指针移动到了槽位2,此时该槽位上插入了一些任务。随着任务的读取,插入指针继续移动,移动过数组的长度后,再次指向了槽位2。假定此时读取指针仍然在槽位1,而若是这个时候插入插入任务。那么实际上槽位2队列中任务应该分为两种:前半部分是上一个轮次插入的任务,后半部分是当前刚插入的任务。
若是读取指针移动到槽位2,应该将前半部分任务执行完毕后就去执行槽位3上的任务,而不是将全部的任务都执行完。所以槽位3上的任务实际优先级应该高于槽位2队列中后半部分的任务。
基于上述状况,问题能够转化为依靠读取指针在读取任务时,如何识别当前队列中不是本轮次要处理的任务进而移动读取指针?
考虑到任务插入指针和任务读取指针自己是有值的,这个值单调递增,实际上能够当作是一种“顺序”概念的表达。所以任务的准备添加时,能够将插入指针的值加上任务的优先级,声明为任务的插入优先级。读取指针在读取任务时,只有当前任务的插入优先级等于读取指针的值,意味着该任务时本轮次读取指针应该要处理的任务。若是读取的任务的插入优先级与读取指针不等时,意味着当前队列不能再读取任务,应该移动读取指针。
经过任务自己的插入优先级避免了不一样轮次的任务在一个队列中被混合致使的优先级混乱。
上个章节提出任务的插入优先级,解决了不一样轮次的任务在同一个队列可能会混合的问题。这个问题的解决引出了读取指针的移动策略:在读取到的任务的插入优先级与读取指针的值不等时意味着须要移动。
可是这里又产生了新的问题:并发移动读取指针的问题。在读取并发的状况下,会遇到一个问题:读取出来的任务的优先级不符合指针,此时要从新放回队列,可是从新放入,就可能和任务的插入混合,形成数据混乱。
有几种可能的解决方式:
Sync
关键字修饰,若是读取任务不符合,则放回,而且移动指针。因为没有读取并发,但仍然可能由于读取的放回和新任务的添加形成数据混乱。策略一并不能完全解决问题,在这里咱们采用策略二的方案。
策略二的引入实际上改变了上面的一个数据结构,也便是数组存储的元素再也不是一个任务队列,而是一个分段队列。而每个分段内部又存储了任务队列,而且分段的队列的任务的插入优先级均是相同的。这意味着分段在建立的时候就具有了插入优先级这个值。分段和分段的插入优先级必然不一样,这个结构就自然的支持了轮次的概念。
分段结构的引入致使了数据结构的变化,这实际上会改变任务插入和任务读取的流程。下文会再来细说具体的实现。分析到这里,读取指针的移动时机就很明白了,在分段内数据耗尽,就意味着某个具体插入优先级的任务都被读取完毕了。
固然,考虑到读写并发的缘由,读取线程发现分段内数据耗尽并不意味着该插入优先级的任务全被读取了,后文会针对并发场景在处理流程上解决。
插入和读取可能在同一个槽位同一个分段上并发。分段的队列自己是支持MPMC的,这并无问题。
可能会出现一种并发异常就是插入线程读取了插入指针的值,而且准备插入数据,可是由于线程调度的缘由,失去了CPU资源,还没有完成数据插入。此时读取线程将槽位内的任务读取完毕后认为没有数据,则移动了读取指针到下一个槽位。在读取指针移动后,插入线程才完成数据的插入。这样致使原本应该是高优先级的任务变成最低优先级槽位上的任务。而当下一轮次读取指针再次指向该槽位时,读取指针获取的到任务的任务优先级又会和读取指针自己的数值冲突。
针对并发的异常场景,有一种常见的解决思路就是二次检查。也就是读取线程在移动任务读取指针后,再次检查下当前分段内是否出现了新的任务,若是有,则协助迁移到下一个槽位上;写入线程在放入任务后,检查是否读取指针移动过,若是有,则协助迁移到下一个槽位上。
然而,读取线程检查分段内的队列是否剩余,写入线程检查读取指针是否移动,这些状态都是在动态变化的,仍然会产生一些其余问题。双重检查通常会引入一个终止状态来来减小可能的变化场景。在这里,咱们为分段引入状态:使用中和终止。一个分段初始化时是使用中状态,当读取线程认为该分段内的任务都被消耗后,则应该更新为终止状态。一旦分段进入终止状态,则被抛弃,不该该再有任务数据添加到该分段中。
经过分段状态,咱们能够将任务区分为终止前添加到分段和终止后添加到分段两类。前者须要被正常读取,后者则须要迁移到其它合适的分段中再被处理。
到这里为止,咱们针对数据结构和其元素属性的变化就完成了。
将数组经过循环队列的方式来表达不一样的优先级。经过任务写指针的移动来实现内部任务优先级的晋升。经过读指针来实现任务严格按照优先级顺序被处理,且避免低优先级任务被高优先级任务饿死。数组的元素指向一个该槽位上插入优先级最低的分段。一同散列到同一个槽位上的分段按照插入优先级的顺序造成队列。
整个代码当中,最为复杂的就是任务的插入和读取,下面分别来设计流程。
上面推导过程分析了插入和读取并发可能致使的冲突场景。这里咱们细化其解决流程。对于插入线程而言,要处理的状况包括有:
能够看到,只有第四种状况任务能够在当前分段插入成功,且插入完毕后还须要再次检查分段的状态。基于这些考量,咱们将插入流程设计为
能够看到,这个流程中没有处理槽位上没有分段的状况,这个在下一个章节咱们会分析。
有了分段的存在,读取指针的移动断定更加复杂,读取线程可能碰到的场景有:
只有第三种状况能够读取任务而且进行处理。有了轮次这个概念,读取指针永远只会读取槽位上的第一个分段。若是槽位上没有分段,或者分段的插入优先级与读取指针不一样,或者分段内没有任务,则能够考虑移动读取指针。注意,分段状态为关闭并非读取指针移动的条件,缘由下面会分析。
可是移动读取指针的时候首先须要考虑当前读取指针是否已经处于(写入指针的值+最低优先级数字),若是是的话,意味着已经处于边界,不该该在移动。
分段状态的更新只能由读取线程来进行。当读取线程发现该分段已经没有任务了,首先应该经过CAS的方式更新分段状态。CAS竞争成功的线程再次检查分段内是否出现了新的任务,若是出现的话,则提取任务,完成任务读取。为什么不将任务移动到下一个槽位。由于下一个槽位上可能尚未分段,此时读取线程可能和写入线程竞争槽位上的分段写入。若是写入线程竞争成功,读取线程移动过去的任务数据的优先级就放到了错误的分段中;若是读取线程竞争成功,则读取线程建立的分段必须是第一个分段,不然任务仍是移动到错误的地方。
解决这个问题最好的办法就是不解决。不移动任务,仍然在该分段上读取任务直到任务耗尽。而后再尝试移动读取指针。而对于写入线程而言,当其发现分段的状态变为终止后,是提取出任务从新执行完整的放入流程,不会有并发的问题。
再次梳理下没有任务状况下的流程,应该是经过CAS修改分段的状态。不管成功或失败,均可以继续检查队列是否有任务,若是有的话,则返回读取到的任务。若是没有的话,则CAS将读取指针+1。竞争成功的线程将当前分段的下一个分段设置给槽位,而且从新执行读取流程。竞争失败的线程则反复检查读取指针的值,发现变化后,从新执行读取流程。
这里有一个并发冲突须要考虑,当读取线程尝试将当前分段的下一个分段设置为槽位的值时,可能此时当前分段的下一个分段是null,而写入线程正在尝试为当前分段设置下一个分段。这种状况下可能致使下一个分段丢失。特别的,若是当前分段的下一个分段已经被设置,而且有任务被放入其中,丢失这个分段就意味着数据丢失。
为了不这个状况,在当前分段的下一个分段为null时,就不能将下一个分段(属性值)设置给槽位。这使得在读取到分段时,须要首先检查分段的优先级,确认是否本轮次。若是是的话,再执行后续的流程。不然要么移动(该分段没有下一个分段),要么将该分段的下一个分段设置给槽位后,在移动。
从这个角度出发,咱们能够在初始化的时候,将数组中的元素都填充一个分段。这样写入线程就不须要处理槽位上可能为空的场景了。
基于此,咱们将读取任务的变化为:
第一种状况,若是该分段有下一个分段,CAS更新到槽位上;若是没有,则CAS移动读取指针。
第二种状况,按照上面分析的流程进行处理便可。
第三种状况,CAS移动读取指针。
综上,咱们能够将读取流程设计为
在JDK提供的ThreadPoolExecutor
类的构造方法中,须要传入BlockingQueue
做为队列的接口。显然,上述的存储结构并不能支持BlockQueue
,须要考虑包装。
显然,上面的存储结果在写入的时候并不会阻塞,所以只须要考虑如何包装读取数据不存在时的阻塞等待便可。
简单的方式就是在读取失败的获取锁,而且在队列空的condition
对象执行等待;插入任务的时候执行唤醒。
测试代码以下
首先添加必定量的高优先级任务,随后添加5个低优先级,最后经过CountLatch
模拟在运行过程当中添加高优先级任务。
若是单纯按照优先级排序,则须要全部高优先级任务输出完毕后才会输出低优先级任务,显然这是错误的。正确的实现应该是先输出第一批高优先级任务,再输出低优先级任务,最后输出第三批高优先级任务。运行代码,看到结果以下
与咱们的预期相吻合。
Gitee:gitee.com/eric_ds/eri…