启动server。node
./kafka_2.11-2.0.0/bin/kafka-server-start.sh ./kafka_2.11-2.0.0/config/server.properties # properties中默认的端口是9092,若是须要修改的话,修改位置以下: listeners=PLAINTEXT://:9090
建立topicapache
kafka-topics.sh --create --zookeeper yun1:2181 --replication-factor 1 --partitions 1 --topic test # partitions : 分区数量 replication-factor:须要同步信息的节点数量(备份数量)
启动producerbootstrap
./kafka_2.11-2.0.0/bin/kafka-console-producer.sh --broker-list localhost:9090 --topic test-create # broker-list:节点信息,若是有多个的话,ip1:port1,ip2:port2,ip3:port3
启动consumercode
./kafka-console-consumer.sh --bootstrap-server localhost:9090 --topic test-create --from-beginning # bootstrap-server:须要链接的kafka server(旧版本的使用zookeeper:port来链接,后续版本这个参数已经不推荐) from-beginning:获取该topic上面的全部已经发不过的消息。
异常处理server
当producer或则consumer链接的server端口与不对的话。producer、consumer会报出以下错误 [2018-08-02 14:40:32,612] WARN [Consumer clientId=consumer-1, groupId=console-consumer-70274] Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
排查错误ip
查看zookeeper的输出信息。默认使用的broker id 是0 ./zkCli.sh -server yun1:2181 <<< "get /brokers/ids/0” 输出信息,broker=0的port启动的端口是9092 WatchedEvent state:SyncConnected type:None path:null {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.1.162.15:9092"],"jmx_port":-1,"host":"10.1.162.15","timestamp":"1533190983702","port":9092,"version":4} cZxid = 0x78c ctime = Thu Aug 02 14:23:03 CST 2018 mZxid = 0x78c mtime = Thu Aug 02 14:23:03 CST 2018 pZxid = 0x78c cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x1644a37232e01d6 dataLength = 192 numChildren = 0 [zk: yun1:2181(CONNECTED) 1] %
查看topic信息get
./kafka-topics.sh --describe --zookeeper yun1:2181 --topic topic-repication # 有三个节点相互备份。三个分区 ############################################################ Topic:topic-repication PartitionCount:3 ReplicationFactor:3 Configs: Topic: topic-repication Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Topic: topic-repication Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: topic-repication Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 #leader:该节点负责该分区的全部读写。每一个节点都是随机选择的 #replicas:备份的节点列表 #Isr:同步备份的节点列表,活着的节点而且正在同步leader