RecordAccumulator
前面几个组件,在 3.1 的文章中,已经说清楚。如今来看 RecordAccumulator
组件java
RecordAccumulator
主要用于缓存消息,以便 Sender
线程可以批量发送消息。RecordAccumulator
会将消息放入缓存 BufferPool
(实际上就是 ByteBuffer
) 中。BufferPool
默认最大为 33554432B
,即 32MB
, 可经过 buffer.memory
进行配置。
当生产者生产消息的速度大于 sender
线程的发送速度,那么 send
方法就会阻塞。默认阻塞 60000ms
,可经过 max.block.ms
配置。node
RecordAccumulator
类的几个重要属性api
public final class RecordAccumulator { private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches; // 缓存空间,默认 32MB,可经过上面说的 buffer.memory 参数进行配置 private final BufferPool free; }
TopicPartition
为分区的抽象。定义以下所示数组
public final class TopicPartition implements Serializable { private int hash = 0; private final int partition; private final String topic; }
主线程发送的消息,都会被放入batcher
中, batches
将发往不一样 TopicPartition
的消息,存放到各自的 ArrayDeque<ProducerBatch>
中。
主线程 append
时,往队尾插入,sender
线程取出时,则往队头取出。缓存
ProducerBatch
批量消息ProducerBatch
为批量消息的抽象。
在编写客户端发送消息时,客户端面向的类则是 ProducerRecord
,kafka
客户端,在发送消息时,会将 ProducerRecord
放入 ProducerBatch
,使消息更加紧凑。
若是为每一个消息都独自建立内存空间,那么内存空间的开辟和释放,则将会比较耗时。所以 ProducerBatch
内部有一个 ByteBufferOutputStream bufferStream
(实则为 ByteBuffer
), 使用 ByteBuffer
重复利用内存空间。网络
bufferStream
值的大小为:数据结构
public final class RecordAccumulator { // 该值大小,可经过 buffer.memory 配置 private final BufferPool free; public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException { int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); } }
其中,batchSize
默认 16384B
,即 16kb
,可经过 batch.size
配置。第2个入参的值则为消息的大小。app
须要注意的是,bufferStream
的内存空间是从 free
内存空间中划出的。 异步
上面有说到,ProducerBatch
会使用 ByteBuffer
追加消息。可是,若是你看代码,你会发现 ProducerBatch
在作消息的追加时,会将消息放入 DataOutputStream appendStream
。好像跟咱们说的 不同! 可是实际上,就是利用 ByteBuffer
,这里还须要看 appendStream
是如何初始化的!ui
注:MemoryRecordsBuilder 为 ProducerBatch 中的一个属性
public class MemoryRecordsBuilder { private final ByteBufferOutputStream bufferStream; private DataOutputStream appendStream; private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException { ensureOpenForRecordAppend(); int offsetDelta = (int) (offset - baseOffset); long timestampDelta = timestamp - firstTimestamp; // 往 appendStream 中追加消息 int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers); recordWritten(offset, timestamp, sizeInBytes); } }
MemoryRecordsBuilder
初始化
public class MemoryRecordsBuilder { private final ByteBufferOutputStream bufferStream; private DataOutputStream appendStream; public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, byte magic, CompressionType compressionType, TimestampType timestampType, long baseOffset, long logAppendTime, long producerId, short producerEpoch, int baseSequence, boolean isTransactional, boolean isControlBatch, int partitionLeaderEpoch, int writeLimit) { // ..省略部分代码 bufferStream.position(initialPosition + batchHeaderSizeInBytes); this.bufferStream = bufferStream; // 使用 bufferStream 包装 this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic)); } }
能够看到实际上使用的仍是 ByteBufferOutputStream bufferStream
Sender
线程Sender
线程在发送消息时,会从 RecordAccumulator
中取出消息,并将放在 RecordAccumulator
中的 Deque<ProducerBatch>
转换成 Map<nodeId, List<ProducerBatch>>
,这里的 nodeId
是 kafka
节点的 id
。再发送给 kafka
以前,又会将消息封装成 Map<nodeId, ClientRequest>
。
请求在从 Sender
发往 kafka
时,还会被存入 InFlightRequests
public class NetworkClient implements KafkaClient { /* the set of requests currently being sent or awaiting a response */ private final InFlightRequests inFlightRequests; private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) { String destination = clientRequest.destination(); RequestHeader header = clientRequest.makeHeader(request.version()); if (log.isDebugEnabled()) { int latestClientVersion = clientRequest.apiKey().latestVersion(); if (header.apiVersion() == latestClientVersion) { log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request, clientRequest.correlationId(), destination); } else { log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}", header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination); } } Send send = request.toSend(destination, header); InFlightRequest inFlightRequest = new InFlightRequest( clientRequest, header, isInternalRequest, request, send, now); // 将请求放入 this.inFlightRequests.add(inFlightRequest); selector.send(send); } }
InFlightRequests
/** * The set of requests which have been sent or are being sent but haven't yet received a response */ final class InFlightRequests { private final int maxInFlightRequestsPerConnection; private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>(); }
InFlightRequests
的做用是存储已经发送的,或者发送了,可是未收到响应的请求。InFlightRequests
类中有一个属性 maxInFlightRequestsPerConnection
, 标识一个节点最多能够缓存多少个请求。该默认值为 5
, 可经过 max.in.flight.requests.per.connection
进行配置, 须要注意的是 InFlightRequests
对象是在建立 KafkaProducer
时就会被建立。
requests
参数的 key
为 nodeId
,value
则为缓存的请求。
sender
线程 在发送消息时,会先判断 InFlightRequests
对应的请求缓存中是否超过了 maxInFlightRequestsPerConnection
的大小
代码入口:Sender.sendProducerData
public class Sender implements Runnable { private long sendProducerData(long now) { // ... 省略部分代码 while (iter.hasNext()) { Node node = iter.next(); // todo 这里为代码入口 if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now)); } } // ... 省略部分代码 } } public class NetworkClient implements KafkaClient { private boolean canSendRequest(String node, long now) { return connectionStates.isReady(node, now) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node); } } final class InFlightRequests { public boolean canSendMore(String node) { Deque<NetworkClient.InFlightRequest> queue = requests.get(node); return queue == null || queue.isEmpty() || (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection); } }
从 InFlightRequests
的设计中,能够看到,咱们能够很轻松的就知道,哪一个 kafka
节点的负载是最低。由于只须要判断 requests
中对应 node
集合的大小便可。
acks
用于指定分区中须要有多少个副本收到消息,生产者才会认为消息是被写入的acks
= 1。默认为1, 只要 leader
副本写入,则被认为已经写入。若是消息已经被写入 leader
副本,且已经返回给生产者 ok
,可是在 follower
拉取 leader
消息以前, leader
副本忽然挂掉,那么此时消息也会丢失acks
= 0。发送消息后,不须要等待服务端的响应,此配置,吞吐量最高。acks
= -1 或者 all。须要等待全部 ISR
中的全部副本都成功写入消息以后,才会收到服务端的成功响应。
须要注意的一点是 acks
入参是 String
,而不是 int
max.request.size
客户端容许发送的消息最大长度,默认为 1MB
.
retries
、retry.backoff.ms
retries
配置生产者的重试次数,默认为 0
. retry.backoff.ms
配置两次重试的间隔时间
compression.type
指定消息的压缩方式,默认为 none
。可选配置gzip
,snappy
,lz4
connection.max.idle.ms
指定在多久以后关闭闲置的链接,默认 540000(ms)
,即 9分钟
linger.ms
指定发送 ProducerBatch
以前等待更多的消息(ProducerRecord
) 加入 ProducerBatch
的时间,默认为 0
。生产者在 ProducerBatch
填充满时,或者等待时间超过 linger.ms
发送消息出去。
receive.buffer.bytes
设置 Socket
接收消息缓存区的大小,默认 32678B
, 32KB
。若是设置为 -1
, 则表示使用 操做系统的默认值。若是 Procuer
和 kafka
处于不一样的机房,能够调大此参数。
send.buffer.bytes
设置 Socket
发送消息缓冲区大小。默认 131072B
, 即128KB
。若是设置为 -1
,则使用操做系统的默认值
request.timeout.ms
Producer
等待响应的最长时间,默认 30000ms
。须要注意的是,该参数须要比 replica.lag.time.max.ms
值更大。能够减小因客户端重试,而形成的消息重复
buffer.memory
配置消息追加器,内存大小。默认最大为 33554432B
,即 32MB
batch.size
ProducerBatch
ByteBuffer
。默认 16384B
,即 16kb
max.block.ms
生产者生成消息过快时,客户端最多阻塞多少时间。
kafka
将生产者生产消息,消息发送给服务端,拆成了 2 个过程。生产消息交由 主线程, 消息发送给服务端的任务交由 sender
线程。RecordAccumulator
的设计,将生产消息,与发送消息解耦。RecordAccumulator
内部存储数据的数据结构是 ArrayDeque
. 队尾追加消息,队头取出消息ProducerRecord
,在消息发送以前会被转为 ProducetBatch
。为的是批量发送消息,提升网络 IO 效率kafka
设计了 InFlightRequests
, 将为响应的消息放入其中buffer.memory
最好是 buffer.memory
整数倍大小。由于 ProducerBatch
的 ByteBuffer
是从 RecordAccumulator
的 ByteBuffer
中划出的RocketMQ
区别RocketMQ
没有将生产消息与发送消息解耦。RocketMQ
的消息发送,分为 同步,异步、单向。其中单向发送与 kafka
的 acks
= 0 的配置效果同样。可是实际上,还得看 RocketMQ broker
的刷盘配置!kafka
发送失败,默认不重试,RocketMQ
默认重试 2 次。不过 RocketMQ
没法配置 2 次重试的间隔时间. kafka
能够配置重试的间隔时间。RocketMQ
默认消息最大为 4MB
, kafka
默认 1MB
RocketMQ
在消息的发送上,是直接使用 Netty
。kafka
则是使用 NIO
本身实现通讯。(虽然说,Netty
也是基于 NIO
)ByteBuffer
ByteBuffer
通常用于网络传输的缓冲区。
先来看下 ByteBuffer
的类继承体系
ByteBuffer
主要的 2 个父类。 DirectByteBuffer
、HeapByteBuffer
。通常而言,咱们主要的是使用 HeapByteBuffer
。
ByteBuffer
重要属性position
当前读取的位置
mark
为某一读过的位置作标记,便于某些时候回退到该位置
limit
读取的结束位置
capacity
buffer
大小
ByteBuffer
基本方法put()
往 buffer
中写数据,并将 position
往前移动
flip()
将 position
设置为0,limit
设置为当前位置
rewind()
将 position
设置为0, limit
不变
mark()
将 mark
设置为当前 position
值,调用 reset()
, 会将 mark
赋值给 position
clear()
将 position
设置为0,limit
设置为 capacity
ByteBuffer
食用DEMOFileInputStream fis = new FileInputStream("/Users/chenshaoping/text.txt"); FileChannel channel = fis.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int read = channel.read(buffer); while (read != -1) { System.out.println(new String(buffer.array(), Charset.defaultCharset())); buffer.clear(); read = channel.read(buffer); }
ArrayDeque
ArrayDeque
,是一个双端队列。便可以从队头插入元素,也能够从队尾插入元素
对于双端队列,既可使用 链表的方式实现,也可使用数组的方式实现。JDK
中 LinkedList
使用链表实现,ArrayDeque
则使用数组的方式实现
来看 ArrayDeque
的实现。
ArrayDeque 中,有 head
, tail
分别指向 头指针,和尾指针。能够把 ArrayDeque
想象成循环数组
head
会往前走tail
会日后走能够看到,这里经过移动 head
, tail
指针就能够删除元素了。
当 tail
、head
都指向都一个位置时,则须要扩容
扩容会将数组的大小扩充为原来的 2 倍,而后从新将 head
指向数组 0
下标, tail
指向数组的最后一个元素位置。
上面的数组,在从新扩容后,会变成下面这个样子
public class ArrayDeque<E> extends AbstractCollection<E> implements Deque<E>, Cloneable, Serializable { private void doubleCapacity() { assert head == tail; int p = head; int n = elements.length; int r = n - p; // number of elements to the right of p int newCapacity = n << 1; if (newCapacity < 0) throw new IllegalStateException("Sorry, deque too big"); Object[] a = new Object[newCapacity]; System.arraycopy(elements, p, a, 0, r); System.arraycopy(elements, 0, a, r, p); elements = a; head = 0; tail = n; } }