最近项目里面遇到了一个较为复杂的并发系统设计,虽然最终仍是实现了并投入运行,可是耗时仍是挺久的,想一想能够总结下,但愿能够帮助之后的设计和实践,须要注意的是,我专一于介绍遇到的问题以及相应的解决思路,可能不够系统性,欢迎指正。
谈到并发,不少童鞋听到的都是优势,我想泼下冷水,虽然多线程能够极大的提升效率,吞吐量等,可是并发不是银弹,引入的同时会提升系统的复杂性,给设计和调试带了难度(还不包括引入的系统开销例如线程切换等),因此须要辩证的来看待并发,这点不少基础并发书籍都会在最开始介绍。
背景介绍
咱们所作的是一个监控系统,用于监控咱们开发的线上服务。咱们的线上服务大体包括两部分,
首先定义下咱们的监控目标:延迟。也就是说,A的流入数据到B的产出数据时间在95%的percentile超过5分钟即表示有较大延迟须要报警。
初步设计
根据目标,咱们设计的初步架构是在内存中保存全部的数据结构,并根据咱们的阈值打各类perfCounter,再经过Counter内置的统计计算和报警系统进行报警。这样设计的目的是:简单并能够快速原型化,目前的需求不须要历史记录,因此落盘也就没有必要了。
首先第一个遇到的问题是,如何监控咱们高度依赖的服务A?
咱们但愿下降监控系统对被监控系统的耦合,因此很天然的仍是想到了引入一个中间件来解决,就是kafka queue。这样,咱们打算给A和B定义以下几组kafka消息,咱们须要监控目标就变成了最初的消息和最后的消息的时间差,结构大体以下:
- 时间戳;
- 步骤名称;
- 各个步骤的额外数据;
- id;
- 是否成功;
- 错误消息;
整体思路能够描绘以下:
总共有两个线程,一个是主线程,负责从kafka中取消息消费,并根据消息类型路由到各个handler处理;另外一个线程是超时记录线程,用于不断的从内存队列中取消息并计时,若是超时且没有没有收到消息E(结束)即认为超时并打上报警的counter。
因此,一次处理超时的定义为 time(E) - time(A) > threshold
这里须要考虑的问题是怎么设计数据结构来存储消息以及如何加锁来控制并发呢?
因为A,B,C,D,E的消息获取记录都须要,由于系统的此次处理可能中断在任何一个消息的处理上,因此这些收到的消息都须要给到超时记录线程用于报警。因此,很天然的想到了ConcurrentMap,key是消息id,value则是另外一个map,包括了各个消息步骤的数据,key是步骤名,value是消息。
经过wiki咱们知道,
A Map providing additional atomic putIfAbsent, remove, and replace methods.Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a ConcurrentMap as a key or value happen-before actions subsequent to the access or removal of that object from the ConcurrentMap in another thread.
这个并发map,保证了删除和放入的原子性,以及可见性,这样这个数据结构就至关于自带锁了,它会同时被消费线程和超时记录线程访问。
(这里有个小技巧须要注意,操做value的时候每每须要判断是否存在某个key,由于这个key随时可能被其余线程删除,因此通常来讲是把value的引用记下来而不是判断一次后,每次都去读,由于有被异步删除的风险。)
乍一看,逻辑彷佛很清晰,各个消息的handler在处理消息的时候,判断有没有失败,失败就直接报警(看成处理不成功),成功的话直接放入超时线程计时。对于E的handler,多加上判断是否结束,若是结束,把当前的时间差记录下来并直接返回(删掉map中的key)。
版本迭代
2.0版本:
细心的同窗应该发现,这里还有并发问题:
- 对于一个消息,在整个流程中,其实应该只被打一次的(成功或者失败),可是咱们以前方案并无考虑,可能一个消息在E处刚打了成功,与此同时,超时记录线程正好记录到超时(没来得及检查map的key是否被删除),这个时候消息就被打了两次时间,形成数据不真实;
这个并发问题不容易发现是由于,共享变量不是那么好察觉,不像那个map,这个打counter其实也是共享变量,因此审视一个并发系统须要从总体和局部同时考虑,局部上咱们发现须要对map加锁,总体上,监控系统的输入是各类kafka消息,输出是counter计时,这样考虑咱们可能就比较容易发现这个隐藏的共享变量了。
那么怎么解决呢?对每一个消息的counter加锁便可。因为不想在原来的map加数据了,因此新加了一个concurrentMap来记录每个消息id->AtomicBoolean,这样,在每次打成功或者失败的counter的时候,用CAS判断便可,自己AtomicBoolean是保证可见性的,这点就不须要担忧了。
但是,不久又有新需求来了,你们须要知道每个消息A,B,C,D,E的处理时间差,这样方便定位是哪一步比较慢。
3.0版本:
针对这个问题,我看到是同一个id的每一步都须要打一次的锁,因此,又建了一个每个id+step的ConcurrentMap来记录每个id+step -> AtomicBoolean来保证每个step都只被打一次。
改完以后,上线发现C,D和E的时间比E的时间还长,思前想后,忽然发现没有考虑到A,B,C,D, E的顺序是不保证了,因此,不能假设C比D早来,或者E比D晚来,这样就形成了,某些id的处理E先来了,打了一个结束时间的counter,C和D还没来,等它们来了,因为已经结束了就不会计时了,这样形成整体C,D的时间少了。解决方法是,在E来了,结束时,检查C,D是否来,没来按当前时间打便可。
总结
直到这个版本上线,才使得整体的时间指标看起来比较正常,这3个版本走走停停改了将近1个月感慨良多,深入的理解了多线程带来的系统复杂性和调试复杂性,以为有下来如下几点值得总结:
- 在设计的时候,尽可能多花时间考虑清楚可能形成并发问题的点;
- 能够从总体和局部,输入和输出等多方面来考虑并发点,再设计相应的并发工具解决;
- 在引入多线程的时候,要考虑到即将形成的复杂性提高,须要有权衡的思考;