简介:html
Apache Kafka 是一个 Scala 语言编写的可扩展、分布式、高性能的容错消息发布、订阅系统。
官网地址:http://kafka.apache.org
中文教程:http://www.orchome.com/kafka/index
下载地址:http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz ( Scala 2.11 is recommended )java
Java版本:jdk-8u111-linux-x64.rpmnode
1、单机部署linux
shell > vim /etc/hosts 192.168.10.23 zk-node01 kb-node01 # 必须配置,不然没法启动 shell > rpm -ivh jdk-8u111-linux-x64.rpm shell > java -version java version "1.8.0_111" Java(TM) SE Runtime Environment (build 1.8.0_111-b14) Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode) shell > cd /usr/local/src; tar zxf kafka_2.11-1.0.0.tgz -C /usr/local shell > cd /usr/local/kafka_2.11-1.0.0
一、zookeepershell
shell > vim config/zookeeper.properties # 数据目录 dataDir=/data/zookeeper_data # 监听端口 clientPort=2181 # 最大链接数 不限制 maxClientCnxns=0 shell > sh bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # 启动 zookeeper
二、kafkaapache
shell > vim config/server.properties # 惟一ID broker.id=1 # 监听地址 listeners=PLAINTEXT://0.0.0.0:9092 # 向 Zookeeper 注册的地址,这里若是须要同时内外网访问须要注册 hostname,不然只能注册外网IP地址,会致使全部流量都走外网 advertised.listeners=PLAINTEXT://kb-node01:9092 # 数据目录 log.dirs=/data/kafka_data # 容许删除topic delete.topic.enable=true # 不容许自动建立topic auto.create.topics.enable=false # 磁盘IO不足的时候,能够适当调大该值 ( 当内存足够时 ) #log.flush.interval.messages=10000 #log.flush.interval.ms=1000 # kafka 数据保留时间 默认 168 -> 7 天 log.retention.hours=24 # zookeeper zookeeper.connect=zk-node01:2181 # 其他都使用默认配置 # 这样配置,外网访问 kafka 时,须要配置 hosts ( IP kb-node01 ) 不然提示主机名未知! shell > sh bin/kafka-server-start.sh -daemon config/server.properties # 启动 kafka
2、kafka 指令bootstrap
一、topicvim
shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --create --topic kafka-test --partitions 1 --replication-factor 1 Created topic "kafka-test". shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --list kafka-test
# 建立一个 topic kafka-test,它有一个分区、一个副本分布式
shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --describe --topic kafka-test Topic:kafka-test PartitionCount:1 ReplicationFactor:1 Configs: Topic: kafka-test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
# 查看这个 topic 的属性,总共一个分区,一个副本;当前分区为 0,leader 为 broker.id=1 的 broker,
# 副本所在 broker,活跃的 broker ( 须要同步副本的broker )性能
二、consumer
shell > sh bin/kafka-console-consumer.sh --bootstrap-server kb-node01:9092 --topic kafka-test --from-beginning
# 启动一个消费者,消费 kafka-test 这个 topic,从头读取
三、producer
shell > sh bin/kafka-console-producer.sh --broker-list kb-node01:9092 --topic kafka-test > hello world
# 重启一个终端,启动一个生产者,输入 hello world,这是消费者终端会看到消息 hello world
四、alter topic
shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --alter --topic kafka-test --partitions 2 shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --describe --topic kafka-test Topic:kafka-test PartitionCount:2 ReplicationFactor:1 Configs: Topic: kafka-test Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: kafka-test Partition: 1 Leader: 1 Replicas: 1 Isr: 1
# 这样就将 topic kafka-test 的分区修改为了 2 个
五、delete topic
shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --delete --topic kafka-test shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --list __consumer_offsets
# 这样就删除了 topic kafka-test了,__consumer_offsets 这个 topic 为系统自动生成,用来存放消费者 offset 信息的。
3、伪分布式部署
shell > sh bin/kafka-server-stop.sh
# 中止原来的 kafka 不停也行...
shell > cp config/server.properties config/server-1.properties shell > cp config/server.properties config/server-2.properties shell > cp config/server.properties config/server-3.properties
# 没办法,强迫症
shell > vim config/server-1.properties broker.id=1 listeners=PLAINTEXT://0.0.0.0:9091 advertised.listeners=PLAINTEXT://kb-node01:9091 log.dirs=/data/kafka_data-1 shell > vim config/server-2.properties broker.id=2 listeners=PLAINTEXT://0.0.0.0:9092 advertised.listeners=PLAINTEXT://kb-node01:9092 log.dirs=/data/kafka_data-2 shell > vim config/server-3.properties broker.id=3 listeners=PLAINTEXT://0.0.0.0:9093 advertised.listeners=PLAINTEXT://kb-node01:9093 log.dirs=/data/kafka_data-3
# 这几项不能重复 注意:防火墙要开启相应的端口
shell > sh bin/kafka-server-start.sh -daemon config/server-1.properties shell > sh bin/kafka-server-start.sh -daemon config/server-2.properties shell > sh bin/kafka-server-start.sh -daemon config/server-3.properties
# 启动这些 broker
shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --create --topic kafka-all --partitions 6 --replication-factor 3 Created topic "kafka-all". shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --list __consumer_offsets kafka-all
# 新建立的 topic
shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --describe --topic kafka-all Topic:kafka-all PartitionCount:6 ReplicationFactor:3 Configs: Topic: kafka-all Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: kafka-all Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: kafka-all Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Topic: kafka-all Partition: 3 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1 Topic: kafka-all Partition: 4 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2 Topic: kafka-all Partition: 5 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
# 一个 6 分区、3 副本的 topic 诞生了,0 分区的 leader 是 3,
# 副本分布在 三、一、2 上,活跃的 broker 为 三、一、2 ( 须要同步副本的broker )
shell > sh bin/kafka-console-producer.sh --broker-list kb-node01:9091,kb-node01:9092 --topic kafka-all > hello kafka-all
# 启动一个生产者,注意:只写了两个 broker 向 kafka-all topic 中发送了一条消息
shell > sh bin/kafka-console-consumer.sh --bootstrap-server kb-node01:9093 --topic kafka-all --from-beginning hello kafka-all
# 启动一个消费者,注意:只写了生产者没填写的 broker,仍是消费到了消息
# 说明:broker 不须要所有填写,会自动发现,即便有机器宕机数据也不会丢失!
# 注意:集群部署时,zookeeper 要么为 1,要么为 3,不要是两台,不然其中一台宕机,集群则没法提供服务!
# 部署 zookeeper 集群请参照其他博文。