Kafka(六)Kafka基本客户端命令操做


主题管理web

建立主题
算法

若是配置了auto.create.topics.enable=true(这也是默认值)这样当生产者向一个没有建立的主题发送消息就会自动建立,其分区数量和副本数量也是有默认配置来控制的。json

# 咱们这里建立一个3个分区每一个分区有2个副本的主题
kafka-topics.sh --create --zookeeper 172.16.48.171:2181/kafka --replication-factor 2 --partitions 3 --topic KafkaTest
--create 表示创建
--zookeeper

表示ZK地址,能够传递多个,用逗号分隔bootstrap

--zookeeper IP:PORT,IP:PORT,IP:PORT/kafka
bash

--replication-factor 表示副本数量,这里的数量是包含Leader副本和Follower副本,副本数量不能超过代理数量
--partitions 表示主题的分区数量,必须传递该参数。Kafka的生产者和消费者采用多线程并行对主题的消息进行处理,每一个线程处理一个分区,分区越多吞吐量就会越大,可是分区越多也意味着须要打开更多的文件句柄数量,这样也会带来一些开销。
--topic
表示主题名称

image.png

在Zookeeper中能够看到以下信息服务器

image.png

删除主题网络

删除有两种方式手动和自动多线程

  • 手动方式须要删除各个节点日志路径下的该主题全部分区,而且删除zookeeper上/brokers/topics和/config/topics下的对应主题节点ide

  • 自动删除就是经过脚原本完成,同时须要配置服务器配置文件中的delete.topic.enable=true,默认为false也就是说经过命令删除主题只会删除ZK中的节点,日志文件不会删除须要手动清理,若是配置为true,则会自动删除日志文件。工具

kafka-topics.sh --delete --zookeeper 172.16.48.171:2181/kafka --topic KafkaTest

image.png

下面的两句话就是说该主题标记为删除/admin/delete_topics节点下。实际数据没有影响由于该参数没有设置为true。

查看主题

# 列出全部主题
kafka-topics.sh --list --zookeeper 172.16.48.171:2181/kafka

image.png

下面是从ZK中看到的全部主题

image.png

# 查看全部主题信息
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka

image.png

# 查看特定主题信息
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topic BBB

image.png

Replicas:是AR列表,表示副本分布在哪些代理上,且该列表第一个元素就是Leader副本所在代理

ISR:该列表是显示已经同步的副本集合,这个列表的副本都是存活的

# 经过--describe 和 --under-replicated-partitions 能够查看正在同步的主题或者同步可能发生异常,
# 也就是ISR列表长度小于AR列表,若是一切正常则不会返回任何东西,也能够经过 --tipic 指定具体主题
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --under-replicated-partitions
# 查看哪些主题创建时使用了单独的配置 
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topics-with-overrides

这里只有一个内部主题__comsumer_offsets使用了非配置文件中的设置

image.png


配置管理

所谓配置就是参数,好比修改主题的默认参数。

主题级别的

# 查看配置
kafka-configs.sh --describe --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BB

这里显示 Configs for topic 'BBB' are 表示它的配置有哪些,这里没有表示没有为该主题单独设置配置,都是使用的默认配置。

image.png

# 增长一个配置
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --add-config flush.messages=2

image.png

若是修改的话仍是相同的命令,只是把值修改一下

image.png

# 删除配置
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --delete-config flush.messages

image.png

客户端级别

这个主要是设置流控

# 设置指定消费者的流控 --entity-name 是客户端在建立生产者或者消费者时是指定的client.id名称
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name COMSUMER_NAME

image.png

下图为ZK中对应的信息

image.png


分区管理

分区平衡

Leader副本在集群中应该是均衡分布,由于Leader副本对外提供读写服务,尽量不让同一个主题的多个Leader副本在同一个代理上,可是随着时间推移好比故障转移等状况发送,Leader副本可能不均衡。有两种方式设置自动平衡,自动和手动。

自动就是在配置文件中增长 auto.leader.rebalance.enable true 若是该项为false,当某个节点故障恢复并从新上线后,它原来的Leader副本也不会转移回来,只是一个Follower副本。

手动就是经过命令来执行

kafka-preferred-replica-election.sh --zookeeper 172.16.48.171:2181/kafka

分区迁移

当下线一个节点须要将该节点上的分区副本迁移到其余可用节点上,Kafka并不会自动进行分区迁移,若是不迁移就会致使某些主题数据丢失和不可用的状况。当增长新节点时,只有新建立的主题才会分配到新节点上,以前的主题分区不会自动分配到新节点上,由于老的分区在建立时AR列表中没有这个新节点。

image.png

上面2个主题,每一个主题3个分区,每一个分区3个副本,咱们假设如今代理2要下线,因此咱们要把代理2上的这两个主题的分区数据迁移出来。

# 1. 在KAFKA目录的config目录中创建topics-to-move.json文件
{
    "topics":[
        {
            "topic":"AAA"
        },
        {
            "topic":"BBB"
        }
    ],
    "version":1
}

# 2. 生成分区分配方案,只是生成一个方案信息而后输出
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "1,2" --generate

image.png

个命令的原理是从zookeeper中读取主题元数据信息及制定的有效代理,根据分区副本分配算法从新计算指定主题的分区副本分配方案。把【Proposed partition reassignment configuration】下面的分区方案保存到一个JSON文件中,partitions-reassignment.json 文件名无所谓。

# 3. 执行方案
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute

image.png

# 4. 查看进度
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --verify

image.png

查看结果,这里已经没有代理0了。

image.png

集群扩容

上面演示了节点下线的数据迁移,这里演示一下集群扩容的数据迁移。咱们仍是用上面两个主题,假设代理0又从新上线了。其实扩容就是上面的反向操做

# 1. 创建JSON文件
# 该文件和以前的相同


# 2. 生成方案并保存到一个JSON文件中
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "0,1,2" --generate

image.png

# 3. 数据迁移,这里经过--throttle作一个限流操做,若是数据过大会把网络堵塞。
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute --throttle 1024

image.png

查看进度和结果

image.png

增长分区

一般在须要提供吞吐量的时候咱们会增长分区,而后若是代理数量不扩大,同时生产者和消费者线程不增大,你扩展了分区也没有用。

image.png

kafka-topics.sh --alter --zookeeper 172.16.48.171:2181/kafka --partitions 3 --topic KafkaTest03

image.png

增长副本

集群规模扩大而且想对全部主题或者指定主题提升可用性,那么能够增长原有主题的副本数量

image.png

上面是3个分区,每一个分区1个副本,咱们如今把每一个分区扩展为3个副本

# 1. 建立JSON文件 replica-extends.json 
{
	"version": 1,
	"partitions": [{
			"topic": "KafkaTest04",
			"partition": 0,
			"replicas": [0,1,2]
		},
		{
			"topic": "KafkaTest04",
			"partition": 1,
			"replicas": [0,1,2]
		},
		{
			"topic": "KafkaTest04",
			"partition": 2,
			"replicas": [0,1,2]
		}
	]
}
# 2. 执行分区副本从新分配命令
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./replica-extends.json --execute

image.png

查看状态

image.png

查看结果

image.png


镜像操做

Kafka有一个镜像工具kafka-mirror-maker.sh,用于将一个集群数据同步到另一个集群中,这个很是有用,好比机房搬迁就须要进行数据同步。该工具的本质就是建立一个消费者,在源集群中须要迁移的主题消费数据,而后建立一个生产者,将消费的数据写入到目标集群中。

首先建立消费者配置文件mirror-consumer.properties(文件路径和名称是自定义的)

# 源kafka集群代理地址列表
bootstrap.servers=IP1:9092,IP2:9092,IP3:9092
# 消费者组名
group.id=mirror

其次建立生产者配置文件mirror-producer.properties(文件路径和名称是自定义的)

# 目标kafka集群地址列表
bootstrap.servers=IP1:9092,IP2:9092,IP3:9092

运行镜像命令

# 经过 --whitelist 指定须要镜像的主题,经过  --blacklist 指定不须要镜像的主题
kafka-mirror-maker.sh --consumer.config PATH/mirror-consumer.properties --producer.config PATH/mirror-producer.properties --whitelist TOPIC

因为镜像操做是启动一个生产者和消费者,因此数据同步完成后这个生产者和消费者并不会关闭,它会依然等待新数据,因此同步完成之后你须要本身查看,确认完成了则关闭生产者和消费者。

相关文章
相关标签/搜索