拍拍贷消息系统原理与应用

前言

在5月12日的Java开发者大会上,除了我本人进行分享以外,还有其余5位优秀的老师也有精彩的分享。sql

今天我将根据本身的记忆来给你们分享下 拍拍贷 基础框架研发资深专家 李乘胜老师的演讲内容。数据库

李乘胜老师演讲的主题是:拍拍贷消息系统原理与应用,也就是拍拍贷内部使用的消息系统,自研发的,没有用市场上开源的。bash

咱们都知道,对于大厂来讲,是有绝对的自研发的技术实力。自研发每每能更贴近公司实际的业务使用场景,因此不少大厂都选择了自研。微信

有一个好消息就是后面这套消息系统后面会开源,咱们又多了一个很是优秀的消息系统。可是在目前为止尚未对应的文章来介绍这套消息系统,因此今天的文章你们要认真阅读哦!多线程

内容有点偏多,不会所有写出来,感兴趣的朋友能够加我微信 jihuan900 我发原始的PPT给你参考学习。架构

介绍

拍拍贷消息系统是拍拍贷中间件团队,在普遍调研业界开源消息系统的基础上,结合公司现状和自身实践,研发的一款轻量级消息系统。具备丰富的功能和完善的治理框架

对于框架的选型咱们最在乎的就是它的性能怎么样?有没有被大规模使用?运维

  • 支撑拍拍贷天天高峰27+亿消息量、1000多G的消息数据量。
  • 平常高峰2万Tps写入。

消息系统你最在乎的问题有哪些

  1. 消息发送慢怎么办? 有监控功能,发现问题后能够扩容Broker或者扩容队列
  2. 消息堆积了,如何快速知晓与处理? 这个能够根据客户端的监控查看消费耗时和机器负载来觉定是否加队列,加线程数,或者加消费实例
  3. 失败消息如何处理? 独立的失败队列,重试
  4. 如何保障消息的高可靠? 数据库保证

上面4点是李老师PPT上介绍的,我在这边补充几点:ide

  • 如何保证消息的有序性? 目前消息存储在Mysql中,id是自增加,消费的时候从小到大消费,只能保证单队列有序性
  • 如何实现延迟消息? 能够在后台配置延迟多久消费,文章后面会进行讲解
  • 可否支持消息回溯? 后台修改偏移量
  • 部署维护是否方便? 这个部署运维简单,分库分表不用本身考虑,portal有部署脚本

上面是咱们在选用消息系统的时候一般都会去考虑的问题,目前市面上最多见的有Kafka、RabbitMQ、RocketMQ 等,那么拍拍贷的消息系统跟这些开源的区别在哪呢?下面咱们一块儿来了解下这款消息系统的架构设计。微服务

架构设计

架构设计

拍拍贷消息系统的设计仍是简洁易懂的,消息存储直接用了数据库来实现,不用考虑存储这块的复杂性。

发布消息使用Http协议,支持全部语言,消息订阅也是Http协议,目前采起的是主动拉取消息的方式,批量拉取。

Broker 是无状态的,能够搭建集群,水平扩展,无单点问题

在个人书《Spring Cloud微服务-全栈技术与案例解析》中也有对消息可靠性的介绍,大概的思路跟今天介绍的消息系统差很少,都是消息先落地到数据库中。

个人是将数据库中的消息投递到消息队列,经过消息队列来消费,消费完以后手动确认消费加剧试来保证可靠性,固然在里面也能够作不少治理的功能。我这样作主要是对于生产方来讲屏蔽了队列,封装成了消息服务,可是对于消费方来讲仍是要关注队列的存在。

拍拍贷这套就彻底抛弃了第三方的消息队列,消费消息也是本身开发的,拉取模式。

消息复用与偏移

每一个消费方会记录本身的偏移量,后台还能够动态修改偏移量来达到消息回溯的功能。

功能点

动态启停消费

动态调整偏移

支持延时消息,支持多线程消费,动态调整

消息消费状况查看

指定失败消息从新消费

监控治理

在监控这块,咱们最关心的有如下几点:

  • 消费失败告警
  • 消息堆积告警
  • Topic消息报表, 天天的消费状况,性能等
  • 发送效率慢,怎么排查?

对于一些告警,该消息系统都支持了,并且有很是多丰富的报表功能,监控对接了Cat,排查问题很是方便。

客户端发送

客户端发送消息很是简单,有现成封装好了的SDK

MqClient.publish("Test1", "", new ProducerDataDto(“111111")); 复制代码

客户端消费

配置订阅

<?xml version="1.0" encoding="UTF-8" ?>
<messageQueue>
   <consumer groupName="Test1Sub">
      <topics>
         <topic name="Test1" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
     </topics>
   </consumer>
</messageQueue>
复制代码

代码订阅

ConsumerGroupVo consumerGroup = new ConsumerGroupVo("Test1Sub");
ConsumerGroupTopicVo topicVo = new ConsumerGroupTopicVo();
topicVo.setName("Test");
topicVo.setSubscriber(new ISubscriber() {
    @Override
    public List<Long> onMessageReceived(List<MessageDto> messages) {
        return null;
    }
});
consumerGroup.addTopic(topicVo);
MqClient.registerConsumerGroup(consumerGroup);
复制代码

猿天地
相关文章
相关标签/搜索