随着公司的快速发展,天天推送的消息量不断增长。以前的旧消息系统已经日益不能知足现阶段推送场景的功能要求,咱们就开始从0到1构建一个完整的消息系统。java
原始架构中存在各类各样的痛点与挑战:redis
业务接入比较慢,发送不一样类型的消息须要接入不一样的api. 消息消费处理都很慢,影响活动运营体验。mongodb
业务调用量统计不清楚,没法针对不一样业务进行关闭/限流。api
缺少优先级消息,在营销类消息大批量推送时,正常的订单相关的消息就会堆积,阻塞会被延时。安全
经过下列4个方面进行优化:架构
切换数据源存储,选择写比es相对高一点和读相对es少一点的mongodb优化
消息平台目前总体架构*spa
目前消息平台的优先级采用2种发送实现的,首先使用传统的消息队列kafka进行一次优先级发送和消费,后面采用优先线程池任务进行优先消息发送。线程
kafka自己并不支持优先级,咱们经过下列2个方案进行处理,人为的对kafka不一样队列发送不一样优先级消息。接口
采用建立不一样的topic,不一样优先级消息发送到不一样的topic中,同时在消息消费的时候,按照不一样的比例获取不一样topic的数据进行消费。
目前的顺序是优先级最高的是第二高的2倍,依次进行下去。最后剩余的拉取消息值加入到优先级最高的里面 好比一次拉取50条,3个topic 那么咱们就是按照 (25 +5/ 13 / 7) 进行拉取。
高优先级消息没有了如何让低优先级的消息满负载拉取,即按照上述优先级最低的消息一次性拉取50条消息呢?
引入消息拉取状态机,优先级消息比较低的时候,加大低优先级的消费。目前消息服务状态机,有初始化、低负载、高负载等几个状态,经过判断上一次处理的消息条数来肯定消息消费者当前的状态并进行拉取参数的修改,目前采用反射的方式修改kafka的拉取数量。
为了加快速度发送咱们也采用了本地线程池,本地线程任务,咱们采用任务优先级队列。下面是提交一个线程任务的流程。
在上图中,咱们经过给一个线程任务一个自增的序列号以及以前定义的优先级值进行比较,惟一肯定一条任务的执行优先级。
首先咱们定义延时/定时策略有一下几个策略:
咱们将延时定时的消息区分上述3种类型以后,分别有不一样的实现方式。在低于15s的时候咱们直接采用了java自带的delayTask进行消息判断&推送。而高于15s低于30分的,咱们本身建立了一个秒基本级别的单时间轮,进行消息推送,下述是时间轮的执行。
可是咱们在这个基础上进行了部分优化,参考了kafka的延时队列,当时间轮中没有须要执行的任务以后,咱们直接对执行的线程进行wait等待,直到下个任务提交notify唤醒。对于高于30分钟的延时任务,咱们通常先进行消息任务的存储,在任务快要执行的30分钟以前,咱们将任务数据加入到秒级别的时间轮中,参考第二种进行消息发送。(为啥不用天/小时级别时间轮,纯粹是不想浪费内存)
咱们在消息推送过程当中,用户的防疲劳是必要的,目前消息中心的防疲劳场景主要有如下几种类型:
咱们在上述几种场景,主要采用的是mongo进行数据的存储和聚合查询,由于若是使用redis场景在多用户的时候,会频繁的操做redis,并非很好。且大部分防疲劳的数据咱们也仅仅最多保留1周,mongo的集合,很容易把咱们这些功能知足。固然在某个具体的业务1天内只能收到1条消息这个场景中,咱们采用了redis的helperLogLog进行防疲劳,减小查询和内存消耗。虽然有一点的偏差,可是咱们的使用场景上影响不是很高。
最后咱们从本身的实战总结下来构建一个消息平台须要思考的点:
关注得物技术,携手走向技术的云端