「Spring和Kafka」Kafka整合Spring 深刻挖掘第2部分:Kafka和Spring

在这个博客系列的第1部分以后,Apache Kafka的Spring——第1部分:错误处理、消息转换和事务支持「Spring和Kafka」Kafka整合Spring 深刻挖掘 -第1部分,在这里的第2部分中,咱们将关注另外一个加强开发者在Kafka上构建流应用程序时体验的项目:Spring Cloud Stream。spring

咱们将在这篇文章中讨论如下内容:编程

  • Spring云流及其编程模型概述数组

  • Apache Kafka®集成在Spring云流安全

  • Spring Cloud Stream如何让Kafka开发人员更轻松地开发应用程序服务器

  • 使用Kafka流和Spring云流进行流处理app

让咱们首先看看什么是Spring Cloud Stream,以及它如何与Apache Kafka一块儿工做。框架

什么是Spring Cloud Stream?

Spring Cloud Stream是一个框架,它容许应用程序开发人员编写消息驱动的微服务。这是经过使用Spring Boot提供的基础来实现的,同时还支持其余Spring组合项目(如Spring Integration、Spring Cloud函数和Project Reactor)公开的编程模型和范例。它支持使用描述输入和输出组件的类型安全编程模型编写应用程序。应用程序的常见示例包括源(生产者)、接收(消费者)和处理器(生产者和消费者)。ide

典型的Spring cloud stream 应用程序包括用于通讯的输入和输出组件。这些输入和输出被映射到Kafka主题。Spring cloud stream应用程序能够接收来自Kafka主题的输入数据,它能够选择生成另外一个Kafka主题的输出。这些与Kafka链接接收器和源不一样。有关各类Spring Cloud流开箱即用应用程序的更多信息,请访问项目页面。函数

消息传递系统和Spring cloud stream之间的桥梁是经过绑定器抽象实现的。绑定器适用于多个消息传递系统,但最经常使用的绑定器之一适用于Apache Kafka。微服务

Kafka绑定器扩展了Spring Boot、Apache Kafka的Spring和Spring集成的坚实基础。因为绑定器是一个抽象,因此其余消息传递系统也有可用的实现。

Spring Cloud Stream支持发布/订阅语义、消费者组和本机分区,并尽量将这些职责委派给消息传递系统。对于Kafka绑定器,这些概念在内部映射并委托给Kafka,由于Kafka自己就支持它们。当消息传递系统自己不支持这些概念时,Spring Cloud Stream将它们做为核心特性提供。

如下是绑定器抽象如何与输入和输出工做的图示:

图片

使用Spring Cloud Stream建立Kafka应用程序

Spring Initializr是使用Spring Cloud Stream建立新应用程序的最佳场所。这篇博文介绍了如何在Spring启动应用程序中使用Apache Kafka,涵盖了从Spring Initializr建立应用程序所需的全部步骤。对于Spring Cloud Stream,唯一的区别是您须要“Cloud Stream”和“Kafka”做为组件。如下是你须要选择的一个例子:

initializr包含开发流应用程序所需的全部依赖项。经过使用Initializr,您还能够选择构建工具(如Maven或Gradle)和目标JVM语言(如Java或Kotlin)。

该构建将生成一个可以做为独立应用程序(例如,从命令行)运行的uber JAR。

Apache Kafka的Spring cloud stream编程模型

Spring Cloud Stream提供了一个编程模型,支持与Apache Kafka的即时链接。应用程序须要在其类路径中包含Kafka绑定,并添加一个名为@EnableBinding的注释,该注释将Kafka主题绑定到它的输入或输出(或二者)。

Spring Cloud Stream提供了三个与@EnableBinding绑定的方便接口:Source(单个输出)、Sink(单个输入)和Processor(单个输入和输出)。它还能够扩展到具备多个输入和输出的自定义接口。

下面的代码片断展现了Spring Cloud Stream的基本编程模型:

@SpringBootApplication
@EnableBinding(Processor.class)
public class UppercaseProcessor {

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public String process(String s) {
     return s.toUpperCase();
  }
}

在这个应用程序中,注意这个方法是用@StreamListener注释的,它是由Spring Cloud Stream提供的,用于接收来自Kafka主题的消息。一样的方法也使用SendTo进行注释,SendTo是将消息发送到输出目的地的方便注释。这是一个Spring云流处理器应用程序,它使用来自输入的消息并将消息生成到输出。

在前面的代码中没有提到Kafka主题。此时可能出现的一个天然问题是,“这个应用程序如何与Kafka通讯?”答案是:入站和出站主题是经过使用Spring Boot支持的许多配置选项之一来配置的。在本例中,咱们使用一个名为application的YAML配置文件。yml,它是默认搜索的。下面是输入和输出目的地的配置:

spring.cloud.stream.bindings:
  input:
    destination: topic1
  output:
    destination: topic2

 

Spring Cloud Stream将输入映射到topic1,将输出映射到topic2。这是一组很是少的配置,可是可使用更多的选项来进一步定制应用程序。默认状况下,主题是用单个分区建立的,可是能够由应用程序覆盖。更多信息请参考这些文档。

最重要的是,开发人员能够简单地专一于编写核心业务逻辑,让Spring Cloud Stream和Spring Boot来处理基础设施问题(好比链接到Kafka、配置和调优应用程序等等)。

下面的例子展现了另外一个简单的应用程序(消费者):

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

  @StreamListener(Sink.INPUT)
  public void handle(Person person) {
     System.out.println("Received: " + person);
  }

  public static class Person {
     private String name;
     public String getName() {
        return name;
     }
     public void setName(String name) {
        this.name = name;
     }
     public String toString() {
        return this.name;
     }
  }
}

 

注意,@EnableBinding提供了一个接收器,这代表这是一个消费者。与前一个应用程序的一个主要区别是,使用@StreamListener注释的方法将一个名为Person的POJO做为参数,而不是字符串。来自Kafka主题的消息是如何转换成这个POJO的?Spring Cloud Stream提供了自动的内容类型转换。默认状况下,它使用application/JSON做为内容类型,但也支持其余内容类型。您能够经过使用属性spring.cloud.stream.binding .input来提供内容类型。而后将其设置为适当的内容类型,如application/Avro。

适当的消息转换器由Spring Cloud Stream根据这个配置来选择。若是应用程序但愿使用Kafka提供的本地序列化和反序列化,而不是使用Spring Cloud Stream提供的消息转换器,那么能够设置如下属性。

序列化:

spring.cloud.stream.bindings.output.useNativeEncoding=true

反序列化:

spring.cloud.stream.bindings.input.useNativeDecoding=true

Auto-provisioning of topic

Apache Kafka绑定器提供了一个在启动时配置主题的配置程序。若是在代理上启用了主题建立,Spring Cloud Stream应用程序能够在应用程序启动时建立和配置Kafka主题。

例如,能够向供应者提供分区和其余主题级配置。这些定制能够在绑定器级别进行,绑定器级别将应用于应用程序中使用的全部主题,也能够在单独的生产者和消费者级别进行。这很是方便,特别是在应用程序的开发和测试期间。有许多关于如何为多个分区配置主题的示例。

支持使用者组和分区

可使用Spring Cloud Stream配置众所周知的属性,如用户组和分区。消费者组能够经过属性设置:

spring.cloud.stream.bindings.input.group =组名称

如前所述,在内部,这个组将被翻译成Kafka的消费者组。

在编写生产者应用程序时,Spring Cloud Stream提供了将数据发送到特定分区的选项。一样,在内部,框架将这些职责委托给Kafka。

对于使用者,若是禁用自动再平衡(这是一个须要覆盖的简单配置属性),则特定的应用程序实例能够限制为使用来自一组特定分区的消息。有关详细信息,请参阅这些配置选项。

绑定可视化和控制

经过使用Spring Boot的致动器机制,咱们如今可以控制Spring cloud stream中的各个绑定。

在运行时,可使用执行器端点来中止、暂停、恢复等,执行器端点是Spring Boot的机制,用于在将应用程序推向生产环境时监视和管理应用程序。该特性使用户可以对应用程序处理来自Kafka的数据的方式有更多的控制。若是应用程序因绑定而暂停,那么来自该特定主题的处理记录将暂停,直到恢复。

Spring Cloud Stream还集成了Micrometer,以启用更丰富的指标、发出混乱的速率并提供其余与监视相关的功能。这些系统能够与许多其余监测系统进一步集成。Kafka绑定器提供了扩展的度量功能,为主题的消费者滞后提供了额外的看法。

Spring Boot经过一个特殊的健康情况端点提供应用程序健康情况检查。Kafka绑定器提供了一个健康指示器的特殊实现,它考虑到代理的链接性,并检查全部的分区是否都是健康的。若是发现任何分区没有leader,或者代理没法链接,那么health check将报告相应的状态。

Kafka流在Spring cloud stream中的支持概述

在编写流处理应用程序时,Spring Cloud stream提供了另外一个专门用于Kafka流的绑定器。与常规的Kafka绑定器同样,Kafka Streams绑定器也关注开发人员的生产力,所以开发人员能够专一于为KStream、KTable、GlobalKTable等编写业务逻辑,而不是编写基础结构代码。绑定器负责链接到Kafka,以及建立、配置和维护流和主题。例如,若是应用程序方法具备KStream签名,则绑定器将链接到目标主题,并在后台从该主题生成流。应用程序开发人员没必要显式地这样作,由于绑定器已经为应用程序提供了绑定。

其余类型(如KTable和GlobalKTable)也是如此。底层的KafkaStreams对象由绑定器提供,用于依赖注入,所以,应用程序不直接维护它。更确切地说,它是由春天的云流为你作的。

要使用Spring Cloud Stream开始Kafka流,请转到Spring Initializr并选择以下图所示的选项,以生成一个应用程序,该应用程序带有使用Spring Cloud Stream编写Kafka流应用程序的依赖项:

上面的例子展现了一个用Spring Cloud Stream编写的Kafka Streams应用程序:

@SpringBootApplication
public class KafkaStreamsTableJoin {

  @EnableBinding(StreamTableProcessor.class)
  public static class KStreamToTableJoinApplication {

     @StreamListener
     @SendTo("output")
     public KStream<String, Long> process(@Input("input1") KStream<String, Long> userClicksStream,
                                 @Input("input2") KTable<String, String> userRegionsTable) {

        return userClicksStream
              .leftJoin(userRegionsTable,
                    (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
              .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
              .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
              .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
              .toStream();
     }
  }

  interface StreamTableProcessor {

     @Input("input1")
     KStream inputStream();

     @Output("output")
     KStreamoutputStream();

     @Input("input2")
     KTable inputTable();
  }
}

在前面的代码中有几件事情须要注意。在@StreamListener方法中,没有用于设置Kafka流组件的代码。应用程序不须要构建流拓扑,以便将KStream或KTable与Kafka主题关联起来,启动和中止流,等等。全部这些机制都是由Kafka流的Spring Cloud Stream binder处理的。在调用该方法时,已经建立了一个KStream和一个KTable供应用程序使用。

应用程序建立一个名为StreamTableProcessor的自定义接口,该接口指定用于输入和输出绑定的Kafka流类型。此接口与@EnableBinding一块儿使用。此接口的使用方式与咱们在前面的处理器和接收器接口示例中使用的方式相同。与常规的Kafka绑定器相似,Kafka上的目的地也是经过使用Spring云流属性指定的。您能够为前面的应用程序提供这些配置选项来建立必要的流和表:

spring.cloud.stream.bindings.input1.destination=userClicksTopic spring.cloud.stream.bindings.input2.destination=userRegionsTopic spring.cloud-stream.bindings.output.destination=userClickRegionsTopic

咱们使用两个Kafka主题来建立传入流:一个用于将消息消费为KStream,另外一个用于消费为KTable。框架根据自定义接口StreamTableProcessor中提供的绑定适当地使用所需的类型。而后,这些类型将与方法签名配对,以便在应用程序代码中使用。在出站时,出站的KStream被发送到输出Kafka主题。

Kafka流中可查询的状态存储支持

Kafka流为编写有状态应用程序提供了第一类原语。当使用Spring Cloud Stream和Kafka流构建有状态应用程序时,就有可能使用RESTful应用程序从RocksDB的持久状态存储中提取信息。下面是一个Spring REST应用程序的例子,它依赖于Kafka流中的状态存储:

@RestController
public class FooController {

  private final Log logger = LogFactory.getLog(getClass());

  @Autowired
  private InteractiveQueryService interactiveQueryService;

@RequestMapping("/song/id")
public SongBean song(@RequestParam(value="id") Long id) {

     final ReadOnlyKeyValueStore<Long, Song> songStore =
           interactiveQueryService.getQueryableStore(“STORE-NAME”,
 QueryableStoreTypes.<Long, Song>keyValueStore());

     final Song song = songStore.get(id);
     if (song == null) {
        throw new IllegalArgumentException("Song not found.");
     }
     return new SongBean(song.getArtist(), song.getAlbum(), song.getName());
  }
}

 

InteractiveQueryService是Apache Kafka Streams绑定器提供的一个API,应用程序可使用它从状态存储中检索数据。应用程序可使用此服务按名称查询状态存储,而不是直接经过底层流基础设施访问状态存储。当Kafka Streams应用程序的多个实例运行时,该服务还提供了用户友好的方式来访问服务器主机信息,这些实例之间有分区。

一般在这种状况下,应用程序必须经过直接访问Kafka Streams API来找到密钥所在的分区所在的主机。InteractiveQueryService提供了这些API方法的包装器。一旦应用程序得到了对状态存储的访问权,它就能够经过查询来造成进一步的看法。最终,能够经过上面所示的REST端点来提供这些看法。您能够在GitHub上找到一个使用Spring Cloud Stream编写的Kafka Streams应用程序的示例,在这个示例中,它使用本节中提到的特性来适应Kafka音乐示例。

Branching in Kafka Streams

经过使用SendTo注释,能够在Spring Cloud流中原生地使用Kafka流的分支特性。

@StreamListener("input")
@SendTo({“englishTopic”, “frenchTopic”, “spanishTopic”})
public KStream<?, WordCount>[] process(KStream<Object, String> input) {

  Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
  Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
  Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

  return input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .groupBy((key, value) -> value)
        .windowedBy(timeWindows)
        .count(Materialized.as("WordCounts-1"))
        .toStream()
        .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
        .branch(isEnglish, isFrench, isSpanish);
}

注意,SendTo注释有三个不一样输出的绑定,方法自己返回一个KStream[]。Spring Cloud Stream在内部将分支发送到输出绑定到的Kafka主题。观察SendTo注释中指定的输出顺序。这些输出绑定将与输出的KStream[]按其在数组中的顺序配对。

数组的第一个索引中的第一个KStream能够映射到englishTopic,而后将下一个映射到frenchTopic,以此类推。这里的想法是,应用程序能够专一于功能方面的事情,并使用Spring Cloud Stream设置全部这些输出流,不然开发人员将不得不为每一个流单独作这些工做。

Spring cloud stream中的错误处理

Spring Cloud Stream提供了错误处理机制来处理失败的消息。它们能够被发送到死信队列(DLQ),这是Spring Cloud Stream建立的一个特殊的Kafka主题。当失败的记录被发送到DLQ时,头信息被添加到记录中,其中包含关于失败的更多信息,如异常堆栈跟踪、消息等。

发送到DLQ是可选的,框架提供各类配置选项来定制它。

对于Spring Cloud Stream中的Kafka Streams应用程序,错误处理主要集中在反序列化错误上。Apache Kafka Streams绑定器提供了使用Kafka Streams提供的反序列化处理程序的能力。它还提供了在主流继续处理时将失败的记录发送到DLQ的能力。当应用程序须要返回来访问错误记录时,这是很是有用的。

模式演化和Confluent 模式注册

Spring Cloud Stream支持模式演化,它提供了与Confluent模式注册中心以及Spring Cloud Stream提供的本地模式注册中心服务器一块儿工做的功能。应用程序经过在应用程序级别上包含@EnableSchemaRegistryClient注释来启用模式注册表。Spring Cloud Stream提供了各类基于Avro的消息转换器,能够方便地与模式演化一块儿使用。在使用Confluent模式注册表时,Spring Cloud Stream提供了一个应用程序须要做为SchemaRegistryClient bean提供的特殊客户端实现(ConfluentSchemaRegistryClient)。

结论

Spring Cloud Stream经过自动处理其余同等重要的非功能需求(如供应、自动内容转换、错误处理、配置管理、用户组、分区、监视、健康检查等),使应用程序开发人员更容易关注业务逻辑,从而提升了使用Apache Kafka的生产率。

相关文章
相关标签/搜索