pom依赖:算法
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
配置:spring
spring.rabbitmq.host=192.168.99.100 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
而后就能够直接使用stream的封装的方式进行消息收发,代码层面对rabbitmq无感知。微服务
默认状况下,sender端绑定的exchange会在rabbitmq上建立一个持久化的exchange,类型是topic,routing key是#.reciver端会自动建立一个临时的queue(队列名是自动生成的),绑定到exhange上面。 这样的话若是有多个消费端节点,那么一条消息就会被多个消费者同时消费。 所以消息分组无非就是消费端指定固定的queue名称。这样多个消费者都会绑定到相同的queque上,那么一条消息也就只会被一个消费者消费。
消费端配置:3d
# 指定交换机名称 spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct # 指定队列名称 spring.cloud.stream.bindings.inputProduct.group=groupProduct
public interface IReceiveService { String INPUT="inputProduct"; @Input(INPUT) SubscribableChannel receive(); }
这样,Rabbitmq上会建立一个topic类型,routing key为#,持久化的echange,名称为exchangeProduct。 会建立一个持久化的queue,名称为exchangeProduct.groupProduct。code
在rabbitmq中没有这个概念。 在微服务中,是为了保证相同的消息被同一个节点接收的问题。 相同消息指的应该是序列化后的内容相同。blog
问题:将相同消息路由到同一节点的使用场景是什么?索引
生产端配置:rabbitmq
# exchange名称 spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct #经过该参数指定了分区键的表达式规则 spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload #指定了消息分区的数量(也就是消费者数量) spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2
消费端配置:队列
# exchange名称 spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct # 队列名称 spring.cloud.stream.bindings.inputProduct.group=groupProduct #开启消费者分区功能 spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true #指定了当前消费者的总实例数量 spring.cloud.stream.instanceCount=2 #设置当前实例的索引号,从0开始 spring.cloud.stream.instanceIndex=0
这样配置后,在rabbitmq上,会建立两个持久化队列exchangeProduct.groupProduct-0和exchangeProduct.groupProduct-1. 和交换机的routing key分别为exchangeProduct-0,exchangeProduct-1.ci
可见,消息分区的功能彻底是由stream端实现的。实现大体应该是在生产者端,stream根据序列化后的内容,根据特定的hash算法,将消息路由到特定的routing key,进而发送到对应的queue上面去,从而实现了相同消息会被发送到相同的节点上。