前言: 本文做者张天,节选自笔者与其合著的《Spring Cloud微服务架构进阶》,即将在八月出版问世。本文将其中Spring Cloud Stream应用与自定义Rocketmq Binder的内容抽取出来,主要介绍实现Spring Cloud Stream 的RocketMQ绑定器。java
在上一篇中,介绍了Spring Cloud Stream基本的概念及其编程模型。除此以外,Spring Cloud Stream提供了Binder接口来用于和外部消息队列进行绑定。本文将讲述Binder SPI的基本概念,主要组件和实现细节。 Binder SPI经过一系列的接口,工具类和检测机制提供了与外部消息队列绑定的绑定器机制。SPI的关键点是Binder接口,这个接口负责提供和外部消息队列进行绑定的具体实现。git
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}
复制代码
一个典型的自定义Binder组件实现应该包括如下几点:github
kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
复制代码
Spring Cloud Stream基于Binder SPI的实现来进行channel和消息队列的绑定任务。不一样类型的消息队列中间件实现了不一样的绑定器Binder。好比说:Spring-Cloud-Stream-Binder-Kafka是针对Kafka的Binder实现,而Spring-Cloud-Stream-Binder-Rabbit则是针对RabbitMQ的Binder实现。spring
Spring Cloud Stream依赖于Spring Boot的自动配置机制来配置Binder。若是一个Binder实如今项目的classpath中被发现,Spring Cloud Stream将会自动使用它。好比说,一个Spring Cloud Stream项目须要绑定RabbitMQ中间件的Binder,在pom文件中加入下面的依赖来轻松实现。编程
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
复制代码
Spring Cloud Stream为接入不一样的消息队列提供了一整套的自定义机制,经过为每一个消息队里开发一个Binder来接入该消息队列。目前官方认定的Binder为rabbitmq binder和kafka binder。可是开发人员能够基于Stream Binder的机制来制定本身的Binder。下面咱们就构建一个简单的RocketMQ的Binder。bash
须要在resources/META-INF/spring.binders文件中配置有关RocketMQ的Configuration类,该配置类会使用@Import来导入为RocketMQ制定的RocketMessageChannelBinderConfiguration
。微信
rocket:\
org.springframework.cloud.stream.binder.rocket.config.RocketServiceAutoConfiguration
复制代码
RocketMessageChannelBinderConfiguration
将会提供两个极其重要的bean实例,分别为RocketMessageChannelBinder
和RocketExchangeQueueProvisioner
。RocketMessageChannelBinder
主要是用于channel和消息队列的绑定,而RocketExchangeQueueProvisioner
则封装了RocketMQ的相关API,能够用于建立消息队列的基础组件,好比说队列,交换器等。架构
@Configuration
public class RocketMessageChannelBinderConfiguration {
@Autowired
private ConnectionFactory rocketConnectionFactory;
@Autowired
private RocketProperties rocketProperties;
@Bean
RocketMessageChannelBinder rocketMessageChannelBinder() throws Exception {
RocketMessageChannelBinder binder = new RocketMessageChannelBinder(this.rocketConnectionFactory,
this.rocketProperties, provisioningProvider());
return binder;
}
@Bean
RocketExchangeQueueProvisioner provisioningProvider() {
return new RocketExchangeQueueProvisioner(this.rocketConnectionFactory);
}
}
复制代码
RocketMessageChannelBinder
继承了抽象类AbstractMessageChannelBinder
,并实现了#producerMessageHandler和#createConsumerEndpoint函数。并发
MessageHandler有向消息队列发送消息的能力,#createProducerMessageHandler函数就是为了建立MessageHandler对象,来将输出型Channel的消息发送到消息队列上。异步
protected MessageHandler createProducerMessageHandler( ProducerDestination destination, ExtendedProducerProperties<RocketProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
final AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(
buildRocketTemplate(producerProperties.getExtension(), errorChannel != null));
return endpoint;
}
复制代码
MessageProducer可以从消息队列接收消息,并将该消息发送输入型Channel。
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String group, ExtendedConsumerProperties<RocketConsumerProperties> properties) throws Exception {
SimpleRocketMessageListenerContainer listenerContainer = new SimpleRocketMessageListenerContainer();
RocketInboundChannelAdapter rocketInboundChannelAdapter = new RocketInboundChannelAdapter(listenerContainer);
return rocketInboundChannelAdapter;
}
复制代码
相似于RabbitMQ的Binder,须要实现下面一系列的类来实现从RocketMQ到对应MessageChannel的消息传递。
InnerConsumer实现的MessageListenerConcurrently接口是RocketMQ中用于并发接受异步消息的接口,该接口能够接收到RocketMQ发送过来的异步消息。而InnerConsumer在接受到消息以后,会将消息封装成RocketDelivery加入到阻塞队列中。
RocketBlockingQueueConsumer有一个阻塞队列来存储RocketMQ传递给RocketBlockingQueueConsumer.InnerConsumer的消息,而nextMessage函数能够从阻塞队列中拉取一个消息并返回。
SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer是实现了Runnable接口,在run()接口中会无限循环地调用SimpleRocketMessageListenerContainer自己的receiveAndExecute。
@Override
public void run() {
if (!isActive()) {
return;
}
try {
//只要consumer的状态正常,就会一直循环
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
try {
boolean receivedOk = receiveAndExecute(this.consumer);
}
catch (ListenerExecutionFailedException ex) {
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
} catch (Throwable e) {
}
}
} catch (Exception e) {
}
finally {
if (getTransactionManager() != null) {
ConsumerChannelRegistry.unRegisterConsumerChannel();
}
}
this.start.countDown();
if (!isActive(this.consumer) || aborted) {
this.consumer.stop();
}
else {
restart(this.consumer);
}
}
复制代码
函数#receiveAndExecute最终的做用就是调用RocketBlockingQueueConsumer的nextMessage,而后再将消息调用messageListener.onMessage函数将消息传递出去。
SimpleRocketMessageListenerContainer的doStart函数会初始化RocketBlockingQueueConsumer而且启动SimpleRocketMessageListenerContainer的AsyncMessageProcessingConsumer会无限循环地从RocketBlockingQueueConsumer中获取RocketMQ传递过来的消息。
private void doStart() {
synchronized (this.lifecycleMonitor) {
this.active = true;
this.running = true;
this.lifecycleMonitor.notifyAll();
}
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
//初始化Consumer
int newConsumers = initializeConsumers();
if (this.consumers == null) {
return;
}
if (newConsumers <= 0) {
return;
}
Set<SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer> processors =
new HashSet<>();
//对于每一个RocketBlockingQueueConsumer启动一个
//AsyncMessageProcessingConsumer来执行任务
for (RocketBlockingQueueConsumer consumer : this.consumers) {
SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer
processor = new SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
getTaskExecutor().execute(processor);
}
}
}
复制代码
RocketInboundChannelAdapter实现了MessageProducer接口。它主要将SimpleRocketMessageListenerContainer传递过来的消息通过MessageTemplate传递给MessageChannel。
接下来则是RocketInboundChannelAdapter.Listener的实现,它就是RocketBlockingQueueConsumer.nextMessage函数中的messageListener。
public class Listener implements ChannelAwareMessageListener, RetryListener {
public void onMessage(Message message, Channel channel) throws Exception {
try {
this.createAndSend(message, channel);
} catch (RuntimeException var7) {
if (RocketInboundChannelAdapter.this.getErrorChannel() == null) {
throw var7;
}
RocketInboundChannelAdapter.this.getMessagingTemplate().send(RocketInboundChannelAdapter.this.getErrorChannel(), RocketInboundChannelAdapter.this.buildErrorMessage((org.springframework.messaging.Message)null, new ListenerExecutionFailedException("Message conversion failed", var7, message)));
}
}
private void createAndSend(Message message, Channel channel) {
org.springframework.messaging.Message<Object> messagingMessage = this.createMessage(message, channel);
RocketInboundChannelAdapter.this.sendMessage(messagingMessage);
}
private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
Object payload = RocketInboundChannelAdapter.this.messageConverter.fromMessage(message);
org.springframework.messaging.Message<Object> messagingMessage = RocketInboundChannelAdapter.this.getMessageBuilderFactory().withPayload(payload).build();
return messagingMessage;
}
}
复制代码
RocketProvisioningProvider实现了ProvisioningProvider接口,它有两个函数:provisionProducerDestination和provisionConsumerDestination,分别用于建立ProducerDestination和ConsumerDestination。RocketProvisioningProvider的实现相似于RabbitProvisioningProvider。只不过在声明队列,交换器和绑定时使用了RocketAdmin所实现的RocketMQ的相关API。
本文概要介绍了Spring Cloud Stream的Rocketmq绑定器的实现,限于篇幅不展开具体的代码讲解。读者感兴趣,能够关注GitHub上的代码。根据Spring Cloud Stream抽象的接口,咱们能够自由地实现各类消息队列的绑定器。
项目GitHub地址:https://github.com/ztelur/spring-cloud-stream-binder-rocket 推荐阅读:Spring Cloud Stream应用与自定义RocketMQ Binder:编程模型