kafka消息堆积能力比较强,能够堆积上亿的消息,特别适合日志处理这种实时性要求不过高的场景,同时支持集群部署,相比redis堆积能力和可靠性更高php
完整项目代码已上传github:github.com/neatlife/my…html
能够经过下面的步骤快速上手这个kafkajava
能够使用docker一键启动一个kafka集群,参考:github.com/simplesteph…git
git clone https://github.com/simplesteph/kafka-stack-docker-compose.git
cd kafka-stack-docker-compose
docker-compose -f full-stack.yml up -d
复制代码
操做效果以下 github
使用命令docker-compose -f full-stack.yml ps
获取能够kafka监听的端口web
记下kafka监听的地址9092,这个后面会用到redis
8000端口是这个kafka的topic的ui界面,这个界面能够查看当前的topic列表,效果以下 spring
这里也看到topic里保存的数据docker
能够在https://start.spring.io/建立测试项目shell
在appliation.properties
中配置kafka的地址和使用的group-id,这个group-id名称能够自行定义,好比:myconsumergroup
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=myconsumergroup
复制代码
使用一个spring boot的service封装kafka发送消息的代码,核心代码以下
package mykafka.service;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class Producer {
private final KafkaTemplate<String, String> kafkaTemplate;
private String topic = "自行定义的topic";
Producer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void send(String message) {
this.kafkaTemplate.send(topic, message);
System.out.println("Sent sample message [" + message + "] to " + topic);
}
}
复制代码
而后编写一个接口调用这个发送kafka消息的service,核心代码以下:
@RestController
@RequestMapping("/")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MyController {
private final Producer producer;
@RequestMapping("/test1")
public String test1() {
producer.send(String.format("my message currentTimeMillis: %d", System.currentTimeMillis()));
return "test1";
}
}
复制代码
注意:上面代码里使用的kafka的topic能够自行定义,好比mytopic
而后在浏览器中访问这个接口 ip:8080/test1
能够看到这个消息已经发送到kafka了
消费消息只须要在方法上加上KafkaListener,并指定topic和groupId便可
核心代码以下
@KafkaListener(topics = "mytopic", groupId = "myconsumergroup")
public void processMessage(String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
log.info(
"received message, topic: {}, partition: {}, offset: {}, message: {}",
topics.get(0),
partitions.get(0),
offsets.get(0),
message
);
}
复制代码
操做效果以下:
php发送和消费客户端参考:github.com/arnaud-lb/p…
go客户端参考:github.com/confluentin…
发送消息和消费消息须要确保topic一致
日志能够先发送到kafka作缓冲,而后经过kafka的客户端把消息取出来放到elk等日志存储系统中分析和可视化
由于kafka客户端发送消息和服务端把消息保存到磁盘都是异步操做,因此存在服务器宕机后消息可能丢失,若是可靠性要求更高,能够使用改进版的kafka:rocketmq