由Flink与Kafka实践探究Kafka的两个问题

笔者在某次实践过程当中,搭建了一个Flink监控程序,监控wikipedia编辑,对编辑者编辑的字节数进行实时计算,最终把数据sink到kafka的消费者中展现出来,监控程序自己比较简单,只要在程序中指定好WikipediaEditsSource源并配置好sink与kafka关联就能够,相似一个略微复杂版的wordcount,按照网络上的教程,在实践的最后,开启zookeeper服务和kafka服务,接着用bootstrap

kafka-console-producer --topic wiki-result  --broker-list localhost:9092

这条命令建立一个名为wiki-resulttopic,而后运行监控程序,最后用缓存

kafka-console-consumer --bootstrap-server --zookeeper localhost: 9092--topic wiki-result

启动消费者,就能够在终端窗口里观察到源源不断的wikipedia数据网络

由Flink与Kafka实践探究Kafka的两个问题

当笔者次日再次跑这个监控程序时,发现上次执行的命令ide

kafka-console-producer --topic wiki-result  --broker-list localhost:9092

是生产者命令,然而此例中的生产者其实是Fink监控程序,那么原做者为什么使用kafka-console-producer命令去建立topic而不是用kafka-topics命令呢?命令行

kafka-console-producer --topic wiki-result  --broker-list localhost:9092

命令是生产者指定topic,是否自动建立了topic呢?线程

笔者尝试把现有的topic:wiki-result删掉,而后从新建立topic,提示以下,并无真正删除,为此笔者去查了下相关资料,将topic建立与删除的原理完全弄懂了。3d

由Flink与Kafka实践探究Kafka的两个问题

在 Kafka 中,Topic 是一个存储消息的逻辑概念,不一样的topic在物理上来讲是分开存储的,能够有多个producer向他push消息,也能够有多个consumer去pull消息,每一个 Topic 能够划分多个分区,每一个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。每一个消息在被添加到分区时,都会被分配一个连续的序列号 offset,它是消息在此分区中的惟一编号,Kafka 经过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 Kafka 只保证在同一个分区内的消息是有序的。code

由Flink与Kafka实践探究Kafka的两个问题

经过命令server

kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test-topic

建立了1个名为test-topic的topic,拥有1个分区,每一个分区分配2个副本。建立逻辑如图,总的来讲就是后台逻辑会监听zookeeper下对应的目录节点,一旦发起topic建立命令,该命令会建立新的数据节点从而触发后台的建立逻辑。对象

由Flink与Kafka实践探究Kafka的两个问题

命令行部分比较直白,无非就是一些基本校验,分配副本(尽量保证分区的副本平均分配到每一个broker上),把分配方案持久化到zookeeper的/brokers/topics/<topic>节点下。

后台逻辑部分主要由controller负责,controller内部保存了不少信息,其中有一个分区状态机,用于记录topic各个分区的状态。这个状态机内部注册了一些zookeeper监听器。Controller在启动的时候会建立这些监听器。其中一个监听器(TopicChangeListener)就是用于监听zookeeper的/brokers/topics目录的子节点变化的。一旦该目录子节点数发生变化就会调用这个监听器的处理方法。TopicChangeListener监听器一方面会更新controller的缓存信息(好比更新集群当前全部的topic列表以及更新新增topic的分区副本分配方案缓存等),另外一方面就是建立对应的分区及其副本对象并为每一个分区肯定leader副本及ISR。至此,整个topic的建立就完成了!

除了使用kafka-topics –create建立topic外,还能够使用kafka-console-producer发布消息时建立,kafka第一步先获取topic的leader信息,当发现不可用的时候,在去建立此topic。

Kafka 删除topic的命令:

kafka-topics.sh --zookeeper localhost:2181 --delete –topic test-topic

然而此命令不能真正删除topic,只是在zookeeper的/admin/delete_topics下建立一个临时节点。

Kafka controller在启动的时候会注册对于Zookeeper节点/admin/delete_topics的子节点变动监听器,并建立一个单独的线程,执行topic删除的操做,监听器捕获到删除时建立的临时节点,马上触发删除逻辑,查询test-topic是否正在被使用,根据其状态决定是否删除。

那么何时线程会真正删除此topic呢?只有当在server.properties配置了delete.topic.enable=true时并从新启动Kafka,此Topic才会被真正删除。

至此Topic的建立和删除原理已经清楚了,而对于在实践过程当中遇到的问题也清晰了。

相关文章
相关标签/搜索