笔者在某次实践过程当中,搭建了一个Flink监控程序,监控wikipedia
编辑,对编辑者编辑的字节数进行实时计算,最终把数据sink到kafka的消费者中展现出来,监控程序自己比较简单,只要在程序中指定好WikipediaEditsSource
源并配置好sink与kafka关联就能够,相似一个略微复杂版的wordcount
,按照网络上的教程,在实践的最后,开启zookeeper服务和kafka服务,接着用bootstrap
kafka-console-producer --topic wiki-result --broker-list localhost:9092
这条命令建立一个名为wiki-result
的topic
,而后运行监控程序,最后用缓存
kafka-console-consumer --bootstrap-server --zookeeper localhost: 9092--topic wiki-result
启动消费者,就能够在终端窗口里观察到源源不断的wikipedia
数据网络
当笔者次日再次跑这个监控程序时,发现上次执行的命令ide
kafka-console-producer --topic wiki-result --broker-list localhost:9092
是生产者命令,然而此例中的生产者其实是Fink监控程序,那么原做者为什么使用kafka-console-producer
命令去建立topic而不是用kafka-topics
命令呢?命令行
kafka-console-producer --topic wiki-result --broker-list localhost:9092
命令是生产者指定topic,是否自动建立了topic呢?线程
笔者尝试把现有的topic:wiki-result
删掉,而后从新建立topic,提示以下,并无真正删除,为此笔者去查了下相关资料,将topic建立与删除的原理完全弄懂了。3d
在 Kafka 中,Topic 是一个存储消息的逻辑概念,不一样的topic在物理上来讲是分开存储的,能够有多个producer向他push消息,也能够有多个consumer去pull消息,每一个 Topic 能够划分多个分区,每一个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。每一个消息在被添加到分区时,都会被分配一个连续的序列号 offset,它是消息在此分区中的惟一编号,Kafka 经过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 Kafka 只保证在同一个分区内的消息是有序的。code
经过命令server
kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test-topic
建立了1个名为test-topic
的topic,拥有1个分区,每一个分区分配2个副本。建立逻辑如图,总的来讲就是后台逻辑会监听zookeeper下对应的目录节点,一旦发起topic建立命令,该命令会建立新的数据节点从而触发后台的建立逻辑。对象
命令行部分比较直白,无非就是一些基本校验,分配副本(尽量保证分区的副本平均分配到每一个broker上),把分配方案持久化到zookeeper的/brokers/topics/<topic>
节点下。
后台逻辑部分主要由controller
负责,controller
内部保存了不少信息,其中有一个分区状态机,用于记录topic各个分区的状态。这个状态机内部注册了一些zookeeper监听器。Controller在启动的时候会建立这些监听器。其中一个监听器(TopicChangeListener)就是用于监听zookeeper的/brokers/topics
目录的子节点变化的。一旦该目录子节点数发生变化就会调用这个监听器的处理方法。TopicChangeListener
监听器一方面会更新controller的缓存信息(好比更新集群当前全部的topic列表以及更新新增topic的分区副本分配方案缓存等),另外一方面就是建立对应的分区及其副本对象并为每一个分区肯定leader副本及ISR。至此,整个topic的建立就完成了!
除了使用kafka-topics –create
建立topic外,还能够使用kafka-console-producer
发布消息时建立,kafka第一步先获取topic的leader信息,当发现不可用的时候,在去建立此topic。
Kafka 删除topic的命令:
kafka-topics.sh --zookeeper localhost:2181 --delete –topic test-topic
然而此命令不能真正删除topic,只是在zookeeper的/admin/delete_topics
下建立一个临时节点。
Kafka controller
在启动的时候会注册对于Zookeeper节点/admin/delete_topics
的子节点变动监听器,并建立一个单独的线程,执行topic删除的操做,监听器捕获到删除时建立的临时节点,马上触发删除逻辑,查询test-topic是否正在被使用,根据其状态决定是否删除。
那么何时线程会真正删除此topic呢?只有当在server.properties
配置了delete.topic.enable=true
时并从新启动Kafka,此Topic才会被真正删除。
至此Topic的建立和删除原理已经清楚了,而对于在实践过程当中遇到的问题也清晰了。