在上篇文章中咱们给你们介绍了Stream的消息分组,能够实现消息的重复消费的问题,但在某些场景下分组还不能知足咱们的需求,好比,同时有多条同一个用户的数据,发送过来,咱们须要根据用户统计,可是消息被分散到了不一样的集群节点上了,这时咱们就能够考虑消息分区了。
当生产者将消息数据发送给多个消费者实例时,保证同一消息数据始终是由同一个消费者实例接收和处理。java
将咱们上篇文章中的分组的三个项目,拷贝一份修更名称及服务名称git
发送多条消息查看效果github
@RunWith(SpringRunner.class) @SpringBootTest(classes=StreamSenderStart.class) public class StreamTest { @Autowired private ISendeService sendService; @Test public void testStream(){ Product p = new Product(999, "stream test ...999"); // 将须要发送的消息封装为Message对象 Message message = MessageBuilder .withPayload(p) .build(); for (int i = 0; i < 10; i++) { // 发送多条消息到队列中 sendService.send().send(message ); } } }
10条消息被随机的分散到了两个消费者中:spring
咱们能够看到A中6条消息,B中4条消息,并且这是随机的,下次执行的结果确定不同。app
spring.application.name=stream-partition-sender server.port=9060 #设置服务注册中心地址,指向另外一个注册中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 连接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/ # 对应 MQ 是 exchange outputProduct自定义的信息 spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct #经过该参数指定了分区键的表达式规则 spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload #指定了消息分区的数量。 spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2
服务A测试
spring.application.name=stream-partition-receiverA server.port=9070 #设置服务注册中心地址,指向另外一个注册中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 连接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/ # 对应 MQ 是 exchange 和消息发送者的 交换器是同一个 spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct # 具体分组 对应 MQ 是 队列名称 而且持久化队列 inputProduct 自定义 spring.cloud.stream.bindings.inputProduct.group=groupProduct999 #开启消费者分区功能 spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true #指定了当前消费者的总实例数量 spring.cloud.stream.instanceCount=2 #设置当前实例的索引号,从 0 开始 spring.cloud.stream.instanceIndex=0
服务Bui
spring.application.name=stream-partition-receiverB server.port=9071 #设置服务注册中心地址,指向另外一个注册中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 连接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/ # 对应 MQ 是 exchange 和消息发送者的 交换器是同一个 spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct # 具体分组 对应 MQ 是 队列名称 而且持久化队列 inputProduct 自定义 spring.cloud.stream.bindings.inputProduct.group=groupProduct999 #开启消费者分区功能 spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true #指定了当前消费者的总实例数量 spring.cloud.stream.instanceCount=2 #设置当前实例的索引号,从 1 开始 spring.cloud.stream.instanceIndex=1
启动服务测试3d
10个消息都被消费者A给消费了,说明到达了咱们须要的效果。
案例源码:https://github.com/q279583842q/springcloud-e-bookcode