第一次的Kafka实践

工做也有三年了,也是由于比较懒惰和自身水平的缘由,以前对一些没用过的技术进行实践后总没有过什么过多的总结。此次也是由于在工做之中遇到的一些问题须要使用到Kafka,过程当中也遇到并解决了一些问题。在阶段性完成工做后,觉得同事说起是否有什么总结和经验能够分享,使我忽然意识到须要总结一下本身遇到问题与技术,从而更好地去认识这陌生的01世界。java

1、kafka简介

Kafka是一个分布式的消息队列,有所耳闻的基本都了解它是个什么样的“人物”。简要的介绍一下我对它的一些认识。apache

  1. 基于Scala和Java编写,利用JVM运行;
  2. 队列在Kafka中称为Topic,每一个Topic可定义replication(副本)和partition(分片)信息;
  3. Consumer(消费者)有Group(组)概念,各组之间的消费状况互不影响;

2、Kafka服务安装

了解了Kafka一些很基础的概念后,仍是须要动手去体验它是如何操做的。下面是在CentOS7的一些安装过程。bootstrap

1. 服务文件下载

可在Kafka的官网http://kafka.apache.org/下载最新的资源文件缓存

2. 修改配置

目前仅使用单机版本的Kafka来提供服务,对于文件解压等基础操做不在阐述。主要须要修改的配置文件有:server.properties,配置文件kafka目录下的config文件夹下。网络

修改内容以下:
#    listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers.
advertised.listeners=PLAINTEXT://:9092
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

简单的说明socket

  • listeners 配置服务监控的socket的地址
  • advertised.listeners 配置节点通知producer和consumer的hostname和port
  • log.dirs 配置消息数据存储的目录

3.启动服务

这里我使用的是单机模式,并非集群的运行方式。Kafka使用的是1.1.0的版本,内部自带了Zookeeper那么就不须要额外的下载Zookeeper了。分布式

1)首先启动Zookeeper服务性能

nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log &

2)启动Kafka服务学习

nohup ./bin/kafka-server-start.sh config/server.properties > kafka.log &

简单提一下,nohup和&是为了让Zookeeper和Kafka的服务可以在后台运行,并将执行命令输出的日志分别记录到对应文件中去;.net

4.队列建立与查看

咱们使用kafka-topics.sh这个脚原本进行一系列关于Topics(队列)的定义等操做
1)创建一个队列

./bin/kafka-topics.sh --create --zookeeper 192.168.3.203:2181 --topic test --replication-factor 1 --partitions 2
### 进行一些说明
## --create 指定命令为建立
## --zookeeper 指定Zookeeper的服务
## --topic 指定队列名称
## --replication-factor 指定队列的副本数,这个副本数须要和集群的节点数相对应
## --partitions 指定队列的分片数量,适当的分片能够提高性能

2)查看队列列表

./bin/kafka-topics.sh --zookeeper 192.168.3.203:2181 --list
### 说明
## --list list命令指定列出当前的topic列表

3)查看指定队列

./bin/kafka-topics.sh --zookeeper 192.168.3.203:2181 --topic test --describe
### 说明
## --topic 指定须要查看的队列名
## --describe 该命令用于描述topic队应的基础信息

4)删除队列

./bin/kafka-topics.sh --zookeeper 192.168.3.203:2181 --topic test --delete
### 说明
## --topic 指定须要删除的队列名
## --delete 该命令用于删除队列

其余的一些使用方式,能够查看kafka-topics的帮助说明;

5.消费者信息查看

咱们使用kafka-consumer-groups.sh这个脚原本了解关于consumer的一些基础信息;
1)查看消费者组列表

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.203:9092 --list
### 说明
## --bootstrap-server 指定Kafka的服务地址 
## --list 列出全部消费者

2)查看消费者组信息

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.203:9092 --group test --describe
### 说明
## --group 指定消费者组组名
## --describe 指定为描述命令

3)删除消费者组

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.203:9092 --group test --delete
### 说明
## --delete 指定为删除命令

3、Kafka客户端实践

1.Producer数据发送

在Kafka的官方API中Producer有以下的使用例子

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

    producer.close();

还有以下针对,发送数据时使用事务去提交的例子:

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "my-transactional-id");
    Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

    producer.initTransactions();

    try {
        producer.beginTransaction();
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // We can't recover from these exceptions, so our only option is to close the producer and exit.
        producer.close();
    } catch (KafkaException e) {
        // For all other exceptions, just abort the transaction and try again.
        producer.abortTransaction();
    }
    producer.close();

在我的使用中出现过以下的两个问题,但愿你们也能规避;

1.频繁发送数据致使的connection建立过多

  每次新建一个Producer时,会利用网络去创建一个链接。若是使用线程频繁地去建立Producer的话,在Client端会出现java.net.BindException: Address already in use: JVM_Bind的异常。
  这是因为在Producer与Kafka Server创建链接时,须要在客户端分配一个端口,每一个机器的端口数量有限,当被耗尽时,变没法在与Kafka Server创建链接了。

2.使用事务发送数据的事务ID不能重复

  当producer使用事务进行数据发送时,事务结束以后,须要开启一个新的事务进行数据提交。这个时候的事务ID不能重复,须要从新设置一个transactional.id,不然进行数据提交时将会出现异常。

2.Consumer数据处理

一样Consumer在官方的API中也有使用说明,大体使用以下:

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
        if (buffer.size() >= minBatchSize) {
            insertIntoDb(buffer);
            consumer.commitSync();
            buffer.clear();
        }
    }

在我的使用中也遇到一些问题,拿出来与你们分享一下;

1.offset设置,获取数据

  offset对应的是一个消费组的偏移量,按照如上的Consumer的代码获取使用脚本一个新建立Topic的数据,将获取不到任何数据。不管使用Producer往Topic中发送了对多数据,我都没有办法取到。
  这使我有点困惑,起初我使用命令进行数据拉取后,改代码能够拉取到数据,命令以下:

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.203:9092 --from-beginning --topic test

因而,经过查看kafka-console.consumer.sh,了解到它在Kafka包中对用的Scala代码中,发现参数--from-beginning对应设置了一个配置,"auto.offset.reset"为"earliest"。
auto.offset.reset参数说明以下:
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

设置配置后,即可以在未初始化时获取到数据了。

2.查看队列的数量的方法

  当咱们使用到队列做为中间件缓存消息时,咱们经常会关心当前队列中剩余的数据量。这便于咱们去判断当前咱们的数据处理程序的响应能力,以便于调整部署状况。
  当前基于个人了解,Kafka没有提供直接的方法返回队列中的数据剩余量,须要咱们手工的去计算。我是这样作的:

List<PartitionInfo> partInfos = consumer.partitionsFor(topic);
     List<TopicPartition> partitions = new ArrayList<TopicPartition>();
     for (PartitionInfo info : partInfos) {
           partitions.add(new TopicPartition(topic, info.partition()));
     }
            
     Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
     for (TopicPartition topicPartitionKey : endOffsets.keySet()) {
         count += (endOffsets.get(topicPartitionKey) - consumer.position(topicPartitionKey));
     }
     return count;

遍历消费者在Topic中的每个分片,在每一个分片中对offset - position的差值作累加获得最终的未处理数量。这样作只能获取一个大概的计算值,不是十分准确。由于在Producer使用transition发送数据时,完成一个事务后offset会加1去作记录,并且可能还存在一些未知的状况。
  固然这个方法是一种很笨的方法,或许你有更好的方式呢,欢迎交流。

最后,我目前使用的Kafka的经验也总结完,Kafka内部设计很复杂也颇有趣,须要进一步地探索,学习总会有回报的。

相关文章
相关标签/搜索