Kafka 学习笔记归整

Kafka 的安装和配置

0 安装

略,到官网下载便可。注意 Kafka 还须要 Zookeeper 支持。java

Kafka 版本 : 
kafka_2.13-2.4.0

Zookeeper 版本 : 
Zookeeper-3.5.4-beta

jdk 版本 : 
openjdk 8

1 Kafka 配置

Kafka 的主要配置文件是 /config/server.properties。spring

## 实例 id,同一集群中的全部实例的 id 不可相同
broker.id = 0

## kafka 服务监听的地址和端口
## 设置了 advertised.listeners 以后能够不设置客户端的 hosts 文件,缘由未知
listener = PLAINTEXT://xxxxxxx:9092
advertised.listeners = PLAINTEXT://xxxxxxx:9092

##### 通用配置 #####

## 处理磁盘 io 的线程数
num.io.threads = 8

## 处理网络 io 的线程数
num.network.threads = 3

## 用来处理后台任务的线程数,例如过时消息文件的删除等
backgroud.threads = 4

## kafka 启动时回复数据和关闭时保存数据到磁盘时使用的线程数
num.recovery.threads.per.data.dir = 10

## 消息发送的缓存区大小
socket.send.buffer.bytes = 102400

## 消息接收的缓存区大小
socket.receive.buffer.bytes = 102400

## socket 请求的最大缓存值
socket.request.max.bytes = 10240000

## 再平衡延迟时间
## 再平衡的意义是当 consumer 发生上下线的时候,会从新分配 partition 的消费权
## 延迟的好处是若是 consumer 在一段时间内集中上下线,能够在延迟时间以后一次性处理
## 若是发生一次变更就处理一次,效率会较低
## 默认为 0 ms,即为不延迟
group.initial.rebalance.delay.ms = 10


##### 数据日志配置 #####

## kafka 日志数据存储目录,用逗号分割能够指定多个
# log.dirs = /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs = /tmp/kafka-logs


## 刷新日志到磁盘的阈值
# 根据消息数量作刷新,默认使用该策略
# log.flush.interval.messages = 10000
# 根据时间作刷新,单位 毫秒,默认不开启
log.flush.interval.ms = 1000

## 检查刷新机制的时间间隔
## 该参数的意义是每隔必定时间检查一次是否到达 flush 的设置阈值
log.flush.scheduler.interval.ms = 3000

## 记录上次固化数据到硬盘的时间点,主要用于数据恢复
## 默认值 60000
log.flush.offset.checkpoint.interval.ms = 60000


## 日志的存储时间,能够以小时单位,也能够设置为分钟或者毫秒
## 当超过这个时间,就会执行日志清除
# log.retention.minutes = 120
# log.retention.ms = 120
# 默认使用小时,值为 168
log.retention.hours = 2

## 每一个 partition 的存储大小,若是超过了就会执行日志清除
## -1 表明不限制,默认 1073741824
log.retention.bytes = 1073741824

## 日志大小的检查周期,若是已经到达了大小,就触发文件删除
log.retention.check.interval.ms = 300000


## 指定日志每隔多久检查一次是否能够被删除,默认为 1 min
log.cleanup.internal.mins = 1

## 日志清除的策略,默认为 delete
## 若是要使用日志压缩,就须要让策略包含 compact
## 须要注意的是,若是开启了 compact 策略,则客户端提交的消息的 key 不容许为 null,不然提交报错
# log.cleanup.policy = delete
log.cleanup.policy = delete


## 是否开启日志压缩
## 默认开启,可是只有在日志清除策略包含 compact 的时候日志压缩才会生效
## 日志压缩的逻辑是对 key 进行整合,对相同 key 的不一样 value 值只保存最后一个版本
log.cleaner.enable = true

## 开启压缩的状况才生效,日志压缩运行的线程数
log.cleaner.threads = 8

## 日志压缩去重的缓存内存,内存越大效率越好
## 单位 byte,默认 524288 byte
log.cleaner.io.buffer.size = 524288

## 日志清理的频率,越大就越高效,可是内存消耗会更大
log.cleaner.min.cleanable.ratio = 0.7


## 单个日志文件的大小,默认 1073741824
log.segment.bytes = 1073741824

## 日志被真正清除的时间
## 日志过了保存时间以后,只是被逻辑性删除,没法被索引到,可是没有真的从磁盘中被删除
## 此参数用于设置在被标注为逻辑删除后的日志被真正删除的时间
log.segment.delete.delay.ms = 60000


##### Topic 配置 #####

## 是否容许自动建立 topic
## 若为 false,则只能经过命令建立 topic
## 默认 true
auto.create.topics.enable = true

## 每一个 topic 的默认分区 partition 个数,在 topic 建立的时候能够指定,若是不指定就使用该参数
## partition 数量直接影响了可以容纳的 cosumer 数量
num.partitions = 1

## topic partition 的副本数,副本越多,越不容易由于个别 broker 的问题而丢失数据
## 副本越多,可用性越高,可是每次数据写入以后同步花费的时间更多
offsets.topic.replication.factor = 1
transaction.state.log.replication.factor = 1
transaction.state.log.min.isr = 1



##### Zookeeper 配置 #####

## Zookeeper 地址,用逗号分割能够指定多个
# zookeeper.connect = localhost:2181,localhost:2182
zookeeper.connect = localhost:2101
## Zookeeper 集群的超时时间
zookeeper.connection.timeout.ms = 6000

2 建立 topic

上述配置中设置了自动建立 topic,可是也能够手工建立 :apache

./bin/kafka-topics.sh --create \
--zookeeper localhost:2101 \
--replication-factor 1 \
--partitions 1 \
--topic test1

查看 topic :bootstrap

./bin/kafka-topics.sh --zookeeper localhost:2101 --list

3 启动 Kafka

./bin/kafka-server-start.sh -daemon ./config/server.properties &

SpringBoot Kafka 配置代码

1 pom

spring boot 版本 :缓存

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.2.RELEASE</version>
    <relativePath/>
</parent>

引入 jar 包 :bash

<!-- kafka 必须的包 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.4.1.RELEASE</version>
    <exclusions>
        <!-- 若是已经在别处引入 spring-boot-starter,此处能够排除 spring 相关的包 -->
        <exclusion>
            <groupId>org.springframework</groupId>
            <artifactId>spring-*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

2 yaml 配置

spring:
  kafka:

    ## 生产者配置,若是本实例只是消费者,能够不配置该部分
    producer:
      # client id,随意配置,不可重复
      client-id: boot-producer
      # kafka 服务地址,pi + 端口
      bootstrap-servers: aliyun-ecs:9092
      # 用于序列化的工具类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 消息发送失败状况下的重试次数
      retries: 1
      # 批量上传的 buffer size,能够是消息数量,也能够是内存量
      batch-size: 10000
      buffer-memory: 300000
      # 等待副本同步以后才确认消息发送成功,可选的值有 0,1,-1,all 等
      # 设置为 0 的意思是不等待任何副本同步完成就直接返回
      # 设置为 1 的意思是只等待 leader 同步完成
      # all 的意思是所有同步完才确认,可是速度会比较慢
      acks: 1

    ## 消费者配置,若是本实例只是生产者,能够不配置该部分
    consumer:
      # client id,随意配置,不可重复
      client-id: boot-consumer
      # 消费者分组 id,同一组别的不一样消费者共同消费一份数据
      group-id: consumer-group-1
      # kafka 服务地址,pi + 端口
      bootstrap-servers: aliyun-ecs:9092
      # 用于反序列化的工具类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 自动更新 offset
      enable-auto-commit: true
      # 若是 enable-auto-commit 设置为 true,则每隔一段时间提交一次 offset
      # 时间单位为毫秒,默认值 5000 (5s)
      auto-commit-interval: 1000
      # offset 消费指针
      # earliest 表明从头开始消费,lastest 表明重新产生的部分开始消费
      auto-offset-reset: earliest

3 代码

生产者配套代码:网络

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

/**
 * kafka mq 生产者包装类
 */
@Component
public class MQProductor {

    @Resource
    KafkaTemplate<String,String> kt;

    /**
     * 发送消息的方法
     * @param topic  建立的 topic
     * @param partition  topic 分片编号,从 0 开始
     * @param key  消息 key,主要用来分片和做为压缩凭据,能够重复,能够为空
     * @param message  消息主体
     */
    public void send(String topic,Integer partition,String key,String message) {
        kt.send(topic,partition,key,message);
    }

    public void send(String message) {
        send("test-topic",0,"",message);
    }
}

消费者配套代码:socket

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * kafka mq 消费者包装类
 */
@Component
public class MQListener {

    /**
     * 监听方法,能够一次性监听多个 topic
     * @param cr  kafka 返回的消息包装类
     */
    @KafkaListener(topics = {"test-topic" /*,"test-topic-2"*/ })
    public void consume(ConsumerRecord<String,String> cr) {
        // value
        String value = cr.value();
        System.out.println(value);
        // key
        String key = cr.key();
        System.out.println(key);
        // 读取指针
        long offset = cr.offset();
        System.out.println(offset);
        // 读取的分区编号
        int partition = cr.partition();
        System.out.println(partition);
        // topic
        String topic = cr.topic();
        System.out.println(topic);
    }
}
相关文章
相关标签/搜索