Download Kafka; the distribution ships with ZooKeeper.

A ZooKeeper ensemble uses the ZAB protocol for leader election, so it needs at least three nodes (in production these should run on three separate server instances; for this demo we run them all on one machine).
```shell
# Copy the config for three nodes
cp config/zookeeper.properties config/zookeeper.2181.properties
cp config/zookeeper.properties config/zookeeper.2182.properties
cp config/zookeeper.properties config/zookeeper.2183.properties
```
Edit config/zookeeper.2181.properties:

```properties
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper/2181
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
server.1=localhost:12888:13888
server.2=localhost:22888:23888
server.3=localhost:32888:33888
```
In config/zookeeper.2182.properties change clientPort=2182 and dataDir=/tmp/zookeeper/2182; in config/zookeeper.2183.properties change clientPort=2183 and dataDir=/tmp/zookeeper/2183. Everything else stays the same.
The main changes are the client port (clientPort) and the data directory (dataDir). The remaining parameters mean:

- `tickTime=2000`: ZooKeeper's basic time unit, in milliseconds
- `initLimit=10`: time limit for the initial Leader-Follower connection (tickTime * 10)
- `syncLimit=5`: time limit for Leader-Follower synchronization (tickTime * 5)
- `server.<cluster id>=<host>:<data port>:<election port>`: one line per ensemble member
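The timeouts these settings imply can be computed directly; a quick sanity check using the values above:

```shell
# Effective Leader-Follower timeouts derived from the config above
tickTime=2000
initLimit=10
syncLimit=5
echo "init timeout: $((tickTime * initLimit)) ms"   # init timeout: 20000 ms
echo "sync timeout: $((tickTime * syncLimit)) ms"   # sync timeout: 10000 ms
```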
Give each instance its cluster id by writing the id into a myid file under its data directory:
```shell
# The data directories must exist before the myid files can be written
mkdir -p /tmp/zookeeper/2181 /tmp/zookeeper/2182 /tmp/zookeeper/2183
echo 1 > /tmp/zookeeper/2181/myid
echo 2 > /tmp/zookeeper/2182/myid
echo 3 > /tmp/zookeeper/2183/myid
```
Start the cluster services (each command runs in the foreground, so use a separate terminal for each, or pass `-daemon`):

```shell
bin/zookeeper-server-start.sh config/zookeeper.2181.properties
bin/zookeeper-server-start.sh config/zookeeper.2182.properties
bin/zookeeper-server-start.sh config/zookeeper.2183.properties
```
Kafka

A Kafka cluster can provide highly available service as soon as it has at least 2 broker nodes.
```shell
cp config/server.properties config/server.9092.properties
cp config/server.properties config/server.9093.properties
```
Edit the broker id, listener port, log directory, and ZooKeeper connection list: vi config/server.9092.properties
```properties
broker.id=1
...
listeners=PLAINTEXT://:9092
...
log.dirs=/tmp/kafka-logs/1
...
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
```
vi config/server.9093.properties
```properties
broker.id=2
...
listeners=PLAINTEXT://:9093
...
log.dirs=/tmp/kafka-logs/2
...
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
```
Start the cluster:

```shell
bin/kafka-server-start.sh config/server.9092.properties
bin/kafka-server-start.sh config/server.9093.properties
```
Create a topic:

```shell
bin/kafka-topics.sh --create \
  --zookeeper localhost:2181,localhost:2182,localhost:2183 \
  --replication-factor 2 \
  --partitions 4 \
  --topic topic_1
```
- `--replication-factor 2`: the number of replicas per partition. It cannot exceed the number of brokers, and more replicas than brokers would be useless anyway: if two or more replicas of a partition sit on the same node, they all go down with it.
- `--partitions 4`: the number of partitions.
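As a sanity check on these numbers: with 4 partitions and a replication factor of 2, the cluster stores 8 partition replicas in total, which the 2 brokers split evenly (this matches the `--describe` output for topic_1, where each broker appears in 4 replica lists):

```shell
# Total partition replicas the cluster must host, and their spread
partitions=4
replication_factor=2
brokers=2
echo "total replicas: $((partitions * replication_factor))"                 # total replicas: 8
echo "replicas per broker: $((partitions * replication_factor / brokers))"  # replicas per broker: 4
```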
List the topics:

```shell
bin/kafka-topics.sh \
  --zookeeper localhost:2181,localhost:2182,localhost:2183 --list
topic_1
topic_2
```
The describe command shows each topic's partition count, replication factor, the leader of each partition, and the ISR (in-sync replica) set:
```shell
bin/kafka-topics.sh \
  --zookeeper localhost:2181,localhost:2182,localhost:2183 \
  --describe --topic topic_1

Topic:topic_1   PartitionCount:4        ReplicationFactor:2     Configs:
        Topic: topic_1  Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: topic_1  Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: topic_1  Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: topic_1  Partition: 3    Leader: 1       Replicas: 1,2   Isr: 1,2
```
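One thing worth checking in this output is whether every partition's Isr list still matches its Replicas list; if not, a replica has fallen out of sync. A small filter sketch over sample lines like the ones above (in a live cluster you would pipe `kafka-topics.sh --describe` straight into the same awk; the second sample line is made up to show a detection):

```shell
# Print partitions whose in-sync replica set has shrunk below the replica set
describe='Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic_1 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2'
echo "$describe" | awk '$8 != $10 { print "under-replicated partition:", $4 }'
# under-replicated partition: 2
```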
Note that deleting a topic only removes its metadata in ZooKeeper; the log data must still be deleted by hand.
```shell
bin/kafka-topics.sh \
  --zookeeper localhost:2181,localhost:2182,localhost:2183 \
  --delete --topic topic_2
#Topic topic_2 is marked for deletion.
#Note: This will have no impact if delete.topic.enable is not set to true.

# List the topics again
bin/kafka-topics.sh \
  --zookeeper localhost:2181,localhost:2182,localhost:2183 --list
#topic_1
#topic_2 - marked for deletion
```
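Since the command above only marks the topic, its partition directories may remain on disk; a cleanup sketch for this demo's layout (the paths follow the log.dirs values configured earlier; double-check them before running `rm -rf`):

```shell
# Remove topic_2's partition directories from both brokers' log dirs
rm -rf /tmp/kafka-logs/1/topic_2-* /tmp/kafka-logs/2/topic_2-*
```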
Produce some messages:

```shell
bin/kafka-console-producer.sh \
  --broker-list localhost:9092,localhost:9093 \
  --topic topic_1
# In the CLI, type a message and press Enter to send it
# hello kafka [enter]
# send message [enter]
```
In the new consumer mode, offsets are stored on the brokers:

```
--new-consumer                             Use new consumer. This is the default.
--bootstrap-server <server to connect to>  REQUIRED (unless old consumer is used): The server to connect to.
```
In the old consumer mode, offsets are stored in ZooKeeper:

```
--zookeeper <urls>  REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
```
Consume messages (new consumer):

```shell
bin/kafka-console-consumer.sh \
  --new-consumer \
  --bootstrap-server localhost:9092,localhost:9093 \
  --from-beginning \
  --topic topic_1
```
You can start multiple consumers in different consumer groups (each run of this script creates a consumer in a fresh group) and subscribe them to the same topic to get publish-subscribe behavior.
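To use a fixed group id instead of the auto-generated console-consumer-* ones, the console consumer can load a properties file via `--consumer.config`. A sketch; the file name consumer_a.properties and the group name group_a are made up for this example:

```properties
# consumer_a.properties (hypothetical file)
group.id=group_a
```

Then start each subscriber with `bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092,localhost:9093 --consumer.config consumer_a.properties --topic topic_1`. Consumers sharing a group id split the partitions between them, while consumers in different groups each receive every message.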
```shell
bin/kafka-consumer-groups.sh \
  --new-consumer \
  --bootstrap-server localhost:9092,localhost:9093 \
  --list

# Two consumer groups are listed here
console-consumer-47566
console-consumer-50875
```
The describe output shows the topics the group subscribes to, the partitions each member owns, the consumption progress (offset), and the backlog of pending messages (LAG):
```shell
bin/kafka-consumer-groups.sh \
  --new-consumer \
  --bootstrap-server localhost:9092,localhost:9093 \
  --group console-consumer-47566 \
  --describe

GROUP                  TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER
console-consumer-47566 topic_1  0          2               2               0    consumer-1_/127.0.0.1
console-consumer-47566 topic_1  1          3               3               0    consumer-1_/127.0.0.1
console-consumer-47566 topic_1  2          2               3               1    consumer-1_/127.0.0.1
console-consumer-47566 topic_1  3          0               3               3    consumer-1_/127.0.0.1
```
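LAG is simply the log end offset minus the group's committed offset; for example, for partition 3 above:

```shell
# Partition 3 of topic_1 as reported above
current_offset=0
log_end_offset=3
echo "lag: $((log_end_offset - current_offset))"   # lag: 3
```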