Spring Cloud架构教程 (八)消息驱动的微服务(消费组)【Dalston版】

使用消费组实现消息消费的负载均衡

一般在生产环境,咱们的每一个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。html

默认状况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每一个消费者实例接收和处理,可是有些业务场景之下,咱们但愿生产者产生的消息只被其中一个实例消费,这个时候咱们须要为这些消费者设置消费组来实现这样的功能,实现的方式很是简单,咱们只须要在服务消费者端设置spring.cloud.stream.bindings.input.group属性便可,好比咱们能够这样实现:spring

  • 先建立一个消费者应用SinkReceiver,实现了greetings主题上的输入通道绑定,它的实现以下:
    @EnableBinding(value = {Sink.class})
    public class SinkReceiver {
    
        private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);
    
        @StreamListener(Sink.INPUT)
        public void receive(User user) {
            logger.info("Received: " + user);
        }
    }

     

  • 为了将SinkReceiver的输入通道目标设置为greetings主题,以及将该服务的实例设置为同一个消费组,作以下设置:
    spring.cloud.stream.bindings.input.group=Service-A
    spring.cloud.stream.bindings.input.destination=greetings

    经过spring.cloud.stream.bindings.input.group属性指定了该应用实例都属于Service-A消费组,而spring.cloud.stream.bindings.input.destination属性则指定了输入通道对应的主题名。负载均衡

  • 完成了消息消费者以后,咱们再来实现一个消息生产者应用SinkSender,具体以下:
    @EnableBinding(value = {Source.class})
    public class SinkSender {
    
        private static Logger logger = LoggerFactory.getLogger(SinkSender.class);
    
        @Bean
        @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000"))
        public MessageSource<String> timerMessageSource() {
            return () -> new GenericMessage<>("{\"name\":\"didi\", \"age\":30}");
        }
    
    }

     

  • 为消息生产者SinkSender作一些设置,让它的输出通道绑定目标也指向greetings主题,具体以下:
    spring.cloud.stream.bindings.output.destination=greetings

    到这里,对于消费分组的示例就已经完成了。spa

    分别运行上面实现的生产者与消费者,其中消费者咱们启动多个实例。经过控制台,咱们能够发现每一个生产者发出的消息,会被启动的消费者以轮询的方式进行接收和输出。源码来源code

相关文章
相关标签/搜索