Topic在逻辑上能够被认为是一个queue队列,每条消息都必须指定它的topic,能够简单理解为必须指明把这条消息放进哪一个queue里。为 了使得Kafka的吞吐率能够水平扩展,物理上把topic分红一个或多个partition,每一个partition在物理上对应一个文件夹,该文件夹 下存储这个partition的全部消息和索引文件。
每一个日志文件都是“log entries”序列,每个log entry
包含一个4字节整型数(值为N),其后跟N个字节的消息体。每条消息都有一个当前partition下惟一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消费格式以下:
message length : 4 bytes (value: 1+4+n)
“magic” value : 1 byte
crc : 4 bytes
payload : n bytes
这个“log entries”并不是由一个文件构成,而是分红多个segment,每一个segment名为该segment第一条消息的offset和“.kafka”组成。另外会有一个索引文件,它标明了每一个segment下包含的log entry
的offset范围,
由于每条消息都被append到该partition中,是顺序写磁盘,所以效率很是高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
每一条消息被发送到broker时,会根据paritition规则选择被存储到哪个partition。若是partition规则设置的合理, 全部消息能够均匀分布到不一样的partition里,这样就实现了水平扩展。(若是一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个 topic的性能瓶颈,而partition解决了这个问题)。在建立topic时能够在$KAFKA_HOME/config/server.properties
中指定这个partition的数量(以下所示),固然也能够在topic建立以后去修改parition数量。java
# The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=3
在发送一条消息时,能够指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪一个 parition。paritition机制能够经过指定producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner
接口。本例中若是key能够被解析为整数则将对应的整数与partition总数取余,该消息会被发送到该数对应的partition。(每一个parition都会有个序号)app
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class JasonPartitioner<T> implements Partitioner { public JasonPartitioner(VerifiableProperties verifiableProperties) {} @Override public int partition(Object key, int numPartitions) { try { int partitionNum = Integer.parseInt((String) key); return Math.abs(Integer.parseInt((String) key) % numPartitions); } catch (Exception e) { return Math.abs(key.hashCode() % numPartitions); } } }
若是将上例中的class做为partition.class,并经过以下代码发送20条消息(key分别为0,1,2,3)至topic2(包含4个partition)。 ide
public void sendMessage() throws InterruptedException{ for(int i = 1; i <= 5; i++){ List messageList = new ArrayList<KeyedMessage<String, String>>(); for(int j = 0; j < 4; j++){ messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j)); } producer.send(messageList); } producer.close(); }
则key相同的消息会被发送并存储到同一个partition里,并且key的序号正好和partition序号相同。(partition序号从0开始,本例中的key也正好从0开始)。以下图所示。
对于传统的message queue而言,通常会删除已经被消费的消息,而Kafka集群会保留全部的消息,不管其被消费与否。固然,由于磁盘限制,不可能永久保留全部数据(实际 上也不必),所以Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如能够经过配置$KAFKA_HOME/config/server.properties
,让Kafka删除一周前的数据,也可经过配置让Kafka在partition文件超过1GB时删除旧数据,以下所示。性能
############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining # segments don't drop below log.retention.bytes. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 # By default the log cleaner is disabled and the log retention policy will default to #just delete segments after their retention expires. # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs #can then be marked for log compaction. log.cleaner.enable=false
这里要注意,由于Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,因此这里删除文件与Kafka性能无关,选择怎样的删除策 略只与磁盘以及具体的需求有关。另外,Kafka会为每个consumer group保留一些metadata信息—当前消费的消息的position,也即offset。这个offset由consumer控制。正常状况下 consumer会在消费完一条消息后线性增长这个offset。固然,consumer也可将offset设成一个较小的值,从新消费一些消息。由于 offet由consumer控制,因此Kafka broker是无状态的,它不须要标记哪些消息被哪些consumer过,不须要经过broker去保证同一个consumer group只有一个consumer能消费某一条消息,所以也就不须要锁机制,这也为Kafka的高吞吐率提供了有力保障。this