略,到官网下载便可。注意 Kafka 还须要 Zookeeper 支持。java
Kafka 版本 : kafka_2.13-2.4.0 Zookeeper 版本 : Zookeeper-3.5.4-beta jdk 版本 : openjdk 8
Kafka 的主要配置文件是 /config/server.properties。spring
## 实例 id,同一集群中的全部实例的 id 不可相同 broker.id = 0 ## kafka 服务监听的地址和端口 ## 设置了 advertised.listeners 以后能够不设置客户端的 hosts 文件,缘由未知 listener = PLAINTEXT://xxxxxxx:9092 advertised.listeners = PLAINTEXT://xxxxxxx:9092 ##### 通用配置 ##### ## 处理磁盘 io 的线程数 num.io.threads = 8 ## 处理网络 io 的线程数 num.network.threads = 3 ## 用来处理后台任务的线程数,例如过时消息文件的删除等 backgroud.threads = 4 ## kafka 启动时回复数据和关闭时保存数据到磁盘时使用的线程数 num.recovery.threads.per.data.dir = 10 ## 消息发送的缓存区大小 socket.send.buffer.bytes = 102400 ## 消息接收的缓存区大小 socket.receive.buffer.bytes = 102400 ## socket 请求的最大缓存值 socket.request.max.bytes = 10240000 ## 再平衡延迟时间 ## 再平衡的意义是当 consumer 发生上下线的时候,会从新分配 partition 的消费权 ## 延迟的好处是若是 consumer 在一段时间内集中上下线,能够在延迟时间以后一次性处理 ## 若是发生一次变更就处理一次,效率会较低 ## 默认为 0 ms,即为不延迟 group.initial.rebalance.delay.ms = 10 ##### 数据日志配置 ##### ## kafka 日志数据存储目录,用逗号分割能够指定多个 # log.dirs = /tmp/kafka-logs-1,/tmp/kafka-logs-2 log.dirs = /tmp/kafka-logs ## 刷新日志到磁盘的阈值 # 根据消息数量作刷新,默认使用该策略 # log.flush.interval.messages = 10000 # 根据时间作刷新,单位 毫秒,默认不开启 log.flush.interval.ms = 1000 ## 检查刷新机制的时间间隔 ## 该参数的意义是每隔必定时间检查一次是否到达 flush 的设置阈值 log.flush.scheduler.interval.ms = 3000 ## 记录上次固化数据到硬盘的时间点,主要用于数据恢复 ## 默认值 60000 log.flush.offset.checkpoint.interval.ms = 60000 ## 日志的存储时间,能够以小时单位,也能够设置为分钟或者毫秒 ## 当超过这个时间,就会执行日志清除 # log.retention.minutes = 120 # log.retention.ms = 120 # 默认使用小时,值为 168 log.retention.hours = 2 ## 每一个 partition 的存储大小,若是超过了就会执行日志清除 ## -1 表明不限制,默认 1073741824 log.retention.bytes = 1073741824 ## 日志大小的检查周期,若是已经到达了大小,就触发文件删除 log.retention.check.interval.ms = 300000 ## 指定日志每隔多久检查一次是否能够被删除,默认为 1 min log.cleanup.internal.mins = 1 ## 日志清除的策略,默认为 delete ## 若是要使用日志压缩,就须要让策略包含 compact ## 须要注意的是,若是开启了 compact 策略,则客户端提交的消息的 key 不容许为 null,不然提交报错 # log.cleanup.policy = delete log.cleanup.policy = delete ## 是否开启日志压缩 ## 默认开启,可是只有在日志清除策略包含 compact 的时候日志压缩才会生效 ## 日志压缩的逻辑是对 key 进行整合,对相同 key 的不一样 value 值只保存最后一个版本 log.cleaner.enable = true ## 开启压缩的状况才生效,日志压缩运行的线程数 log.cleaner.threads = 8 ## 日志压缩去重的缓存内存,内存越大效率越好 ## 单位 byte,默认 524288 byte log.cleaner.io.buffer.size = 524288 ## 日志清理的频率,越大就越高效,可是内存消耗会更大 log.cleaner.min.cleanable.ratio = 0.7 ## 单个日志文件的大小,默认 1073741824 log.segment.bytes = 1073741824 ## 日志被真正清除的时间 ## 日志过了保存时间以后,只是被逻辑性删除,没法被索引到,可是没有真的从磁盘中被删除 ## 此参数用于设置在被标注为逻辑删除后的日志被真正删除的时间 log.segment.delete.delay.ms = 60000 ##### Topic 配置 ##### ## 是否容许自动建立 topic ## 若为 false,则只能经过命令建立 topic ## 默认 true auto.create.topics.enable = true ## 每一个 topic 的默认分区 partition 个数,在 topic 建立的时候能够指定,若是不指定就使用该参数 ## partition 数量直接影响了可以容纳的 cosumer 数量 num.partitions = 1 ## topic partition 的副本数,副本越多,越不容易由于个别 broker 的问题而丢失数据 ## 副本越多,可用性越高,可是每次数据写入以后同步花费的时间更多 offsets.topic.replication.factor = 1 transaction.state.log.replication.factor = 1 transaction.state.log.min.isr = 1 ##### Zookeeper 配置 ##### ## Zookeeper 地址,用逗号分割能够指定多个 # zookeeper.connect = localhost:2181,localhost:2182 zookeeper.connect = localhost:2101 ## Zookeeper 集群的超时时间 zookeeper.connection.timeout.ms = 6000
上述配置中设置了自动建立 topic,可是也能够手工建立 :apache
./bin/kafka-topics.sh --create \ --zookeeper localhost:2101 \ --replication-factor 1 \ --partitions 1 \ --topic test1
查看 topic :bootstrap
./bin/kafka-topics.sh --zookeeper localhost:2101 --list
./bin/kafka-server-start.sh -daemon ./config/server.properties &
spring boot 版本 :缓存
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.2.RELEASE</version> <relativePath/> </parent>
引入 jar 包 :bash
<!-- kafka 必须的包 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.4.1.RELEASE</version> <exclusions> <!-- 若是已经在别处引入 spring-boot-starter,此处能够排除 spring 相关的包 --> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-*</artifactId> </exclusion> </exclusions> </dependency>
spring: kafka: ## 生产者配置,若是本实例只是消费者,能够不配置该部分 producer: # client id,随意配置,不可重复 client-id: boot-producer # kafka 服务地址,pi + 端口 bootstrap-servers: aliyun-ecs:9092 # 用于序列化的工具类 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息发送失败状况下的重试次数 retries: 1 # 批量上传的 buffer size,能够是消息数量,也能够是内存量 batch-size: 10000 buffer-memory: 300000 # 等待副本同步以后才确认消息发送成功,可选的值有 0,1,-1,all 等 # 设置为 0 的意思是不等待任何副本同步完成就直接返回 # 设置为 1 的意思是只等待 leader 同步完成 # all 的意思是所有同步完才确认,可是速度会比较慢 acks: 1 ## 消费者配置,若是本实例只是生产者,能够不配置该部分 consumer: # client id,随意配置,不可重复 client-id: boot-consumer # 消费者分组 id,同一组别的不一样消费者共同消费一份数据 group-id: consumer-group-1 # kafka 服务地址,pi + 端口 bootstrap-servers: aliyun-ecs:9092 # 用于反序列化的工具类 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 自动更新 offset enable-auto-commit: true # 若是 enable-auto-commit 设置为 true,则每隔一段时间提交一次 offset # 时间单位为毫秒,默认值 5000 (5s) auto-commit-interval: 1000 # offset 消费指针 # earliest 表明从头开始消费,lastest 表明重新产生的部分开始消费 auto-offset-reset: earliest
生产者配套代码:网络
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * kafka mq 生产者包装类 */ @Component public class MQProductor { @Resource KafkaTemplate<String,String> kt; /** * 发送消息的方法 * @param topic 建立的 topic * @param partition topic 分片编号,从 0 开始 * @param key 消息 key,主要用来分片和做为压缩凭据,能够重复,能够为空 * @param message 消息主体 */ public void send(String topic,Integer partition,String key,String message) { kt.send(topic,partition,key,message); } public void send(String message) { send("test-topic",0,"",message); } }
消费者配套代码:socket
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * kafka mq 消费者包装类 */ @Component public class MQListener { /** * 监听方法,能够一次性监听多个 topic * @param cr kafka 返回的消息包装类 */ @KafkaListener(topics = {"test-topic" /*,"test-topic-2"*/ }) public void consume(ConsumerRecord<String,String> cr) { // value String value = cr.value(); System.out.println(value); // key String key = cr.key(); System.out.println(key); // 读取指针 long offset = cr.offset(); System.out.println(offset); // 读取的分区编号 int partition = cr.partition(); System.out.println(partition); // topic String topic = cr.topic(); System.out.println(topic); } }