每个分区都是一个顺序的、不可变的消息队列, 而且能够持续的添加。分区中的消息都被分了一个序列号,称之为偏移量(offset),在每一个分区中此偏移量都是惟一的。html
- 建立topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic1 - 生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic 1.建立了3个p的topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test-topic3 2.Kafka中能够将Topic从物理上划分红一个或多个分区(Partition),每一个分区在物理上对应一个文件夹,以”topicName_partitionIndex”的命名方式命名,该文件夹下存储这个分区的全部消息(.log)和索引文件(.index),这使得Kafka的吞吐率能够水平扩展。 n1: p0 test-topic3-0 n2: p1 test-topic3-1 n3: p2 test-topic3-2
[root@n1 logstash]# ls -ld /data/kafka-logs/test-topic3-0 [root@n2 logstash]# ls -ld /data/kafka-logs/test-topic3-1 [root@n3 logstash]# ls -ld /data/kafka-logs/test-topic3-2
消费者能够从
1.zk拉数据json
## src_base_zk - 查看基于zk的消费组 bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3 - 查看group详情(判断cusumer是否正常) bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group logstash-group --describe
2,kafka拉数据bootstrap
## src_base_kafka - 查看zk bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list - 查看group详情(判断cusumer是否正常) bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group logstash-group --describe - 查看实时消费日志 bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic3
- 建立topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic1 - 生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic 监控消费日志 [root@n1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic hi mao
logstash消费ruby
input { kafka { bootstrap_servers => "localhost:9092" topics => "test-topic" group_id => "logstash-group" codec => "json" consumer_threads => 1 decorate_events => true } } output { stdout { codec => rubydebug } }
/usr/local/logstash/bin/logstash -f logstash.yaml --config.reload.automatic并发
先建立3个p的test-topic3
1.当有3个p, 1个消费者时
app
2.当有3个p,2个消费者时debug
3.当有3个p,3个消费者时
都是动态调配的(新增一个消费者, p的分配会自动变)3d
4.当有1个p,2个消费者
日志
小结: 同一个消费组, 消费者个数<=p个数code
- 建立topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic2 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test-topic3 - 查看topic list bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list - 查看topic 详细 bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic test-topic3 --describe - 生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic3 基于zk的消费者 - 查看基于zk的消费组 bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3 - 开启一个消费者(随机生成group) bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3 - 查看group详情(判断cusumer是否正常) bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group logstash-group --describe 基于kafka的消费者 - 查看基于kafka的消费组 bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list - 查看group详情(判断cusumer是否正常) bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group logstash-group --describe - 开启一个消费者(随机生成group) bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic3 --group logstash-group - 开启一个消费者(指定group,可能偷走已有的消费者的数据) bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic3 --group logstash-group
参考: http://lxw1234.com/archives/2015/10/538.htm https://www.cnblogs.com/AcAc-t/p/kafka_topic_consumer_group_command.html https://www.cnblogs.com/happyday56/p/4208663.html