Kafka Python的生产者和消费者

Kafka Python的生产者和消费者

在本教程中,咱们将使用Python构建Kafka Producer和Consumer。除此以外,咱们还将学习如何在Kafka中设置配置以及如何使用组和偏移量概念。python

创建

对于本教程,咱们应该在计算机上安装python。另外,咱们须要访问在咱们的设备或某些服务器上运行的Apache Kafka。您能够检查如何在Windows上安装Apache Kafka。除此以外,咱们须要python的_kafka_ 库来运行咱们的代码。要解决此问题,请在系统上运行如下命令git

pip install kafkagithub

卡夫卡生产者

===web

让咱们开始建立本身的Kafka Producer。咱们必须从kafka库导入KafkaProducer。咱们还须要将Kafka服务器的代理列表提供给Producer,以便它能够链接到Kafka服务器。咱们还须要提供要向其发布消息的主题名称。这是建立生产者所需的最小配置。docker

from kafka import KafkaProducer

bootstrap_servers = ['localhost:9092']
topicName = 'myTopic'

producer = KafkaProducer(bootstrap_servers = bootstrap_servers)
producer = KafkaProducer()

咱们能够使用如下代码开始向该主题发送消息。json

ack = producer.send(topicName, b'Hello World!!!!!!!!')

metadata = ack.get()
print(metadata.topic)
print(metadata.partition)

上面的代码将消息发送到Kafka服务器中名为“ myTopic”的主题。可是,若是该主题还没有出如今Kafka服务器中怎么办?在这种状况下,Kafka会使用该名称建立一个新主题并向其发布消息。方便吗?可是您应该记住要检查主题名称中是否存在拼写错误。bootstrap

若是要为Producer设置更多属性或更改其序列化格式,则能够使用如下代码行。windows

producer = KafkaProducer(bootstrap_servers = bootstrap_servers, retries = 5,value_serializer=lambda m: json.dumps(m).encode('ascii'))

卡夫卡消费者

完成建立Producer的工做后,如今让咱们开始使用python构建Consumer,看看这是否一样容易。导入KafkaConsumer后,咱们须要设置提供引导服务器ID和主题名称,以与Kafka服务器创建链接。服务器

from kafka import KafkaConsumer
import sys

bootstrap_servers = ['localhost:9092']
topicName = 'myTopic'

consumer = KafkaConsumer (topicName, group_id = 'group1',bootstrap_servers = bootstrap_servers,
auto_offset_reset = 'earliest')

如咱们所见,咱们须要设置哪一个组消费者属于。另外,咱们须要指定偏移量,此使用者应该从该偏移量读取主题中的消息。在上述状况下,咱们最先指定了auto_offset_reset,这意味着此使用者将从主题的开头开始读取消息。app

以后,咱们能够开始阅读主题中的消息。与每条消息一块儿,咱们还得到了一些其余信息,例如消息所属的分区,在该分区中的偏移量和键。

try:
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

except KeyboardInterrupt:
    sys.exit()

这将以如下格式打印输出。

用Python编写的Kafka Consumer的输出

就是这个。咱们已经在python中建立了第一个Kafka使用者。咱们能够看到该使用者已经阅读了该主题的消息并将其打印在控制台上。

Docker 运行Kafka

使用的是 zerocode 提供的docker-compose配置文件。

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    # -----------------------------------------------------------------------------
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # -----------------------------------------------------------------------------
    image: confluentinc/cp-kafka:5.0.1
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

注意这里的 PLAINTEXT_HOST://localhost:9092 使用的是localhost, 因此在容器外部访问是没有问题,若是须要的是容器之间的访问,即生产才和消费者也在容器里运行,则须要改为hostname(如 kafka).

结论

咱们已经学习了如何在python中建立Kafka生产者和消费者。

相关文章
相关标签/搜索