kafka自己就是LinkIn公司开发用于日志系统的,因此其文件叫作log
php
点对对与发布订阅的区别java
1.1 点对点模式,生产者发送一条消息到queue,只有一个消费者能收到。 python
1.2 发布订阅模式 nginx
kafka背景介绍
kafka是最初由Linkedin公司开发,使用Scala语言编写,Kafka是一个分布式、分区的、多副本的、多订阅者的日志系统(分布式MQ系统),能够用于web/nginx日志,搜索日志,监控日志,访问日志等等。
kafka目前支持多种客户端语言:java,python,c++,php等等。
c++
kafka高吞吐量的设计
数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能。
zero-copy:减小IO操做步骤。
支持数据批量发送和拉取。
支持数据压缩。
Topic划分为多个partition,提升并行处理能力。
web
kafka信息存储
Kafka中的Message是以topic为基本单位的,不一样的topic之间是相互独立的。每一个topic又能够分红几个不一样的partition(在建立topic时指定的),每一个partition存储一部分Message。关系以下图
apache
partition是以文件的形式存储在文件系统中,好比,建立了一个名为t101的topic,其有5个partition,那么在Kafka的数据目录中(log.dirs指定的)中就有这样4个目录: t101-0,t101-1,t101-2,t101-3 ,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这4个partition的数据。 建立命令以下:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic t101
bootstrap
4.1 数据文件分段
好比有100条Message,它们的offset是从0到99。假设将数据文件分红5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就能够定位到该Message在哪一个段中。bash
4.2 为数据文件建索引
数据文件分段使得能够在一个较小的数据文件中查找对应offset的Message了,可是这依然须要顺序扫描才能找到对应offset的Message。为了进一步提升查找的效率,Kafka为每一个分段后的数据文件创建了索引文件,文件名与数据文件的名字是同样的,只是文件扩展名为.index。
索引文件中包含若干个索引条目,每一个条目表示数据文件中一条Message的索引。索引包含两个部分,分别为相对offset和position。
服务器
#相对offset:由于数据文件分段之后,每一个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20=5。存储相对offset可减小索引文件占用的空间 。
#position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就能够读取对应的Message了。
index文件中并无为数据文件中的每条Message创建索引,而是采用了稀疏存储的方式,每隔必定字节的数据创建一条索引。这样避免了索引文件占用过多的空间,从而能够将索引文件保留在内存中。缺点是没有创建索引的Message也不能一次定位到其在数据文件的位置,从而须要作一次顺序扫描,可是此次顺序扫描的范围就很小了。
4.3 查找message原理图
1)首先是用二分查找肯定它是在哪一个LogSegment中,天然是在第一个Segment中。 2)打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。天然offset为6的那个索引是咱们要找的,经过索引文件咱们知道offset为6的Message在数据文件中的位置为9807。
3)打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。
Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性
5.5 zookeeper记录分区与消费者的关系
对于每一个消费者组 (Consumer Group),Kafka都会为其分配一个全局惟一的Group ID,Group内部的全部消费者共享该ID。
同时,Kafka为每一个消费者分配一个Consumer ID,一般采用"Hostname:UUID"形式表示。
在Kafka中,规定了每一个消息分区 只能被同组的一个消费者进行消费,所以,须要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每一个消费者一旦肯定了对一个消息分区的消费权力,须要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
5.6 消息消费进度Offset记录
在消费者对指定消息分区进行消息消费的过程当中,须要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其余消费者从新接管该消息分区的消息消费后,可以从以前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值
新版本消费位移存储
老版本的消费位移信息是存储的zookeeper 中的, 可是zookeeper 并不适合频繁的写入查询操做, 因此在新版本的中消费位移信息存放在了__consumer_offsets内置topic中。
能够利用以下命令建立consumers group信息,建立group consumer_offsets_t105
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --from-beginning --group consumer_offsets_t105
查询consumer_offsets_t105 在 __consumer_offsets topic 中存放的位移信息__consumer_offsets 默认分区50。经过以下公式便可获取:Math.abs("consumer_offsets_t105".hashCode()) % 50。
kafka java调用
7.1 生产者
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerDemo {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
try {
producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 100; i++) {
String msg = "Message " + i;
producer.send(new ProducerRecord<String, String>("t105", msg));
System.out.println("Sent:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
复制代码
7.2 消费者
复制代码
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerDemo {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "group4");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "none");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("t105"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll( Duration.ofMillis(100)
);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("partition = "+ record.partition() +" offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}
}
}
参数auto.offset.reset有三个值latest, earliest, none。<br>
earliest: automatically reset the offset to the earliest offset<br>
latest: automatically reset the offset to the latest offset<br>
none: hrow exception to the consumer if no previous offset is found for the consumer's group
复制代码
kafka的流实例参考
juejin.im/post/5cd50a…
建立一个topic bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic t106
describe topics bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic t106
往集群中发消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t106
集群消费消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic t106
验证消息是否生产成功 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic t105 --time -1
—————————
建立消费组消费消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --from-beginning --group consumer_offsets_t105
查询偏移量消息 bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 44 --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" -- from-beginning
//经过config文件访问客户端 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --group consumer_offsets_t105 --consumer.config config/consumer.properties