基本概念
整个Kafka体系结构中引入了如下3个术语。apache
(1)Producer:生产者,也就是发送消息的一方。生产者负责建立消息,而后将其投递到Kafka中。数组
(2)Consumer:消费者,也就是接收消息的一方。消费者链接到Kafka上并接收消息,进而进行相应的业务逻辑处理。缓存
<!--more-->服务器
(3)Broker:服务代理节点。对于Kafka而言,Broker能够简单地看做一个独立的Kafka服务节点或Kafka服务实例。大多数状况下也能够将Broker看做一台Kafka服务器,前提是这台服务器上只部署了一个Kafka实例。一个或多个Broker组成了一个Kafka集群。通常而言,咱们更习惯使用首字母小写的broker来表示服务代理节点。网络
在Kafka中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。架构
主题是一个逻辑上的概念,它还能够细分为多个分区,一个分区只属于单个主题,不少时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不一样分区包含的消息是不一样的,分区在存储层面能够看做一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset是消息在分区中的惟一标识,Kafka经过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。如图所示,主题中有 4 个分区,消息被顺序追加到每一个分区日志文件的尾部。Kafka中的分区能够分布在不一样的服务器(broker)上,也就是说,一个主题能够横跨多个broker,以此来提供比单个broker更强大的性能。性能
生产者
消息在真正发往Kafka以前,有可能须要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的做用,那么在此以后又会发生什么呢?下面咱们来看一下生产者客户端的总体架构.线程
producer 流程:
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。代理
在主线程中由KafkaProducer建立消息,而后经过可能的拦截器、序列化器和分区器的做用以后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。日志
Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
主线程
<font color=#C7063 size=3>生产者拦截器</font> 能够用来在消息发送前作一些准备工做,好比按照某个规则过滤不符合要求的消息、修改消息的内容等,也能够用来在发送回调逻辑前作一些定制化的需求,好比统计类工做。生产者拦截器的使用也很方便,主要是自定义实现 org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口中包含3个方法
KafkaProducer() 在将消息序列化和计算分区以前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操做。
KafkaProducer() 在消息被应答(Acknowledgement)以前或消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的 Callback 以前执行。
close() 主要用于在关闭拦截器时执行一些资源的清理工做。在这 3 个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。
<font color=#C7063 size=3>序列化器</font> 生产者须要用序列化器(Serializer) 把对象转换成字节数组才能经过网络发送给Kafka,消费者须要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象,Serializer接口中包含3个方法
configure() 方法用来配置当前类
serialize() 方法用来执行序列化操做
close() 方法用来关闭当前的序列化器,通常状况下 close()是一个空方法,若是实现了此方法,则必须确保此方法的幂等性,由于这个方法极可能会被KafkaProducer调用屡次。
<font color=#C7063 size=3>分区器</font> 分区器的做用就是为消息分配分区。序列化器是必需的。消息通过序列化以后就须要肯定它发往的分区,若是消息ProducerRecord中指定了partition字段,那么就不须要分区器的做用,由于partition表明的就是所要发往的分区号。
消息累加器 RecordAccumulator
RecordAccumulator 主要用来缓存消息以便 Sender 线程能够批量发送,进而减小网络传输的资源消耗以提高性能。RecordAccumulator 缓存的大小能够经过生产者客户端参数buffer.memory 配置,默认值为 32MB。 若是生产者发送消息的速度超过发送到服务器的速度,则会致使生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator 的内部为每一个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中能够包含一至多个 ProducerRecord。
通俗地说,ProducerRecord 是生产者中建立的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可使字节的使用更加紧凑。与此同时,将较小的ProducerRecord拼凑成一个较大的ProducerBatch,也能够减小网络请求的次数以提高总体的吞吐量
sender 线程
Sender 从 RecordAccumulator 中获取缓存的消息以后,会进一步将本来<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集群的broker节点,Sender 还会进一步封装成<Node,Request>的形式,这样就能够将Request请求发往各个Node了,这里的Request是指Kafka的各类协议请求,请求在从Sender线程发往Kafka以前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId,Deque<Request>> 它的主要做用是缓存了已经发出去但尚未收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。
原创不易,若是以为有点用的话,请绝不留情点个赞,转发一下,这将是我持续输出优质文章的最强动力。