解压kafka的安装包后,在conf目录下server.propertieslinux
#broker 的全局惟一编号,在kafka集群中不能重复,为整型数字 broker.id=0 #开启删除topic功能 delete.topic.enable=true #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的现成数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 # kafka运行数据保存目录 kafka data是以.log后缀的 log.dirs=/data/kafka/logs #topic 在当前 broker 上的分区数量,下面配置一个分区 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 #segment log.dirs中文件保留的最长时间,超时将被删除,默认为7天也就是168小时 log.retention.hours=168 #kafka须要zookeeper进行管理,配置链接 Zookeeper 集群地址 zookeeper.connect=192.168.152.163:2181,192.168.152.162:2181,192.168.152.161:2181
启动kafka网络
kafka-server-start.sh -daemon config/server.properties //若是没有加-daemon的话,将会阻塞方式运行
关闭kafka并发
kafka-server-stop.sh stop
kafka是基于topic的可分区的分布式的消息中间件。消息生产者经过将消息发送至broker的某一个分区,消费者经过broker的分区获取消息进行消费,一个broker可有多个分区,一个topic可有多个分区,topic下的每个分区存在副本机制,也就是存在一个leader和多个follower,生产者将消息写入leader分区,follower同步leader中新增的消息,每一个分区都有index文件和log文件。消费者消费分区中的消息经过偏移值offset进行记录。app
kafka存储消息到文件中采用分片及索引机制,每一个topic在一个broker上的数据放在一个文件夹下,该文件夹名称为topic名称加分区号,如minerprofit-1,(minerprofit为topic名称,1为分区号),每个分区上的数据又分为多个segment,每个segment有index文件和log文件,segment中index和log文件的命名是在该segment第一个offset值。socket
0000000000000000000000.index 0000000000000000000000.log 0000000000000000017866.index 0000000000000000017866.log 0000000000000000029866.index 0000000000000000029866.log
log文件记录具体的消息数据,index记录的是消息的偏移值与消息数据的索引值,依据offset值找到具体的消息的过程是:例如offset为5,首先在index文件中,采用二分法查找offset为5对应的索引值,而后获取该索引值,在log文件中依据该索引值获取消息。分布式
kafka零拷贝技术: linux有用户态和核心态,正常的访问文件linux须要核心态和用户态进行转换,kafka内部机制可以作到不通过用户态,具体机制能够了解了解。ide
kafka顺序写磁盘。高并发
分区的优劣:ui
优点:this
高并发: 采起分区机制,将不一样数据放置在不一样的分区,提升并发能力,相似ConcurrentHashMap的分段锁机制,当须要访问不一样分区的数据,可以将锁粒度下降。 方便扩展: 当有多个broker时候,多个topic可方便扩展。
缺点:
分区的副本机制可能致使生产者发送的消息不可以正常同步到全部follower,当leader挂掉后,形成消息丢失。
生产者是经过封装一个ProductRecord对象,须要指定具体的topic,分区,key,value,headers。
/** * Creates a record with a specified timestamp to be sent to a specified topic and partition * * @param topic The topic the record will be appended to * @param partition The partition to which the record should be sent * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign * the timestamp using System.currentTimeMillis(). * @param key The key that will be included in the record * @param value The record contents * @param headers the headers that will be included in the record */ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers);
消息发送到具体分区的选择:
生产者消息确认(ack)机制:
kafka分区存在副本机制,一个leader和多个follower,生产者将消息发送到leader,follower同leader同步数据,存在一种状况,当生产者将消息发送至leader后,follower还未同步到数据,leader就宕机了,而后follower就选择新的leader,新的leader进行数据同步后,就不清楚了生产者发送的消息时候成功了,因此在此过程当中就涉及到消息确认问题。
副本同步数据有半数确认机制和所有确认机制,kafka使用的是全数确认机制。
kafka设计了三种acks消息确认机制:
kafka消息重复或者消息丢失均可能在此发生。
public static final String ACKS_CONFIG = "acks"; private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the " + " durability of records that are sent. The following settings are allowed: " + " <ul>" + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the" + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be" + " made that the server has received the record in this case, and the <code>retries</code> configuration will not" + " take effect (as the client won't generally know of any failures). The offset given back for each record will" + " always be set to -1." + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond" + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after" + " acknowledging the record but before the followers have replicated it then the record will be lost." + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to" + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica" + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.";
ISR机制:
当acks使用all的时候,全部follower同步数据后再发送acks就会很慢,kafka设计维护一个isr列表,isr是全部follower中的一部分,当isr内的follower同步完了数据,leader就会马上发送ack确认,isr经过肯定follower是否在必定时间内与其是否完成数据同步后而确认是否将该follower加入到isr列表中。该时间经过replica.lag.time.max.ms参数设定。
log文件中的HW,LEO
发布订阅模式中消费者消费消息有两种方式,一种是消息中间推送消息到消费者,另外一种方式是消费者经过消息中间件拉去消息。推的方式会致使消费者处理消息的速度赶不上推送的速度,拉的方式就会致使消费者在没有消息的时候会不断的轮询,致使空运行,kafka采用拉的方式,能够设置时间来间隔性的访问消息中间件。
消费者组:多个消费者可组成一个消费者组,一个消费者组订阅一个topic,消费者组中的一个消费者消费了消息就不会再有另一个消费者消费消息,当有多个消费者后就涉及分区分消费者策略。
分区分消费者策略:
消费者offset值维护:
kafka老版本将offset维护放置zookeeper中,0.9版本后将维护放置在kafka __consumer_offset topic 中,
exclude.internal.topics=false #开启内部topic