kafka消息队列

kafka自己就是LinkIn公司开发用于日志系统的,因此其文件叫作log
php

  1. 点对对与发布订阅的区别java

    1.1 点对点模式,生产者发送一条消息到queue,只有一个消费者能收到。 python

    1.2 发布订阅模式 nginx

    发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息
    rabbitMQ中实现发布订阅模式
    当RabbitMQ须要支持多订阅时,发布者发送的消息经过路由同时写到多个Queue,不一样订阅组消费此消息。
    kafka发布订阅模式
    Kafka只支持r息持久化,消费端为拉模型,消费状态和订阅关系由客户端端负责维护,消息消费完后不会当即删除,会保留历史消息。所以支持多订阅时,消息只会存储一份就能够了。

  2. kafka背景介绍
    kafka是最初由Linkedin公司开发,使用Scala语言编写,Kafka是一个分布式、分区的、多副本的、多订阅者的日志系统(分布式MQ系统),能够用于web/nginx日志,搜索日志,监控日志,访问日志等等。
    kafka目前支持多种客户端语言:java,python,c++,php等等。
    c++

  3. kafka高吞吐量的设计
    数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能。
    zero-copy:减小IO操做步骤。
    支持数据批量发送和拉取。
    支持数据压缩。
    Topic划分为多个partition,提升并行处理能力。
    web

  4. kafka信息存储
    Kafka中的Message是以topic为基本单位的,不一样的topic之间是相互独立的。每一个topic又能够分红几个不一样的partition(在建立topic时指定的),每一个partition存储一部分Message。关系以下图
    apache

    partition是以文件的形式存储在文件系统中,好比,建立了一个名为t101的topic,其有5个partition,那么在Kafka的数据目录中(log.dirs指定的)中就有这样4个目录: t101-0,t101-1,t101-2,t101-3 ,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这4个partition的数据。 建立命令以下:
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic t101
    bootstrap

    新数据是添加在文件末尾,不论文件数据文件有多大,这个操做永远都是O(1)的。
    查找某个offset的Message是顺序查找的。所以,若是数据文件很大的话,查找的效率就低。
    为解决查找效率低的问题,kafka采用分段和索引

4.1 数据文件分段
好比有100条Message,它们的offset是从0到99。假设将数据文件分红5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就能够定位到该Message在哪一个段中。bash

4.2 为数据文件建索引
数据文件分段使得能够在一个较小的数据文件中查找对应offset的Message了,可是这依然须要顺序扫描才能找到对应offset的Message。为了进一步提升查找的效率,Kafka为每一个分段后的数据文件创建了索引文件,文件名与数据文件的名字是同样的,只是文件扩展名为.index。
索引文件中包含若干个索引条目,每一个条目表示数据文件中一条Message的索引。索引包含两个部分,分别为相对offset和position。
服务器

#相对offset:由于数据文件分段之后,每一个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20=5。存储相对offset可减小索引文件占用的空间 。
#position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就能够读取对应的Message了。
index文件中并无为数据文件中的每条Message创建索引,而是采用了稀疏存储的方式,每隔必定字节的数据创建一条索引。这样避免了索引文件占用过多的空间,从而能够将索引文件保留在内存中。缺点是没有创建索引的Message也不能一次定位到其在数据文件的位置,从而须要作一次顺序扫描,可是此次顺序扫描的范围就很小了。
4.3 查找message原理图

1)首先是用二分查找肯定它是在哪一个LogSegment中,天然是在第一个Segment中。 2)打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。天然offset为6的那个索引是咱们要找的,经过索引文件咱们知道offset为6的Message在数据文件中的位置为9807。
3)打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。

Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性

  1. zookeeper在kafka中的做用
    其中5.五、5.六、5.7的是老版本的设计方式,新的版本偏移量已经不在存在zookeeper中。
    5.1 管理broker集群
    Broker是分布式部署而且相互之间相互独立,可是须要有一个注册系统可以将整个集群中的Broker管理起来。
    在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:
    /brokers/ids
    每一个broker启动的时候都会在zookeeper上进行注册。
    Kafka使用了全局惟一的数字来指代每一个Broker服务器,不一样的Broker必须使用不一样的Broker ID进行注册,建立完节点后,每一个Broker就会将本身的IP地址和端口信息记录到该节点中去。其中,Broker建立的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。
    5.2 管理topic信息
    在Kafka中,同一个Topic的消息会被分红多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录,如:
    /borkers/topics
    Kafka中每一个Topic都会以/brokers/topics/[topic]的形式被记录,Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册本身的Brokerid并写入针对该Topic的分区总数,一样,这个分区节点也是临时节点。
    5.3 生产者负载均衡

    因为同一个Topic消息会被分区并将其分布在多个Broker上,所以,生产者须要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。
    (1) 四层负载均衡,根据生产者的IP地址和端口来为其肯定一个相关联的Broker。一般,一个生产者只会对应单个Broker,而后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每一个生产者不须要同其余系统创建额外的TCP链接,只须要和Broker维护单个TCP链接便可。可是,其没法作到真正的负载均衡,由于实际系统中的每一个生产者产生的消息量及每一个Broker的消息存储量都是不同的,若是有些生产者产生的消息远多于其余生产者的话,那么会致使不一样的Broker接收到的消息总数差别巨大,同时,生产者也没法实时感知到Broker的新增和删除。
    (2) 使用Zookeeper进行负载均衡,因为每一个Broker启动时,都会完成Broker注册过程,生产者会经过该节点的变化来动态地感知到Broker服务器列表的变动,实现动态的负载均衡机制。
    Kafka的生产者会对ZooKeeper上的“Broker的新增与减小”、“Topic的新增和减小”和“Broker和Topic关联关系的变化”等事件注册Watcher监听
    经过ZooKeeper的Watcher通知可以让生产者动态的获取Broker和Topic的变化状况。
    5.4 消费者负载均衡
    与生产者相似,Kafka中的消费者一样须要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每一个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不一样的消费者分组消费本身特定的Topic下面的消息。

5.5 zookeeper记录分区与消费者的关系
对于每一个消费者组 (Consumer Group),Kafka都会为其分配一个全局惟一的Group ID,Group内部的全部消费者共享该ID。
同时,Kafka为每一个消费者分配一个Consumer ID,一般采用"Hostname:UUID"形式表示。
在Kafka中,规定了每一个消息分区 只能被同组的一个消费者进行消费,所以,须要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每一个消费者一旦肯定了对一个消息分区的消费权力,须要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

节点内容就是该消息分区上消费者的Consumer ID

5.6 消息消费进度Offset记录
在消费者对指定消息分区进行消息消费的过程当中,须要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其余消费者从新接管该消息分区的消息消费后,可以从以前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值

5.7 消费者注册
消费者服务器在初始化启动时加入消费者分组的步骤以下
注册到消费者分组。每一个消费者服务器启动时,都会到Zookeeper的指定节点下建立一个属于本身的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点建立后,消费者就会将本身订阅的Topic信息写入该临时节点。
对消费者分组中的消费者的变化注册监听。每一个消费者都须要关注所属消费者分组中其余消费者服务器的 变化状况, 即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减小,就触发消费者的负载均衡。
对Broker服务器变化注册监听。消费者须要对/broker/ids/[0-N]中的节点进行监听,若是发现Broker服务器列表发生变化,那么就根据具体状况来决定是否须要进行消费者负载均衡。
进行消费者负载均衡。为了让同一个Topic下不一样分区的消息尽可能均衡地被多个消费者消费而进行消费者与消息 分区分配的过程,一般,对于一个消费者分组,若是组内的消费者服务器发生变动或Broker服务器发生变动,会发出消费者负载均衡。

  1. 新版本消费位移存储
    老版本的消费位移信息是存储的zookeeper 中的, 可是zookeeper 并不适合频繁的写入查询操做, 因此在新版本的中消费位移信息存放在了__consumer_offsets内置topic中。
    能够利用以下命令建立consumers group信息,建立group consumer_offsets_t105
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --from-beginning --group consumer_offsets_t105

    查询consumer_offsets_t105 在 __consumer_offsets topic 中存放的位移信息__consumer_offsets 默认分区50。经过以下公式便可获取:Math.abs("consumer_offsets_t105".hashCode()) % 50。

    能够计算得位移偏移量是存在partitionId等于44分区上。
    使用命令能够查询出消息的偏移信息。
    bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 44 --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" -- from-beginning
    能够根据命令查询消息的发布状况
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic t105 --time -1
    能够看出消息偏移与消息的发布的数据基本一致。

  2. kafka java调用
    7.1 生产者

    import java.util.Properties;
         import org.apache.kafka.clients.producer.KafkaProducer;
         import org.apache.kafka.clients.producer.Producer;
         import org.apache.kafka.clients.producer.ProducerRecord;
         public class ProducerDemo {
         
             public static void main(String[] args){
                 Properties properties = new Properties();
                 properties.put("bootstrap.servers", "localhost:9092");
                 properties.put("acks", "all");
                 properties.put("retries", 0);
                 properties.put("batch.size", 16384);
                 properties.put("linger.ms", 1);
                 properties.put("buffer.memory", 33554432);
                 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                 properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                 Producer<String, String> producer = null;
                 try {
                     producer = new KafkaProducer<String, String>(properties);
                     for (int i = 0; i < 100; i++) {
                         String msg = "Message " + i;
                         producer.send(new ProducerRecord<String, String>("t105", msg));
                         System.out.println("Sent:" + msg);
                     }
                 } catch (Exception e) {
                     e.printStackTrace();
         
                 } finally {
                     producer.close();
                 }
         
             }
         }
        
        
    
    复制代码
7.2 消费者

复制代码
import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    
    public class ConsumerDemo {
    
        public static void main(String[] args){
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("group.id", "group4");
            properties.put("enable.auto.commit", "true");
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("auto.offset.reset", "none");
            properties.put("session.timeout.ms", "30000");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            kafkaConsumer.subscribe(Arrays.asList("t105"));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(            Duration.ofMillis(100)
                );
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition = "+ record.partition() +"  offset = %d, value = %s", record.offset(), record.value());
                    System.out.println();
                }
            }
    
        }
    }

参数auto.offset.reset有三个值latest, earliest, none。<br>
earliest: automatically reset the offset to the earliest offset<br>
latest:  automatically reset the offset to the latest offset<br>
none: hrow exception to the consumer if no previous offset is found for the consumer's group
复制代码
  1. kafka零拷贝
    传统的IO机制
    这一过程实际上发生了四次数据拷贝。首先经过系统调用将文件数据读入到内核态 Buffer(DMA 拷贝),而后应用程序将内存态 Buffer 数据读入到用户态 Buffer(CPU 拷贝),接着用户程序经过 Socket 发送数据时将用户态 Buffer 数据拷贝到内核态 Buffer(CPU 拷贝),最后经过 DMA 拷贝将数据拷贝到 NIC Buffer。同时,还伴随着四次上下文切换。

数据经过 DMA 拷贝到内核态 Buffer 后,直接经过 DMA 拷贝到 NIC Buffer,无需 CPU 拷贝。除了减小数据拷贝外,由于整个读文件 - 网络发送由一个 sendfile 调用完成,整个过程只有两次上下文切换,所以大大提升了性能。

  1. Kafka的leader选举机制
    只有leader 负责读写,follower只负责备份,若是leader宕机的话,Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,ISR中有f+1个节点,就能够容许在f个节 点down掉的状况下不会丢失消息并正常提供服。ISR的成员是动态的,若是一个节点被淘汰了,当它从新达到“同步中”的状态时,他能够从新加入ISR。所以若是leader宕了,直接从ISR中选择一个follower就行。

若是全部的ISR副本都失败了
此时有两种方法可选,一种是等待ISR集合中的副本复活,一种是选择任何一个当即可用的副本,而这个副本不必定是在ISR集合中。这两种方法各有利弊,实际生产中按需选择。
若是要等待ISR副本复活,虽然能够保证一致性,但可能须要很长时间。而若是选择当即可用的副本,则极可能该副本并不一致。

  1. kafka Stream
    一个流处理器从它所在的拓扑上游接收数据,经过Kafka Streams提供的流处理的基本方法, 如map()、filter()、join()以及聚合等方法,对数据进行处理,而后将处理以后的一个或者多个输出结果发送给下游流处理器。

kafka的流实例参考
juejin.im/post/5cd50a…

  1. kafka压缩

12. 经常使用命令 启动 bin/kafka-server-start.sh config/server.properties bin/kafka-server-start.sh config/server-1.properties & bin/kafka-server-start.sh config/server-2.properties &

建立一个topic bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic t106

describe topics bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic t106

往集群中发消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t106

集群消费消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic t106

验证消息是否生产成功 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic t105 --time -1

—————————

建立消费组消费消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --from-beginning --group consumer_offsets_t105

查询偏移量消息 bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 44 --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" -- from-beginning

//经过config文件访问客户端 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --group consumer_offsets_t105 --consumer.config config/consumer.properties

相关文章
相关标签/搜索