consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据消费。固然这里会产生一个很严重的问题,若是你重启一消费者程序,那你连一条数据都抓不到,可是log文件中明明能够看到全部数据都好好的存在。换句话说,一旦你消费过这些数据,那你就没法再次用同一个groupid消费同一组数据了。html
缘由:消费者消费了数据并不从队列中移除,只是记录了offset偏移量。同一个consumergroup的全部consumer合起来消费一个topic,而且他们每次消费的时候都会保存一个offset参数在zookeeper的root上。若是此时某个consumer挂了或者新增一个consumer进程,将会触发kafka的负载均衡,暂时性的重启全部consumer,从新分配哪一个consumer去消费哪一个partition,而后再继续经过保存在zookeeper上的offset参数继续读取数据。注意:offset保存的是consumer 组消费的消息偏移。 算法
要消费同一组数据,你能够api
1 采用不一样的group。并发
2 经过一些配置,就能够将线上产生的数据同步到镜像中去,而后再由特定的集群区处理大批量的数据。负载均衡
(2) 如何自定义去消费已经消费过的数据socket
Conosumer.properties配置文件中有两个重要参数fetch
auto.commit.enable:若是为true,则consumer的消费偏移offset会被记录到zookeeper。下次consumer启动时会今后位置继续消费。spa
auto.offset.reset 该参数只接受两个常量largest和Smallest,分别表示将当前offset指到日志文件的最开始位置和最近的位置。 .net
若是进一步想控制时间,则须要调用SimpleConsumer,本身去设置相关参数。比较重要的参数是 kafka.api.OffsetRequest.EarliestTime()和kafka.api.OffsetRequest.LatestTime()分别表示从日志(数据)的开始位置读取和只读取最新日志。线程
如何使用SimpleConsumer
首先,你必须知道读哪一个topic的哪一个partition
而后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker
再者,本身去写request并fetch数据
最终,还要注意须要识别和处理brokerleader的改变
1. 若是consumer比partition多,是浪费,由于kafka的设计是在一个partition上是不容许并发的,因此consumer数不要大于partition数 。
2. 若是consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,不然会致使partition里面的数据被取的不均匀 。最好partiton数目是consumer数目的整数倍,因此partition数目很重要,好比取24,就很容易设定consumer数目 。
3. 若是consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不一样
4. 增减consumer,broker,partition会致使rebalance,因此rebalance后consumer对应的partition会发生变化
Kafka尽可能将全部的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。
(1) 如何分配副本:
Producer在发布消息到某个Partition时,先经过ZooKeeper找到该Partition的Leader,而后不管该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每一个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。
(2) Kafka分配Replica的算法以下:
(1)将全部Broker(假设共n个Broker)和待分配的Partition排序
(2)将第i个Partition分配到第(imod n)个Broker上
(3)将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
日志文件的删除策略很是简单:启动一个后台线程按期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的建立时间).清理参数在server.properties文件中:
(1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每一个partition leader创建socket链接并发送消息.
(2) Broker端使用zookeeper用来注册broker信息,以及监测partition leader存活性.
(3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader创建socket链接,并获取消息.
Server.properties配置文件中的一个参数
##是否自动建立topic
##若是broker中没有topic的信息,当producer/consumer操做topic时,是否自动建立.
##若是为false,则只能经过API或者command建立topic
auto.create.topics.enable=true
原文地址:https://blog.csdn.net/zgc625238677/article/details/52162202