应用间通讯

模式阅读
图12. Schema阅读决议程序
注意

了解编写器架构(写入消息的应用程序)和读取器架构(接收应用程序)之间的区别很重要。请花点时间阅读Avro术语并了解此过程。Spring Cloud Stream将始终提取writer模式以肯定如何读取消息。若是您想要Avro的架构演进支持工做,您须要确保为您的应用程序正确设置了readerSchema。html

应用间通讯

链接多个应用程序实例

虽然Spring Cloud Stream使我的Spring Boot应用程序轻松链接到消息传递系统,可是Spring Cloud Stream的典型场景是建立多应用程序管道,其中微服务应用程序将数据发送给彼此。您能够经过将相邻应用程序的输入和输出目标相关联来实现此场景。java

假设设计要求时间源应用程序将数据发送到日志接收应用程序,则能够在两个应用程序中使用名为ticktock的公共目标进行绑定。web

时间来源(具备频道名称output)将设置如下属性:spring

spring.cloud.stream.bindings.output.destination=ticktock

日志接收器(通道名称为input)将设置如下属性:apache

spring.cloud.stream.bindings.input.destination=ticktock

实例索引和实例计数

当扩展Spring Cloud Stream应用程序时,每一个实例均可以接收有关同一个应用程序的其余实例数量以及本身的实例索引的信息。Spring Cloud Stream经过spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex属性执行此操做。例如,若是HDFS宿应用程序有三个实例,则全部三个实例将spring.cloud.stream.instanceCount设置为3,而且各个应用程序将spring.cloud.stream.instanceIndex设置为012架构

当经过Spring Cloud数据流部署Spring Cloud Stream应用程序时,这些属性将自动配置; 当Spring Cloud Stream应用程序独立启动时,必须正确设置这些属性。默认状况下,spring.cloud.stream.instanceCount1spring.cloud.stream.instanceIndex0app

在放大的状况下,这两个属性的正确配置对于解决分区行为(见下文)通常很重要,而且某些绑定器(例如,Kafka binder)老是须要这两个属性,以确保该数据在多个消费者实例之间正确分割。微服务

分区

配置输出绑定进行分区

输出绑定被配置为经过设置其惟一的一个partitionKeyExpressionpartitionKeyExtractorClass属性以及其partitionCount属性来发送分区数据。例如,如下是一个有效和典型的配置:测试

spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.output.producer.partitionCount=5

基于上述示例配置,使用如下逻辑将数据发送到目标分区。spa

基于partitionKeyExpression,为发送到分区输出通道的每一个消息计算分区密钥的值。partitionKeyExpression是一个Spel表达式,它根据出站消息进行评估,以提取分区键。

若是SpEL表达式不足以知足您的须要,您能够经过将属性partitionKeyExtractorClass设置为实现org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy接口的类来计算分区键值。虽然Spel表达式一般足够,但更复杂的状况可能会使用自定义实现策略。在这种状况下,属性“partitionKeyExtractorClass”能够设置以下:

spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass=com.example.MyKeyExtractor
spring.cloud.stream.bindings.output.producer.partitionCount=5

一旦计算了消息密钥,分区选择过程将肯定目标分区为0partitionCount - 1之间的值。在大多数状况下,默认计算基于公式key.hashCode() % partitionCount这能够经过设置要针对'key'(经过partitionSelectorExpression属性)进行评估的Spel表达式或经过设置org.springframework.cloud.stream.binder.PartitionSelectorStrategy实现(经过partitionSelectorClass属性))进行自定义。

“partitionSelectorExpression”和“partitionSelectorClass”的绑定级属性能够相似于上述示例中指定的“partitionKeyExpression”和“partitionKeyExtractorClass”属性的类型。能够为更高级的场景配置其余属性,如如下部分所述。

Spring - 管理的自定义PartitionKeyExtractorClass实现

在上面的示例中,MyKeyExtractor之类的自定义策略由Spring Cloud Stream直接实例化。在某些状况下,必须将这样的自定义策略实现建立为Spring bean,以便可以由Spring管理,以便它能够执行依赖注入,属性绑定等。能够经过将其配置为应用程序上下文中的@Bean,并使用彻底限定类名做为bean的名称,如如下示例所示。

@Bean(name="com.example.MyKeyExtractor")
public MyKeyExtractor extractor() {
    return new MyKeyExtractor();
}

做为Spring bean,自定义策略从Spring bean的完整生命周期中受益。例如,若是实现须要直接访问应用程序上下文,则能够实现“ApplicationContextAware”。

配置输入绑定进行分区

输入绑定(通道名称为input)被配置为经过在应用程序自己设置其partitioned属性以及instanceIndexinstanceCount属性来接收分区数据,如如下示例:

spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

instanceCount值表示数据须要分区的应用程序实例的总数,instanceIndex必须是0instanceCount - 1之间的多个实例的惟一值。实例索引帮助每一个应用程序实例识别从其接收数据的惟一分区(或者在Kafka的分区集合的状况下)。重要的是正确设置两个值,以确保全部数据都被使用,而且应用程序实例接收到互斥数据集。

虽然使用多个实例进行分区数据处理的场景可能会在独立状况下进行复杂化,可是经过将输入和输出值正确填充并依赖于运行时基础架构,Spring Cloud数据流能够显着简化流程。提供有关实例索引和实例计数的信息。

测试

Spring Cloud Stream支持测试您的微服务应用程序,而无需链接到消息系统。您可使用spring-cloud-stream-test-support库提供的TestSupportBinder,能够将其做为测试依赖项添加到应用程序中:

<dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-stream-test-support</artifactId>
       <scope>test</scope>
   </dependency>
注意

TestSupportBinder使用Spring Boot自动配置机制取代类路径中找到的其余绑定。所以,添加binder做为依赖关系时,请确保正在使用test范围。

TestSupportBinder容许用户与绑定的频道进行交互,并检查应用程序发送和接收的消息

对于出站消息通道,TestSupportBinder注册单个订户,并将应用程序发送的消息保留在MessageCollector中。它们能够在测试过程当中被检索,并对它们作出断言。

用户还能够将消息发送到入站消息通道,以便消费者应用程序可使用消息。如下示例显示了如何在处理器上测试输入和输出通道。

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ExampleTest {

  @Autowired
  private Processor processor;

  @Autowired
  private MessageCollector messageCollector;

  @Test
  @SuppressWarnings("unchecked")
  public void testWiring() {
    Message<String> message = new GenericMessage<>("hello");
    processor.input().send(message);
    Message<String> received = (Message<String>) messageCollector.forChannel(processor.output()).poll();
    assertThat(received.getPayload(), equalTo("hello world"));
  }


  @SpringBootApplication
  @EnableBinding(Processor.class)
  public static class MyProcessor {

    @Autowired
    private Processor channels;

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String transform(String in) {
      return in + " world";
    }
  }
}

在上面的示例中,咱们正在建立一个具备输入和输出通道的应用程序,经过Processor接口绑定。绑定的接口被注入测试,因此咱们能够访问这两个通道。咱们正在输入频道发送消息,咱们使用Spring Cloud Stream测试支持提供的MessageCollector来捕获消息已经被发​​送到输出通道。收到消息后,咱们能够验证组件是否正常工做。

健康指标

Spring Cloud Stream为粘合剂提供健康指标。它以binders的名义注册,能够经过设置management.health.binders.enabled属性启用或禁用。

相关文章
相关标签/搜索