Alpakka Kafka,反应式Kafka客户端

Alpakka Kafka 是一个要用于 Java 和 Scala 语言的开源的流感知和反应式集成数据线项目。它创建在 Akka Stream之上,提供了 DSL 来支持反应式和流式编程,内置回压功能。Akka Streams 是 Reactive Streams 和JDK 9+ java.util.concurrent.Flow 的兼容实现,可无缝地与其进行互操做。html

要使用 Alpakka Kafka,须要在你的项目添加以下依赖:java

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "1.0-RC2"

当前支持 kafka-clients 2.1.x 和 Akka Streams 2.5.21。react

快速开始

对Akka Streams或Kafka不熟的,可先查阅二者的官方文档:git

Alpakka Kafka 写的代码很是精致且简洁,也许你会一眼爱上它的美。github

object KafkaGetting extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  import system.dispatcher
  val config = system.settings.config

  val producerSettings = 
    ProducerSettings(config.getConfig("akka.kafka.producer"),
      new StringSerializer, new StringSerializer)

  val consumerSettings = 
    ConsumerSettings(config.getConfig("akka.kafka.consumer"), 
      new StringDeserializer, new StringDeserializer)

  val producerQueue = Source
    .queue[String](128, OverflowStrategy.fail)
    .map(str => new ProducerRecord[String, String]("test", str))
    .toMat(Producer.plainSink(producerSettings))(Keep.left)
    .run()

  val consumerControl = Consumer
    .plainSource(consumerSettings, Subscriptions.topics("test"))
    .map(record => record.value())
    .toMat(Sink.foreach(value => println(value)))(Keep.left)
    .run()

  Source(1 to 10)
    .map(_.toString)
    .throttle(1, 2.seconds)
    .runForeach(message => producerQueue.offer(message))
    .onComplete(tryValue => println(s"producer send over, return $tryValue"))

  println("Press 'enter' key exit.")
  StdIn.readLine()
  producerQueue.complete()
  consumerControl.shutdown()
  system.terminate()
  Await.result(system.whenTerminated, 10.seconds)
}

上面的代码实现了一个完整的Kafka生产者、消费者数据处理流程,整个处理都是异步、非阻塞的。没有显示线程建立、没有相似 where(true) 这样的消费处理循环……接下来,让咱们分析下以上代码。apache

代码分析

producerSettings

Alpakka Kafka 使用ProducerSettings来封装建立Kafka生产者时须要的参数,它使用了 Typesafe Config 经过可配置的方式来构建生产者。编程

producerSettings 使用 "akka.kafka.producer" 部分的参数来构造 Kafka 生产者,如下是一个示例的 Typesafe Config 配置:json

akka.kafka.producer {
  # 同时可运行的send操做数量
  parallelism = 100

  # 调用 `KafkaProducer.close` 时等待关闭的时间
  close-timeout = 60s
  
  # 线程池
  use-dispatcher = "akka.kafka.default-dispatcher"

  # 定义 org.apache.kafka.clients.producer.ProducerConfig 属性须要的参数
  kafka-clients {
    # 使用英文逗号分隔多个Kafka服务地址
    bootstrap.servers = "localhost:9092"
  }
}

consumerSettingsbootstrap

consumerSettings 使用 "akka.kafka.consumer" 部分的参数来构造 Kafka 消费者,如下是一个示例的 Typesafe Config 配置:api

akka.kafka.consumer {
  # 拉取数据间隔周期
  poll-interval = 50ms
  
  # 拉取数据超时时间
  poll-timeout = 30s

  # 调用 `KafkaConsumer.close` 时等待关闭的时间
  close-timeout = 20s
  
  # 线程池
  use-dispatcher = "akka.kafka.default-dispatcher"

  # 定义 org.apache.kafka.clients.producer.ProducerConfig 属性须要的参数
  kafka-clients {
    # 使用英文逗号分隔多个Kafka服务地址
    bootstrap.servers = "localhost:9092"

    # 自动commit消息
    enable.auto.commit = true

    # 消费者组ID
    group.id = "resource-dev"

    # 从最新的offset开始读取消息,不然从头开始读取
    auto.offset.reset = "earliest"
  }
}

producerQueue

使用Akka Streams构造一个生产者队列 producerQueue,再由 Producer.plainSink 来消费发送到 producerQueue 里的消息。须要注意的是构造 Source.queue[String] 时设置的 128 这个参数并非 Kafka 的消息队列容量,而是 Akka Streams Source 构造出来的一个Queue。Producer.plainSink 是一个 下游 ,它消费来自 producerQueue 这个上游的消息,再将数据发送到 Kafka 服务。

consumerControl

经过 Consumer 这个Akka Streams Source构造了一个Kafka消费者,并监听指定的 "test" 主题。consumerControl 流首先从收到的每一个消息(ConsumerRecord)中取得 value,并发送到下游,下游经过 Sink.foreach 接收数据并打印到终端。

Source(1 to 10)

生成从1到10的字符串消息值,并每隔2秒经过 producerQueue 发送一个消息到Kafka。

小结

本文经过一个简单的示例展示怎样经过 Alpakka Kafka 来实现对 Kafka 的集成,完成的代码示例见:https://github.com/ihongka/akka-fusion/blob/master/fusion-kafka/src/test/scala/fusion/kafka/getting/KafkaGetting.scala

Kafka发展到如今,已不仅仅再是一个消息系统了,在MQ以外,它还提供了KSQL和Connector特性。应用基于 Kafka 能够有更多的设计和实现,而Akka Stream + Kafka是一个强大的组合,接下来我会写一系列文章介绍怎样使用 Alpakka Kafka 来基于 Kafka 进行应用和架构设计。

相关文章
相关标签/搜索