kafka极简入门(四)--经常使用配置

回顾:kafka极简入门(三)--建立topicapache

前言

kafka针对broker, topic, producer, consumer的配置很是多,这里讲一下经常使用的配置bootstrap

1.broker相关配置
  • broker.id
    broker在kafka集群中的惟一标识,必须是一个大于等于0的整数,若是不写的话默认从1001开始。
    建议:把它设置成与机器名具备相关性的整数。
  • port
    设置kafka的端口号,默认状况下是9092,不建议修改为1024如下的端口,由于须要使用root权限启动。
  • zookeeper.connect
    设置zookeeper集群,该配置参数是一组用逗号隔开的host:port/path列表segmentfault

    • host是zookeeper服务器的机器名或ip地址
    • port是zookeeper服务器的端口
    • /path是可选的zookeeper路径,做为kafka集群的chroot环境,默认是根路径,若是指定的chroot路径不存在,kafka会在启动的时候建立它。使用chroot使得zookeeper集群能够共享给其余应用程序时而不会产生冲突。
  • log.dirs
    配置kafka日志片断的目录位置,多个路径以逗号隔开。若是配置了多个路径,kafka会根据最少使用原则,把统一分区的日志保存到同一路径下,注意的是,kafka会往拥有最少数量分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。
  • auto.create.topics.enable
    配置是否开启自动建立topic,若是设置为true, kafka会在如下几种场景自动建立topic:安全

    • 当一个producer开始往topic写入消息时。
    • 当一个consumer开始从topic消费消息时。
    • 当一个client向topic发送元数据请求时。
  • num.partitions
    配置建立主题时包含多少个分区,默认值为1,由于咱们能增长主题的分区数,可是不能减小分区的个数,因此,若是要让一个主题的分区个数少于num.partitions须要手动建立该主题而不是经过自动建立主题。
  • log.retention.hours
    配置kafka保留数据的时间,默认为168小时也就是7天,效果等同log.retention.minutes和log.retention.ms,只是单位不同,分别是小时,分钟,和毫秒,推荐使用log.retention.ms,粒度更加细,若是三个参数都配置了则去数值最小的配置。
  • log.retention.bytes
    配置一个分区能保存最大的字节数,若是超出的部分就会被删除,同时配置了log.retention.hours/log.retention.minutes/log.retention.ms的话,任一个知足条件都会触发数据删除。
  • message.max.bytes
    配置消息的大小限制,默认为100000,也就是1M,这里的大小是指在kafka压缩后的大小,也就是说实际消息能够大于1M,若是消息超过这个限制,则会被kafka拒收。

2.producer相关配置
  • bootstrap.servers
    配置broker的地址,格式为host:port,若是多个则以逗号隔开,不须要配置全部的broker,producer会从给定的broker查询其余broker的信息,不过建议至少填写两个,以防在一个宕机的状况还能从另一个去获取broker的信息。
  • acks
    acks指定了必需要有多少个分区副本收到消息,producer才会认为消息写入是成功的。服务器

    • acks=0 producer发送消息不等待任何来自服务器的响应,因此会出现消息丢失而producer不感知的状况,该模式下可获取最大的吞吐量。
    • acks=1 只要集群的leader节点收到消息,producer就会收到一个服务器的成功响应,若是消息没法达到leader节点,那么producer就会获取到一个错误响应,这时候为了不消息的丢失,producer能够选择重发,不过若是一个没有收到消息的节点成为新的leader,那么消息仍是会丢失。
    • acks=all 只有当leader节点和follower节点都收到消息时,producer才会收到成功的响应,这是一个避免消息丢失最安全的作法,不过这种模式吞吐量最低
  • client.id
    能够是任意字符串,标识消息的来源
  • max.in.flight.requests.per.connection
    配置producer在收到服务器响应前能够发送的消息个数,值越高,吞吐量就会越高,不过相应的占用的内存也会越多,设置为1能够保证消息能够按照发送的顺序写入服务,即使发生了重试。
  • max.request.size
    配置producer单次发送的全部消息的总的大小限制,例如设置为1M,单个消息大小为1K,那么单次能够发的上限是1000个,最好跟message.max.bytes配置匹配,避免发送到broker的消息被拒绝。
  • retries
    该参数决定了producer能够重发消息的次数,producer从broker收到的错误多是临时性的,例如分区找不到首领,这种状况下,producer在进行retries次重试后就会放弃重试而且返回错误,默认状况下,重试的时间间隔为100ms,能够经过retry.backoff.ms参数配置,建议在设置重试间隔以前最好测试一下恢复一个崩溃的节点要多长时间,重试的间隔最比如恢复时间要长。
  • batch.size
    当多个消息往同一个分区发送时,producer会把这些消息放到同一个分区,该参数指定了一个批次可使用的内存大小,按字节数计算,当批次填满时消息就会被发送出去,不过producer不必定等批次被填满才会发送,甚至只有一个消息也会被发送,因此就算把该值设置得很大也不会形成延迟,只不过会占用内存,可是若是设置过小的话,producer会很频繁的发送,增长一些额外的开销。
  • linger.ms
    指定producer发送批次以前要等待更多消息加入批次的时间,producer会在批次填满或者longer.ms到达上限时把批次发送出去,默认状况下,只要有可用的线程,就算批次只有一个消息,producer也会把消息发送出去。把linger.ms设置成比0大的数,让producer在发送批次以前多等待一会,可使得更多的消息能够加入到批次,虽然增长了延迟,可是同时也增长了吞吐量。

3.Consumer相关配置
  • fetch.min.bytes
    配置Consumer从broker获取记录的最小字节数,broker在收到Consumer的数据请求时,若是可用的数据量小于该配置,那么broker会等到有足够的可用数据时才把它返回给Consumer。
  • fetch.max.wait.ms
    配置broker的等待时间,默认为500ms,若是没有足够的数据流入,致使不知足fetch.mis.bytes,最终会致使500ms的延迟。若是fetch.mis.bytes配置为1M,fetch.max.wait.ms配置为500ms,那么最终broker要么返回1M的数据,要么等待500ms后返回全部可用的数据,取决于哪一个条件先获得知足。
  • max.partition.fetch.bytes
    配置broker从每一个分区返回给Consumer的最大字节数,默认为1MB,也就是说,KafkaConsumer.poll()方法从每一个分区里面返回的记录最多不超过该值,加入有10个分区5个消费者,则每一个消费者须要2MB的内存来接收消息,须要注意的是,若是该值设置过大,致使消费者处理的业务的时间过长,会有回话超时的风险。
  • session.timeout.ms
    配置了Consumer被认定为死亡前能够与服务器断开链接的时间,默认3秒,若是服务器在超过该值时间没有收到Consumer的心跳,就会认定Consumer死亡,会触发再均衡,把死亡Consumer的分区分配给其余Consumer,这个配置要跟heartbeat.interval.ms配合使用,heartbeat.interval.ms设置了poll()方法向协调器发送心跳的频率。建议heartbeat.interval.ms的值为session.timeout.ms的三分之一。
  • enable.auto.commit
    指定了Consumer是否开启自动提交偏移量,默认为true。能够把它设置为false,由程序本身控制什么时候提交偏移量来避免出现重复数据和数据丢失的状况。
  • auto.offset.reset
    指定在Consumer读取一个没有偏移量的分区或者偏移量无效的状况下的策略(由于消费者长时间失效,包含偏移量的记录已通过时并删除),默认为latest,表示会从最新的记录开始读取(在消费者启动以后生成的记录,会出现漏消费历史记录的状况),另一个配置是earliest,表示从最先的消息开始消费(会出现重复消费的状况)
  • partition.assignment.strategy
    分区分配给Consumer的策略,有两种:session

    • Range
      把topic的若干个连续的分区分配给Consumer,假设有Consumer1和Consumer2,分别订阅了topic1和topic2,每一个topic都有3个分区,那么Consumer1可能分配到topic1和topic2的分区0和分区1,Consumer2分配到topic1和topic2的分区2,这种策略会致使当分区数量(针对单个topic,上面例子是3个分区)没法被消费者数量(上面例子是2个消费者)整除时,就会出现分区分布不均匀的状况。
      Range.png
    • RoundRobin
      该策略会把全部的分区逐个分配给Consumer,仍是上面的例子,若是按这种策略分配那么Consumer1最终分到的是topic1的分区0,分区2和topic2的分区1,Consumer2最终分到的是topic1的分区1和topic2的分区0和分区2。
      RoundRobin.png

默认使用org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了Range策略,RoundRabin的实现类为org.apache.kafka.clients.consumer.RoundRobinAssignor,咱们还能够自定义策略。测试

  • max.poll.records
    指定单次调用poll()方法返回的记录数量。fetch

若是对你有用麻烦点个赞哦 ^_^spa

相关文章
相关标签/搜索