Kafka download address: https://kafka.apache.org/downloads
Note that Kafka depends on zk; for deploying zk you can refer to 《Zookeeper介绍与基本部署》. Kafka also ships with a built-in zk startup script named zookeeper-server-start.sh under the bin directory of the Kafka installation path; if you do not want to install zk separately, you can use that script directly.
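If you go with the built-in scripts, a minimal sketch of starting and stopping the bundled zk (using the default config/zookeeper.properties shipped with the distribution):

# start the bundled ZooKeeper in the background with the default properties file
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties
# and stop it later with the matching stop script
/usr/local/kafka/bin/zookeeper-server-stop.sh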
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar xf kafka_2.12-2.2.0.tgz -C /usr/local/
cd /usr/local
ln -s kafka_2.12-2.2.0 kafka
The main Kafka configuration file is /usr/local/kafka/config/server.properties; an example configuration looks like this:
broker.id=0
listeners=PLAINTEXT://10.1.60.29:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.1.60.29:2181,10.1.61.195:2181,10.1.61.27:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
delete.topic.enable=true
Configuration notes:

zookeeper.connect is given as hostname:port/path. hostname is the host name or IP of a zk node and port is the port zk listens on. /path is the directory on zk under which Kafka's metadata is stored; if it is not set, the root directory is used by default.

Note that multiple Kafka nodes rely on zk to form a cluster, so no special per-node configuration is needed: each broker only needs a distinct broker.id and must join the same zk cluster.
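For example, a second broker's server.properties would look the same except for the following two lines (the IP below is only for illustration, borrowed from the zk node list above):

broker.id=1
# this broker's own listener address; adjust to the actual host
listeners=PLAINTEXT://10.1.61.195:9092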
# start
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# check the java processes
# jps
1394 QuorumPeerMain
13586 Logstash
27591 Kafka
27693 Jps
# stop
/usr/local/kafka/bin/kafka-server-stop.sh
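To confirm the broker really is listening on the port configured in listeners, a quick check (assuming ss is available; netstat -lntp works just as well):

# the Kafka java process should be bound to port 9092
ss -lntp | grep 9092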
Kafka's metadata can be viewed through zookeeper:
# connect to zookeeper with the zk client
../zookeeper/bin/zkCli.sh
# quite a few new directories now exist under the root
[zk: localhost:2181(CONNECTED) 1] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
# check /brokers/ids; three brokers have already joined
[zk: localhost:2181(CONNECTED) 8] ls /brokers/ids
[0, 1, 2]
# check /brokers/topics; it is empty, so no topic has been created yet
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
[]
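Each broker also registers its own details as a JSON znode under /brokers/ids; you can inspect one entry in the same zkCli session (the JSON contains the broker's endpoints and port; its exact fields vary slightly across Kafka versions):

# print the registration data of broker 0
[zk: localhost:2181(CONNECTED) 9] get /brokers/ids/0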
This completes the Kafka deployment. Verifying the deployment above showed that no topic exists yet, so create one as follows:
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic myfirsttopic
Created topic myfirsttopic.
Parameter notes:

--create: create a new topic
--zookeeper: the zk cluster where Kafka keeps its metadata
--replication-factor: the number of replicas for each partition
--partitions: the number of partitions in the topic
--topic: the topic name
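Back in zkCli, the newly created topic should now appear under /brokers/topics (continuing the earlier session; the output below is what is expected when myfirsttopic is the only topic created so far):

[zk: localhost:2181(CONNECTED) 10] ls /brokers/topics
[myfirsttopic]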
As shown above, topic information can be seen by operating on zk directly; next we perform the same kind of operations with the Kafka command-line tools:
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
myfirsttopic
# view the details of myfirsttopic
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic
Topic:myfirsttopic  PartitionCount:3  ReplicationFactor:2  Configs:
    Topic: myfirsttopic  Partition: 0  Leader: 0  Replicas: 0,2  Isr: 0
    Topic: myfirsttopic  Partition: 1  Leader: 1  Replicas: 1,0  Isr: 1,0
    Topic: myfirsttopic  Partition: 2  Leader: 2  Replicas: 2,1  Isr: 2,1
Parameter notes:

--list: list all topics in the cluster
--describe: show the partition and replica layout of the specified topic
Output notes:

PartitionCount: total number of partitions in the topic
ReplicationFactor: number of replicas per partition
Leader: the broker id that currently serves reads and writes for the partition
Replicas: the broker ids that hold a copy of the partition, whether or not they are alive
Isr: the subset of Replicas that is currently in sync with the leader
# increase the number of partitions of myfirsttopic from 3 to 6
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 6 --topic myfirsttopic
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic
Topic:myfirsttopic  PartitionCount:6  ReplicationFactor:2  Configs:
    Topic: myfirsttopic  Partition: 0  Leader: 0  Replicas: 0,2  Isr: 0
    Topic: myfirsttopic  Partition: 1  Leader: 1  Replicas: 1,0  Isr: 1,0
    Topic: myfirsttopic  Partition: 2  Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: myfirsttopic  Partition: 3  Leader: 0  Replicas: 0,2  Isr: 0,2
    Topic: myfirsttopic  Partition: 4  Leader: 1  Replicas: 1,0  Isr: 1,0
    Topic: myfirsttopic  Partition: 5  Leader: 2  Replicas: 2,1  Isr: 2,1
# create a topic named mysecondtopic with 2 partitions and 1 replica
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2 --topic mysecondtopic
Created topic mysecondtopic.
# view the details of the newly created topic
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mysecondtopic
Topic:mysecondtopic  PartitionCount:2  ReplicationFactor:1  Configs:
    Topic: mysecondtopic  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: mysecondtopic  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
# expand the replicas of partition 0 (on broker.id 0) from [0] to [0,2], and the replicas of partition 1 (on broker.id 1) from [1] to [1,2]
# a json file describing the new assignment is needed first:
# cat partitions-to-move.json
{
  "partitions": [
    { "topic": "mysecondtopic", "partition": 0, "replicas": [0,2] },
    { "topic": "mysecondtopic", "partition": 1, "replicas": [1,2] }
  ],
  "version": 1
}
# execute the reassignment
# ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./partitions-to-move.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"mysecondtopic","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"mysecondtopic","partition":0,"replicas":[0],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
# describe the topic again; the replica assignment has changed as expected
# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mysecondtopic
Topic:mysecondtopic  PartitionCount:2  ReplicationFactor:2  Configs:
    Topic: mysecondtopic  Partition: 0  Leader: 0  Replicas: 0,2  Isr: 0
    Topic: mysecondtopic  Partition: 1  Leader: 1  Replicas: 1,2  Isr: 1
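The same tool can report whether the reassignment has completed; a minimal check that reuses the json file above:

# --verify prints the status of every partition listed in the json file
# ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./partitions-to-move.json --verify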
# delete the topic
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myfirsttopic
Topic myfirsttopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# list the topics; myfirsttopic has been deleted
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
mysecondtopic
# start a console producer against the broker and type a few test messages
# ./bin/kafka-console-producer.sh --broker-list 10.1.60.29:9092 --topic mysecondtopic
>hello kafka!
>hello world!
>just a test!
>
>hi world!
>hahahaha!
>
# start a console consumer and read the topic from the beginning
# ./bin/kafka-console-consumer.sh --bootstrap-server 10.1.60.29:9092 --topic mysecondtopic --from-beginning
hello kafka!
just a test!
hi world!
hello world!
hahahaha!
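Note that the consumed order differs from the produced order: Kafka only guarantees ordering within a single partition, and mysecondtopic has two. To consume as part of a named consumer group and then inspect its offsets and lag, a minimal sketch (the group name testgroup is just an example):

# consume as a member of an explicit consumer group
# ./bin/kafka-console-consumer.sh --bootstrap-server 10.1.60.29:9092 --topic mysecondtopic --group testgroup
# describe the group: current offset, log-end offset and lag per partition
# ./bin/kafka-consumer-groups.sh --bootstrap-server 10.1.60.29:9092 --describe --group testgroup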