1, Kafka depends on ZooKeeper. Downloads:
the Apache Kafka kafka_2.10-0.10.0.0 package and ZooKeeper 3.4.10.
2, Configure and start ZooKeeper
Environment variables to set: ZOOKEEPER_HOME and PATH. For reference:
export ZOOKEEPER_HOME=/home/t/source/zookeeper-3.4.10
export JAVA_HOME=/home/t/source/jdk1.8.0_121
export PATH=/home/t/source/jdk1.8.0_121/bin:/home/t/source/scala/scala-2.10.6/bin:/home/t/source/spark/spark-1.6.2-bin-hadoop2.6/bin:$PATH:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf:/home/t/source/sbt/sbt/bin:/home/t/source/hadoop-2.6.4/bin
export HADOOP_HOME=/home/t/source/hadoop-2.6.4
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export YARN_HOME=/home/t/source/hadoop-2.6.4
export YARN_CONF_DIR=${YARN_HOME}/etc/hadoop
Edit the zoo.cfg file under zookeeper-3.4.10/conf (zkServer.sh reads conf/zoo.cfg by default; it can be copied from zoo_sample.cfg):
Settings:
dataDir=/home/t/source/zookeeper-3.4.10/dataDir
dataLogDir=/home/t/source/zookeeper-3.4.10/dataLogDir
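As a rough sketch of the configuration step, the snippet below writes a minimal zoo.cfg. The tickTime and clientPort values are assumptions taken from the stock zoo_sample.cfg (the later commands in this post do connect to localhost:2181), and ZK_DEMO_HOME is a hypothetical variable pointing at a scratch directory so the snippet runs without touching a real install.

```shell
#!/usr/bin/env bash
set -e

# Hypothetical scratch location; set ZK_DEMO_HOME to the real
# zookeeper-3.4.10 directory to apply this to an actual install.
ZK_DEMO_HOME="${ZK_DEMO_HOME:-$(mktemp -d)}"
mkdir -p "$ZK_DEMO_HOME/conf" "$ZK_DEMO_HOME/dataDir" "$ZK_DEMO_HOME/dataLogDir"

# tickTime and clientPort are assumed defaults (same values as zoo_sample.cfg).
cat > "$ZK_DEMO_HOME/conf/zoo.cfg" <<EOF
tickTime=2000
clientPort=2181
dataDir=$ZK_DEMO_HOME/dataDir
dataLogDir=$ZK_DEMO_HOME/dataLogDir
EOF

cat "$ZK_DEMO_HOME/conf/zoo.cfg"
```

Keeping dataLogDir separate from dataDir puts the transaction log and the snapshots in different directories, which is what the two settings in this post do.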
Start ZooKeeper (from $ZOOKEEPER_HOME/bin):
./zkServer.sh start
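3, Configure and start Kafka
Edit the Kafka configuration (config/server.properties):
To make Kafka reachable from outside the local network, set the advertised address:
advertised.listeners=PLAINTEXT://x.x.x.x:9092
Start Kafka (from Kafka's bin directory):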
./kafka-server-start.sh ../config/server.properties
Create a topic (a message category):
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Produce messages:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
Consume messages:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Describe a topic:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myTest4
End result:
Whatever is typed into the producer is printed by the consumer.
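4, Set up a multi-broker cluster
Make several copies of server.properties and change broker.id in each. Because I am deploying on a single machine, the listeners port and log.dir also have to be changed so that the brokers do not collide with each other.
After copying server.properties, start the additional brokers (each in its own terminal, since the script stays in the foreground):
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties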
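The copy-and-edit step can be sketched with sed. This is only an illustration under stated assumptions: CONFIG_DIR is a hypothetical variable, the ports 9093 and 9094 and the /tmp/kafka-logs-N paths are example values, and a small stand-in server.properties is created so the snippet runs without a Kafka install. The stand-in uses the log.dirs key; log.dir, as named in this post, is also accepted by the broker.

```shell
#!/usr/bin/env bash
set -e

# Hypothetical working directory; point CONFIG_DIR at Kafka's real
# config/ directory to generate the files next to server.properties.
CONFIG_DIR="${CONFIG_DIR:-$(mktemp -d)}"
BASE="$CONFIG_DIR/server.properties"

# Stand-in base file (assumed contents) so the sketch is self-contained.
if [ ! -f "$BASE" ]; then
  printf '%s\n' 'broker.id=0' '#listeners=PLAINTEXT://:9092' 'log.dirs=/tmp/kafka-logs' > "$BASE"
fi

for id in 1 2; do
  port=$((9092 + id))   # 9093 for broker 1, 9094 for broker 2
  # Rewrite the three settings that must be unique per broker on one host.
  sed -e "s|^broker\.id=.*|broker.id=$id|" \
      -e "s|^#\{0,1\}listeners=.*|listeners=PLAINTEXT://:$port|" \
      -e "s|^log\.dirs=.*|log.dirs=/tmp/kafka-logs-$id|" \
      "$BASE" > "$CONFIG_DIR/server-$id.properties"
done

cat "$CONFIG_DIR/server-1.properties" "$CONFIG_DIR/server-2.properties"
```

The listeners pattern is anchored at the start of the line, so an advertised.listeners entry in the base file is left untouched and would need to be adjusted separately.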
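Create a topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myTest2
Before any data is written, a directory for the topic is created only under the log.dir of broker 0 (with replication-factor 1 and a single partition, only one broker holds the partition).
Then write some arbitrary data to it.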
Since there are now three brokers, a topic can be created with replication-factor <= 3:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic myTest3
If the value exceeds 3, the following error is thrown:
t@ubuntu:~/source/kafka_2.10-0.10.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 5 --partitions 1 --topic myTest2
Error while executing topic command : replication factor: 5 larger than available brokers: 3
[2017-09-05 17:24:55,153] ERROR kafka.admin.AdminOperationException: replication factor: 5 larger than available brokers: 3
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
    at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:110)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:61)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
t@ubuntu:~/source/kafka_2.10-0.10.0.0$
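The constraint behind this error can be checked before calling kafka-topics.sh. The function below is a hypothetical helper, not part of Kafka; it only mirrors the rule that the replication factor cannot exceed the number of available brokers, and the broker count of 3 is taken from this post's setup.

```shell
#!/usr/bin/env bash

# Hypothetical pre-check: succeeds when the requested replication
# factor fits the number of available brokers, fails otherwise.
check_replication_factor() {
  local rf="$1" brokers="$2"
  if [ "$rf" -gt "$brokers" ]; then
    echo "replication factor: $rf larger than available brokers: $brokers" >&2
    return 1
  fi
}

# Try the values used in this post against a three-broker cluster.
for rf in 1 3 5; do
  if check_replication_factor "$rf" 3 2>/dev/null; then
    echo "replication-factor $rf: ok"
  else
    echo "replication-factor $rf: rejected"
  fi
done
```

A replica of a given partition can only live once per broker, which is why three brokers cap the replication factor at 3.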
Looking at the log.dir of the three brokers, each one now contains a new myTest3-0/ directory.
Running the describe command on the topic:
t@ubuntu:~/source/kafka_2.10-0.10.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myTest3
Topic:myTest3 PartitionCount:1 ReplicationFactor:3 Configs:
    Topic: myTest3 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
t@ubuntu:~/source/kafka_2.10-0.10.0.0$
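To read the describe output: Leader is the broker that serves reads and writes for the partition, Replicas lists every broker holding a copy, and Isr is the subset of replicas currently in sync with the leader. A minimal sketch of pulling those fields out of the partition line follows; field and sorted are hypothetical helpers, and the input line is the one shown in this post.

```shell
#!/usr/bin/env bash

# Partition line copied from the describe output in this post.
line='Topic: myTest3 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1'

# Hypothetical helper: print the token that follows a given label.
field() {
  printf '%s\n' "$line" |
    awk -v key="$1" '{ for (i = 1; i < NF; i++) if ($i == key) { print $(i + 1); exit } }'
}

# Hypothetical helper: sort a comma-separated id list numerically.
sorted() { printf '%s\n' "$1" | tr ',' '\n' | sort -n | paste -sd, -; }

leader=$(field 'Leader:')
replicas=$(field 'Replicas:')
isr=$(field 'Isr:')

# Compare the sorted lists so that ordering differences do not matter.
if [ "$(sorted "$replicas")" = "$(sorted "$isr")" ]; then
  echo "leader=$leader, all replicas in sync ($isr)"
else
  echo "leader=$leader, under-replicated: replicas=$replicas isr=$isr"
fi
```

If a broker were stopped, its id would be expected to drop out of the Isr list while staying in Replicas, and the second branch would fire.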
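Because the relationship between replication-factor and partitions was still a bit confusing to me, I also tried this:
t@ubuntu:~/source/kafka_2.10-0.10.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic myTest4
As a result, new directories appeared in the log.dir of all three brokers.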
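The arithmetic behind this experiment can be written out as a short sketch. It assumes the usual Kafka layout in which each partition replica is stored in a directory named topic-partition, as with the myTest3-0/ directory seen earlier; the list it prints is what one would expect to find, not output captured from the cluster.

```shell
#!/usr/bin/env bash

topic=myTest4
partitions=5
replication_factor=3
brokers=3

# Every partition is stored replication_factor times across the cluster.
total_replicas=$((partitions * replication_factor))
echo "total partition replicas: $total_replicas"

# When replication_factor equals the broker count, every broker
# holds one copy of every partition.
if [ "$replication_factor" -eq "$brokers" ]; then
  for p in $(seq 0 $((partitions - 1))); do
    echo "expected on each broker: $topic-$p/"
  done
fi
```

In short, partitions decide how many pieces the topic is split into, and replication-factor decides how many brokers keep a copy of each piece.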
5, Kafka provides connect-standalone.sh in the bin directory for importing and exporting data automatically
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
connect-file-source.properties configures the connector class used to import data and the topic it writes to:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
connect-file-sink.properties configures the connector class used to export data and the topic it reads from:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
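A small sketch of preparing these files for a trial run follows. DEMO_DIR is a hypothetical scratch directory and the two sample input lines are made up for illustration; the property values are the ones listed in this post. The final check reflects the one thing that has to line up: the source's topic and the sink's topics must name the same topic for data to flow from test.txt to test.sink.txt.

```shell
#!/usr/bin/env bash
set -e

# Hypothetical scratch directory; in a real run the two .properties
# files live under Kafka's config/ directory.
DEMO_DIR="${DEMO_DIR:-$(mktemp -d)}"

cat > "$DEMO_DIR/connect-file-source.properties" <<'EOF'
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
EOF

cat > "$DEMO_DIR/connect-file-sink.properties" <<'EOF'
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
EOF

# Sample input for the source connector (contents are illustrative only).
printf 'foo\nbar\n' > "$DEMO_DIR/test.txt"

# The source writes to "topic"; the sink reads from "topics".
src_topic=$(sed -n 's/^topic=//p' "$DEMO_DIR/connect-file-source.properties")
sink_topic=$(sed -n 's/^topics=//p' "$DEMO_DIR/connect-file-sink.properties")

if [ "$src_topic" = "$sink_topic" ]; then
  echo "source and sink both use topic: $src_topic"
else
  echo "topic mismatch: source=$src_topic sink=$sink_topic" >&2
fi
```

Since file=test.txt is a relative path, it is presumably resolved against the directory from which connect-standalone.sh is launched, so the input file should be created there.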