在消息队列中,当消费者去消费消息的时候,不管是经过 pull 的方式仍是 push 的方式,均可能会出现大批量的消息突刺。若是此时要处理全部消息,极可能会致使系统负载太高,影响稳定性。但其实可能后面几秒以内都没有消息投递,若直接把多余的消息丢掉则没有充分利用系统处理消息的能力。咱们但愿能够把消息突刺均摊到一段时间内,让系统负载保持在消息处理水位之下的同时尽量地处理更多消息,从而起到“削峰填谷”的效果:html
上图中红色的部分表明超出消息处理能力的部分。架构
咱们能够看到消息突刺每每都是瞬时的、不规律的,其后一段时间系统每每都会有空闲资源。咱们但愿把红色的那部分消息平摊到后面空闲时去处理,这样既能够保证系统负载处在一个稳定的水位,又能够尽量地处理更多消息,这时候咱们就须要一个可以控制消费端消息匀速处理的利器 — AHAS 流控降级,来为消息队列削峰填谷,保驾护航。异步
AHAS 的流控降级是面向分布式服务架构的专业流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统保护等多个维度来帮助您保障服务的稳定性,同时提供强大的聚合监控和历史监控查询功能。分布式
AHAS 专门为这种场景提供了匀速排队的控制特性,能够把忽然到来的大量请求以匀速的形式均摊,以固定的间隔时间让请求经过,以稳定的速度逐步处理这些请求,起到“削峰填谷”的效果,从而避免流量突刺形成系统负载太高。同时堆积的请求将会排队,逐步进行处理;当请求排队预计超过最大超时时长的时候则直接拒绝,而不是拒绝所有请求。阿里云
好比在 RocketMQ 的场景下配置了匀速模式下请求 QPS 为 5,则会每 200 ms 处理一条消息,多余的处理任务将排队;同时设置了超时时间,预计排队时长超过超时时间的处理任务将会直接被拒绝。示意图以下图所示:url
本部分将引导您快速在 RocketMQ 消费端接入 AHAS 流控降级 Sentinel。spa
首先您须要到AHAS 控制台开通 AHAS 功能(免费)。能够根据 开通 AHAS 文档 里面的指引进行开通。线程
在结合阿里云 RocketMQ Client 使用 Sentinel 时,用户须要引入 AHAS Sentinel 的依赖 ahas-sentinel-client
(以 Maven 为例):3d
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>ahas-sentinel-client</artifactId> <version>1.1.0</version> </dependency>
因为 RocketMQ Client 未提供相应拦截机制,并且每次收到均可能是批量的消息,所以用户在处理消息时须要手动进行资源定义(埋点)。咱们能够在处理消息的逻辑处手动进行埋点,资源名能够根据须要来肯定(如 groupId + topic 的组合):code
private static Action handleMessage(Message message, String groupId, String topic) { Entry entry = null; try { // 资源名称为 groupId 和 topic 的组合,便于标识,同时能够针对不一样的 groupId 和 topic 配置不一样的规则 entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic); // 在此处编写真实的处理逻辑 System.out.println(System.currentTimeMillis() + " | handling message: " + message); return Action.CommitMessage; } catch (BlockException ex) { // 在编写处理被流控的逻辑 // 示例:能够在此处记录错误或进行重试 System.err.println("Blocked, will retry later: " + message); return Action.ReconsumeLater; // 会触发消息从新投递 } finally { if (entry != null) { entry.exit(); } } }
消费者订阅消息的逻辑示例:
Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(topic, "*", (message, context) -> { return handleMessage(message); }); consumer.start();
更多关于 RocketMQ SDK 的信息能够参考 消息队列 RocketMQ 入门文档。
注意:若在本地运行接入 AHAS Sentinel 控制台须要在页面左上角选择 公网 环境,若在阿里云 ECS 环境则在页面左上角选择对应的 Region 环境。
咱们能够进入 AHAS 控制台,点击左侧侧边栏的 流控降级,进入 AHAS 流控降级控制台应用总览页面。在页面右上角,单击添加应用,选择 SDK 接入页签,到 配置启动参数 页签拿到须要的启动参数(详情请参考 SDK 接入文档),相似于:
-Dproject.name=AppName -Dahas.license=<License>
其中 project.name
配置项表明应用名(会显示在控制台,好比 MqConsumerDemo
),ahas.license
配置项表明本身的受权 license(ECS 环境不须要此项)。
接下来咱们添加获取到的启动参数,启动修改好的 Consumer 应用。因为 AHAS 流控降级须要进行资源调用才能触发初始化,所以首先须要向对应 group/topic 发送一条消息触发初始化。消费端接收到消息后,咱们就能够在 AHAS Sentinel 控制台上看到咱们的应用了。点击应用卡片,进入详情页面后点击左侧侧边栏的“机器列表”。咱们能够在机器列表页面看到刚刚接入的机器,表明接入成功:
点击“请求链路”页面,咱们能够看到以前定义的资源。点击右边的“流控”按钮添加新的流控规则:
咱们在“流控方式”中选择“排队等待”,设置 QPS 为 10,表明每 100ms 匀速经过一个请求;而且设置最大超时时长为 2000ms,超出此超时时间的请求将不会排队,当即拒绝。配置完成后点击新建按钮。
下面咱们能够在 Producer 端批量发送消息,而后在 Consumer 端的控制台输出处观察效果。能够看到消息消费的速率是匀速的,大约每 100 ms 消费一条消息:
1550732955137 | handling message: Hello MQ 2453 1550732955236 | handling message: Hello MQ 9162 1550732955338 | handling message: Hello MQ 4944 1550732955438 | handling message: Hello MQ 5582 1550732955538 | handling message: Hello MQ 4493 1550732955637 | handling message: Hello MQ 3036 1550732955738 | handling message: Hello MQ 1381 1550732955834 | handling message: Hello MQ 1450 1550732955937 | handling message: Hello MQ 5871
同时不断有排队的处理任务完成,超出等待时长的处理请求直接被拒绝。注意在处理请求被拒绝的时候,须要根据需求决定是否须要从新消费消息。
咱们也能够点击左侧侧边栏的“监控详情”进入监控详情页面,查看处理消息的监控曲线:
对比普通限流模式的监控曲线(最右面的部分):
若是不开启匀速模式,只是普通的限流模式,则只会同时处理 10 条消息,其他的所有被拒绝,即便后面的时间系统资源充足多余的请求也没法被处理,于是浪费了许多空闲资源。两种模式对比说明匀速模式下消息处理能力获得了更好的利用。
Kafka 消费端接入 AHAS 流控降级的思路与上面的 RocketMQ 相似,这里给出一个简单的代码示例:
private static void handleMessage(ConsumerRecord<String, String> record, String groupId, String topic) { pool.submit(() -> { Entry entry = null; try { // 资源名称为 groupId 和 topic 的组合,便于标识,同时能够针对不一样的 groupId 和 topic 配置不一样的规则 entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic); // 在此处理消息. System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString()); } catch (BlockException ex) { // Blocked. // NOTE: 在处理请求被拒绝的时候,须要根据需求决定是否须要从新消费消息 System.err.println("Blocked: " + record.toString()); } finally { if (entry != null) { entry.exit(); } } }); }
消费消息的逻辑:
while (true) { try { ConsumerRecords<String, String> records = consumer.poll(1000); // 必须在下次 poll 以前消费完这些数据, 且总耗时不得超过 SESSION_TIMEOUT_MS_CONFIG // 建议开一个单独的线程池来消费消息,而后异步返回结果 for (ConsumerRecord<String, String> record : records) { handleMessage(record, groupId, topic); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); } }
以上介绍的只是 AHAS 流控降级的其中一个场景 —— 请求匀速,它还能够处理更复杂的各类状况,好比:
本文为云栖社区原创内容,未经容许不得转载。