Spring Cloud Stream如何处理消息重复消费?

最近收到好几个相似的问题:使用Spring Cloud Stream操做RabbitMQ或Kafka的时候,出现消息重复消费的问题。经过沟通与排查下来主要仍是用户对消费组的认识不够。其实,在以前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概念以及做用。java

那么什么是消费组呢?为何要用消费组?它解决什么问题呢?摘录一段以前博文的内容,来解答这些疑问:git

一般在生产环境,咱们的每一个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认状况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每一个消费者实例接收和处理(出现上述重复消费问题)。可是有些业务场景之下,咱们但愿生产者产生的消息只被其中一个实例消费,这个时候咱们须要为这些消费者设置消费组来实现这样的功能。github

详细也可查看原文:消息驱动的微服务(消费组)spring

下面,经过一个例子来看看如何使用消费组:bash

问题重现

构建消息消费端

第一步:建立绑定接口,绑定example-topic输入通道(默认状况下,会绑定到RabbitMQ的同名Exchange或Kafaka的同名Topic)。app

interface ExampleBinder {

    String NAME = "example-topic";

    @Input(NAME)
    SubscribableChannel input();

}
复制代码

第二步:对上述输入通道建立监听与处理逻辑。负载均衡

@EnableBinding(ExampleBinder.class)
public class ExampleReceiver {

    private static Logger logger = LoggerFactory.getLogger(ExampleReceiver.class);

    @StreamListener(ExampleBinder.NAME)
    public void receive(String payload) {
        logger.info("Received: " + payload);
    }

}
复制代码

第三步;建立应用主类和配置文件微服务

@SpringBootApplication
public class ExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class, args);
    }

}
复制代码
spring.application.name=stream-consumer-group
server.port=0
复制代码

这里设置server.port=0,以方便在本地启动多实例来重现问题。测试

完成上述操做以后,启动两个该应用的实例,以备后续调用。ui

构建消息生产端

比较简单,须要注意的是,使用@Output建立一个同名的输出绑定,这样发出的消息才能被上述启动的实例接收到。具体实现以下:

@RunWith(SpringRunner.class)
@EnableBinding(value = {ExampleApplicationTests.ExampleBinder.class})
public class ExampleApplicationTests {

	@Autowired
	private ExampleBinder exampleBinder;

	@Test
	public void exampleBinderTester() {
        exampleBinder.output().send(MessageBuilder.withPayload("Produce a message from : http://blog.didispace.com").build());
	}

	public interface ExampleBinder {

		String NAME = "example-topic";

		@Output(NAME)
		MessageChannel output();

	}

}

复制代码

启动上述测试用例以后,能够发现以前启动的两个实例都收到的消息,并在日志中打印了:Received: Produce a message from : http://blog.didispace.com。消息重复消费的问题成功重现!

使用消费组解决问题

如何解决上述消息重复消费的问题呢?咱们只须要在配置文件中增长以下配置便可:

spring.cloud.stream.bindings.example-topic.group=aaa
复制代码

当咱们指定了某个绑定所指向的消费组以后,往当前主题发送的消息在每一个订阅消费组中,只会有一个订阅者接收和消费,从而实现了对消息的负载均衡。只因此以前会出现重复消费的问题,是因为默认状况下,任何订阅都会产生一个匿名消费组,因此每一个订阅实例都会有本身的消费组,从而当有消息发送的时候,就造成了广播的模式。

另外,须要注意上述配置中example-topic是在代码中@Output@Input中传入的名字。

代码示例

本文示例读者能够经过查看下面仓库的中的stream-consumer-group项目:

若是您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

如下专题教程也许您会有兴趣

相关文章
相关标签/搜索