本篇文章为系列文章,未读第一集的同窗请猛戳这里:Spring Cloud 系列之 Stream 消息驱动(一)java
本篇文章讲解 Stream 如何实现消息分组和消息分区。spring
点击连接观看:Stream 消息分组视频(获取更多请关注公众号「哈喽沃德先生」)shell
若是有多个消息消费者,那么消息生产者发送的消息会被多个消费者都接收到,这种状况在某些实际场景下是有很大问题的,好比在以下场景中,订单系统作集群部署,都会从 RabbitMQ 中获取订单信息,若是一个订单消息同时被两个服务消费,系统确定会出现问题。为了不这种状况,Stream 提供了消息分组来解决该问题。express
在 Stream 中处于同一个 group
中的多个消费者是竞争关系,可以保证消息只会被其中一个应用消费。不一样的组是能够消费的,同一个组会发生竞争关系,只有其中一个能够消费。经过 spring.cloud.stream.bindings.<bindingName>.group
属性指定组名。segmentfault
在 stream-demo
项目下建立 stream-consumer02
子项目。 api
项目代码使用入门案例中消息消费者的代码。服务器
单元测试代码以下:app
package com.example; import com.example.producer.MessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MessageProducer messageProducer; @Test public void testSend() { messageProducer.send("hello spring cloud stream"); } }
运行单元测试发送消息,两个消息消费者控制台打印结果以下:ide
stream-consumer 的控制台:单元测试
message = hello spring cloud stream
stream-consumer02 的控制台:
message = hello spring cloud stream
经过结果能够看到消息被两个消费者同时消费了,缘由是由于它们属于不一样的分组,默认状况下分组名称是随机生成的,经过 RabbitMQ 也能够得知:
stream-consumer 的分组配置为:group-A
。
server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 group: group-A
stream-consumer02 的分组配置为:group-A
。
server: port: 8003 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 group: group-A
运行单元测试发送消息,此时多个消息消费者只有其中一个能够消费。RabbitMQ 结果以下:
点击连接观看:Stream 消息分区视频(获取更多请关注公众号「哈喽沃德先生」)
经过消息分组能够解决消息被重复消费的问题,但在某些场景下分组还不能知足咱们的需求。好比,同时有多条同一个用户的数据发送过来,咱们须要根据用户统计,可是消息被分散到了不一样的集群节点上了,这时咱们就能够考虑使用消息分区了。
当生产者将消息发送给多个消费者时,保证同一消息始终由同一个消费者实例接收和处理。消息分区是对消息分组的一种补充。
先给你们演示一下消息未分区的效果,单元测试代码以下:
package com.example; import com.example.producer.MessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MessageProducer messageProducer; @Test public void testSend() { for (int i = 1; i <= 10; i++) { messageProducer.send("hello spring cloud stream"); } } }
运行单元测试发送消息,两个消息消费者控制台打印结果以下:
stream-consumer 的控制台:
message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream
stream-consumer02 的控制台:
message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream
假设这 10 条消息都来自同一个用户,正确的方式应该都由一个消费者消费全部消息,不然系统确定会出现问题。为了不这种状况,Stream 提供了消息分区来解决该问题。
消息生产者配置分区键的表达式规则和消息分区的数量。
server: port: 8001 # 端口 spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息发送通道 # 与 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 注解的 value 相同 output: destination: stream.message # 绑定的交换机名称 producer: partition-key-expression: payload # 配置分区键的表达式规则 partition-count: 2 # 配置消息分区的数量
经过 partition-key-expression
参数指定分区键的表达式规则,用于区分每一个消息被发送至对应分区的输出 channel
。
该表达式做用于传递给 MessageChannel
的 send
方法的参数,该参数实现 org.springframework.messaging.Message
接口的 GenericMessage
类。
源码 MessageChannel.java
package org.springframework.messaging; @FunctionalInterface public interface MessageChannel { long INDEFINITE_TIMEOUT = -1L; default boolean send(Message<?> message) { return this.send(message, -1L); } boolean send(Message<?> var1, long var2); }
源码 GenericMessage.java
package org.springframework.messaging.support; import java.io.Serializable; import java.util.Map; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; public class GenericMessage<T> implements Message<T>, Serializable { private static final long serialVersionUID = 4268801052358035098L; private final T payload; private final MessageHeaders headers; ... }
若是 partition-key-expression
的值是 payload
,将会使用全部放在 GenericMessage
中的数据做为分区数据。payload
是消息的实体类型,能够为自定义类型好比 User
,Role
等等。
若是 partition-key-expression
的值是 headers["xxx"]
,将由 MessageBuilder
类的 setHeader()
方法完成赋值,好比:
package com.example.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(Source.class) public class MessageProducer { @Autowired private Source source; /** * 发送消息 * * @param message */ public void send(String message) { source.output().send(MessageBuilder.withPayload(message).setHeader("xxx", 0).build()); } }
消息消费者配置消费者总数和当前消费者的索引并开启分区支持。
stream-consumer 的 application.yml
server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: instance-count: 2 # 消费者总数 instance-index: 0 # 当前消费者的索引 bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 group: group-A consumer: partitioned: true # 开启分区支持
stream-consumer02 的 application.yml
server: port: 8003 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: instance-count: 2 # 消费者总数 instance-index: 1 # 当前消费者的索引 bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 group: group-A consumer: partitioned: true # 开启分区支持
运行单元测试发送消息,此时多个消息消费者只有其中一个能够消费全部消息。RabbitMQ 结果以下:
至此 Stream 消息驱动全部的知识点就讲解结束了。
本文采用 知识共享「署名-非商业性使用-禁止演绎 4.0 国际」许可协议
。
你们能够经过 分类
查看更多关于 Spring Cloud
的文章。
🤗 您的点赞
和转发
是对我最大的支持。
📢 扫码关注 哈喽沃德先生
「文档 + 视频」每篇文章都配有专门视频讲解,学习更轻松噢 ~