3.kafka 基本配置

1.主题管理ide

kafka-topics.sh工具脚本用于对主题操做,如建立、删除、修改、分区数、副本数及主题级别的配置。工具

1.1建立名为kafka-test主题,有2个副本,3个分区oop

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --create --zookeeper h201:2181,h202:2181,h203:2181 --replication-factor 2 --partitions 3 --topic kafka-test测试

输出:spa

Created topic "kafka-test".orm

####server

--zookeeper 参数必传,用于与zookeeper链接hadoop

--partitions 参数必传,用于设置分区数ci

--replication-factor 参数必传,用于设置副本数get

####

 

1.2进入log目录

[hadoop@h201 kafka_2.12-0.10.2.1]$ cd kafkalogs/

[hadoop@h201 kafkalogs]$ ls

kafka-test-1kafka-test-0

##副本分布到不一样的节点上###

h202上

[hadoop@h202 kafkalogs]$ ls

kafka-test-2kafka-test-1

 

[hadoop@h201 zookeeper-3.4.5-cdh5.5.2]$ bin/zkCli.sh 

[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics/kafka-test/partitions

结果:

[0, 1, 2]

[zk: localhost:2181(CONNECTED) 2] get /brokers/topics/kafka-test

 

1.3 建立主题时,能够经过config参数设置主题级别以覆盖默认配置

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --create --zookeeper h201:2181,h202:2181,h203:2181 --replication-factor 2 --partitions 3 --topic kafka-test1 --config max.message.bytes=404800

 

[hadoop@h201 zookeeper-3.4.5-cdh5.5.2]$ bin/zkCli.sh 

[zk: localhost:2181(CONNECTED) 1] get /config/topics/kafka-test1

结果:

{"version":1,"config":{"max.message.bytes":"404800"}}

 

2.删除主题

若想完全删除,启动时加载server.properties文件中配置delete.topic.enable=true,默认为false

若是没有设置为true,只是标记kafka-test1为删除状态,当设置为true时删除文件目录及元数据。

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --delete --zookeeper h201:2181,h202:2181,h203:2181 --topic kafka-test1

 

3.查看主题

3.1

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --list --zookeeper h201:2181,h202:2181,h203:2181

3.2查看详细描述信息

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --describe --zookeeper h201:2181,h202:2181,h203:2181

 

3.3 查看处于正在同步主题

能够看到正在处于“under replicated”状态分区,该状态可能为正在同步或同步异常。

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --describe --zookeeper h201:2181,h202:2181,h203:2181 --under-replicated-partitions

 

3.4 查看没有leader的分区

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --describe --zookeeper h201:2181,h202:2181,h203:2181 --unavailable-partitions

 

3.5 查看主题覆盖的配置

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --describe --zookeeper h201:2181,h202:2181,h203:2181 --topics-with-overrides

 

4.修改主题

4.1

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --alter --zookeeper h201:2181,h202:2181,h203:2181 --topic kafka-test --config segment.bytes=209715200

##段大小 修改成200m##

 

4.2 删除修改

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --alter --zookeeper h201:2181,h202:2181,h203:2181 --topic kafka-test --delete-config segment.bytes

 

4.3 增长分区

Kafka不支持减小分区操做,只能增长分区操做

主题kafka-test当前分区数为3个,调整为5个

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --alter --zookeeper h201:2181,h202:2181,h203:2181 --topic kafka-test --partitions 5

 

[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics/kafka-test/partitions

结果:

[0, 1, 2, 3, 4]

5.生产者操做

使用kafka-console-producer.sh脚本

参数producer.config:用于加载一个生产者配置文件

参数producer-property:用于设置参数将会覆盖所加载的配置文件中的参数

参数property:用于设置消息消费者相关配置

 

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-console-producer.sh --broker-list h201:9092,h202:9092,h203:9092 --topic kafka-test

必传参数--broker-list,--topic

测试:(显示主题下各个分区的偏移量)

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list h201:9092,h202:9092,h203:9092 --topic kafka-test --time -1

##kafka-run-class.sh为kafka自带测试工具###

 

6.消费者操做

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-console-consumer.sh --zookeeper h201:2181,h202:2181,h203:2181 --topic kafka-test --consumer-property group.id=consumer-g1  --from-beginning

 

消费者建立时会向zookeeper中注册元数据信息 /consumers/${group.id}

---------------------测试-------------------------

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-console-producer.sh --broker-list h201:9092,h202:9092,h203:9092 --topic kafka-test

添加测试消息内容

hello kafka

 

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-console-consumer.sh --zookeeper h201:2181,h202:2181,h203:2181 --topic kafka-test --consumer-property group.id=consumer-g1  --from-beginning

可以接收到 “hello kafka”消息内容

1.  主题管理

kafka-topics.sh工具脚本用于对主题操做,如建立、删除、修改、分区数、副本数及主题级别的配置。

1.1  建立名为kafka-test主题,有2个副本,3个分区

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --create --zookeeper h201:2181,h202:2181,h203:2181 --replication-factor 2 --partitions 3 --topic kafka-test

输出:

Created topic "kafka-test".

####

--zookeeper 参数必传,用于与zookeeper链接

--partitions 参数必传,用于设置分区数

--replication-factor 参数必传,用于设置副本数

####

1.2进入log目录

[hadoop@h201 kafka_2.12-0.10.2.1]$ cd kafkalogs/

[hadoop@h201 kafkalogs]$ ls

kafka-test-1kafka-test-0

##副本分布到不一样的节点上###

h202

[hadoop@h202 kafkalogs]$ ls

kafka-test-2kafka-test-1

 

[hadoop@h201 zookeeper-3.4.5-cdh5.5.2]$ bin/zkCli.sh

[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics/kafka-test/partitions

结果:

[0, 1, 2]

[zk: localhost:2181(CONNECTED) 2] get /brokers/topics/kafka-test

 

1.3 建立主题时,能够经过config参数设置主题级别以覆盖默认配置

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --create --zookeeper h201:2181,h202:2181,h203:2181 --replication-factor 2 --partitions 3 --topic kafka-test1 --config max.message.bytes=404800

 

[hadoop@h201 zookeeper-3.4.5-cdh5.5.2]$ bin/zkCli.sh

[zk: localhost:2181(CONNECTED) 1] get /config/topics/kafka-test1

结果:

{"version":1,"config":{"max.message.bytes":"404800"}}

 

2.  删除主题

若想完全删除,启动时加载server.properties文件中配置delete.topic.enable=true,默认为false

若是没有设置为true,只是标记kafka-test1为删除状态,当设置为true时删除文件目录及元数据。

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --delete --zookeeper h201:2181,h202:2181,h203:2181 --topic kafka-test1

 

3.  查看主题

3.1

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --list --zookeeper h201:2181,h202:2181,h203:2181

3.2查看详细描述信息

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --describe --zookeeper h201:2181,h202:2181,h203:2181

 

3.3 查看处于正在同步主题

能够看到正在处于under replicated”状态分区,该状态可能为正在同步或同步异常。

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --describe --zookeeper h201:2181,h202:2181,h203:2181 --under-replicated-partitions

 

3.4 查看没有leader的分区

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --describe --zookeeper h201:2181,h202:2181,h203:2181 --unavailable-partitions

 

3.5 查看主题覆盖的配置

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --describe --zookeeper h201:2181,h202:2181,h203:2181 --topics-with-overrides

 

4.  修改主题

4.1

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --alter --zookeeper h201:2181,h202:2181,h203:2181 --topic kafka-test --config segment.bytes=209715200

##段大小 修改成200m##

 

4.2 删除修改

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --alter --zookeeper h201:2181,h202:2181,h203:2181 --topic kafka-test --delete-config segment.bytes

 

4.3 增长分区

Kafka不支持减小分区操做,只能增长分区操做

主题kafka-test当前分区数为3个,调整为5

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --alter --zookeeper h201:2181,h202:2181,h203:2181 --topic kafka-test --partitions 5

 

[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics/kafka-test/partitions

结果:

[0, 1, 2, 3, 4]

5.  生产者操做

使用kafka-console-producer.sh脚本

参数producer.config:用于加载一个生产者配置文件

参数producer-property:用于设置参数将会覆盖所加载的配置文件中的参数

参数property:用于设置消息消费者相关配置

 

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-console-producer.sh --broker-list h201:9092,h202:9092,h203:9092 --topic kafka-test

必传参数--broker-list--topic

测试:(显示主题下各个分区的偏移量)

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list h201:9092,h202:9092,h203:9092 --topic kafka-test --time -1

##kafka-run-class.shkafka自带测试工具###

 

6.  消费者操做

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-console-consumer.sh --zookeeper h201:2181,h202:2181,h203:2181 --topic kafka-test --consumer-property group.id=consumer-g1  --from-beginning

 

消费者建立时会向zookeeper中注册元数据信息 /consumers/${group.id}

---------------------测试-------------------------

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-console-producer.sh --broker-list h201:9092,h202:9092,h203:9092 --topic kafka-test

添加测试消息内容

hello kafka

 

[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-console-consumer.sh --zookeeper h201:2181,h202:2181,h203:2181 --topic kafka-test --consumer-property group.id=consumer-g1  --from-beginning

可以接收到hello kafka”消息内容

相关文章
相关标签/搜索