3-kafka0.10 生产者详解

Kafka在0.8.1版本的时候重写了Producer。在0.9版本中又重写了Consumer,纯Java,没有了对Scala和ZK的依赖。网络

1、消息的发送流程:app

  • KafkaProducer:
    • 等待 topic meteData 数据的更新,序列化 key,value;
    • 根据 topic 的 partition 个数和 key 的值,计算该条消息所属的 partition,将消息 append 给 RecordAccumulator;
  • RecordAccumulator:
    • 使用Map类型的 batches ( ConcurrentMap<TopicPartition, Deque<RecordBatch>> ) 维护发向全部 topic 的消息数据;
    • append 方法内将发送的这条消息 tryAppend 进对应 Deque 最后一个 RecordBatch 中。若是空间不够,该 RecordBatch 就会 flip ByteBuffer,进入只读状态。空间不够或者失败则会在 Deque 末端尝试新起一个 RecordBatch;
  • Sender:
    • KafkaProducer 初始化的时候会启一个 KafkaThread 线程,运行 Runnable 的 Sender 对象,不停地发送 RecordAccumulator 内累积的消息;
    • 调用 RecordAccumulator 的 ready 方法收集到这次发送任务的目的地,即 Broker Leader 的列表,消息都是发送给所属 Partition 目前是 Leader 的那个 Broker 节点的;
    • 调用 NetworkClient 的 ready 方法,判断收集到的每一个 Leader 节点是不是 connected 状态,否的话会被移除;
    • 调用 RecordAccumulator 的 drain 方法,得到发送给每一个 Broker 节点的 RecordBatch 列表。将发往每一个 Broker 节点的 RecordBatch 数据,封装成一个 ClientRequest,主要的消息内容由 RequestSend 内的 Struct 结构表示。Struct 内部已将消息按 topic 分开,并是按 kafka 消息的 schema 生成,具备以下的嵌套结构:{“acks”:1,”topic_data”:[{"topic": "xxx", "data": [{"partition": 1, "record_set": ByteBuffer}]}]}。ClientRequest 内还包括发完消息后的 CallBack 处理逻辑;
    • 遍历每一个 ClientRequest,调用 NetworkClient 的 send 方法,将 RequestSend 放进 Selector.channels 内对应的 KafkaChannel 中;
    • 调用 NetworkClient 的 poll 方法,将 RequestSend 真正的发送给 Broker;
  • RecordAccumulator:
    • ready 方法中检查每一个 Deque 的第一个 RecordBatch 是不是 ready 的状态,并把 RecordBatch 对应的 Broker Leader 节点收集起来好向它们发送消息。判断 RecordBatch 是否 ready 涉及到这个 bath 是否满了、距离上一次检查是否够久等。例如若是 RecordBatch 所在的 Deque 长度大于1,证实这个 RecordBatch 曾今被 append 的时候发现已经满了,如今是只读待发状态,是 ready 的。须要等待的时长受是否处在 backoff 时期,是否超过 linger 时长等影响;
    • drain 方法中遍历收集到的、 connected 状态的 Broker Leader 节点,根据每一个节点下归属的 Partition 对应从 batches 中的 Deque 中取出第一个 RecordBatch,拼装成 Map<Integer, List<RecordBatch>> 的结构,key 是 Broker 节点的 id, value 是发给该节点的 RecordBatch 列表;
  • NetworkClient:
    • 使用 ClusterConnectionStates (Map<String, NodeConnectionState>) 维护着每一个 Broker 节点的链接状态;
    • ready 方法中判断是否跟指定的 Broker 节点是 connected 的状态,否的话会经过 Selector 的 connect 方法初始化跟其的链接,创建 SocketChannel 并 register,KafkaChannel 会 attach 在 SelectionKey 上 ;
    • poll 方法中调用 Selector 的 poll 方法,处理 Selector 内的 completedSends,completedReceives等,处理 ClientResponse, 遍历 RecordBatch 内的List<Thunk>,完成回调逻辑的处理;
  • Selector:
    • 使用 channels (Map<String, KafkaChannel>) 维护着与每一个 Broker 节点的 Channel;
    • 使用 completedSends (List<Send>)  维护着已经发送完毕的 RequestSend
    • 使用 completedReceives (List<NetworkReceive>)  维护着来自 Broker 的 response;
    • poll 方法中遍历 SelectionKey, 若是 KafkaChannel ready + SelectionKeywritable,那么就将 KafkaChannel 中的 RequestSend 发送,并维护更新 completedSends;若是 KafkaChannel ready + SelectionKey readable,那么就接受来自 Broker 的 NetworkReceive,并维护更新 completedReceives;

2、延迟与吞吐量的问题:
Case1: Producer将消息一条接一条发送到 Broker,假设发送延迟是 2ms,那么 1s 能够发送 500 条消息;ui

Case2: Producer将消息延迟 8ms 发送,假设 8ms 内收集到 20 条消息,那么 1s 能够发送 2000 条消息;this

两个重要的参数:
batch.size:  This is an upper limit of how many messages Kafka Producer will attempt to batch before sending, specified in bytes (default is 16K bytes). Kafka may send batches before this limit is, but will always send when this limit is reached. Therefore setting this limit too low will hurt throughput without improving latency. The main reason to set this low is lack of memory – Kafka will always allocate enough memory for the entire batch size, even if latency requirements cause it to send half-empty batches.线程

linger.ms:  How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch.  (default is 0). Sometimes we are willing to wait a bit longer in order to improve the overall throughput at the expense of a little higher latency.对象

3、总结:
Kafka 的 Producer 经过把将要发送的消息先放在 RecordAccumulator 的 batches 内累积一段时间,而后进行小批量提交给 Broker 的方式,减小网络往返的开销,牺牲一点latency 换取 throughput。ip

相关文章
相关标签/搜索