Apache Kafka 是一个分布式流处理平台,用于构建实时的数据管道和流式的应用.它可让你发布和订阅流式的记录,能够储存流式的记录,而且有较好的容错性,能够在流式记录产生时就进行处理。html
Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 Kafka 的定义:一个分布式发布-订阅消息传递系统。java
1,安装配置Kafka ,Zookeepergit
安装和配置过程很简单,就不详细说了,参考官网:http://kafka.apache.org/quickstartgithub
使用命令启动Kafka: bin``/kafka-server-start``.sh config``/server``.properties
web
下面给出个人环境:spring
Centos 7.5, Kafka 2.11, Zookeeper-3.4.13, JDK1.8+
2,建立 Spring Boot 项目apache
注意版本:该项目使用Spring Boot 2.0 +,低版本可能不对json
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency>
KafkaSender<T>
T 就是你须要发送的消息 对象,序列化使用阿里的 fastjson消息发送后,能够在回调类里面处理本身的业务,ListenableFutureCallback
类有两个方法,分别是 onFailureon
和 onSuccess
,实际场景能够在这两个方法,处理本身的具体业务,这里不作实现。bootstrap
/** * 消息生产者 * * @author Jarvis * @date 2018/8/3 */ @Component public class KafkaSender<T> { private Logger logger = LoggerFactory.getLogger(KafkaSender.class); @Autowired private KafkaTemplate<String, Object> kafkaTemplate; /** * kafka 发送消息 * * @param obj 消息对象 */ public void send(T obj) { String jsonObj = JSON.toJSONString(obj); logger.info("------------ message = {}", jsonObj); //发送消息 ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("kafka.tut", jsonObj); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable throwable) { logger.info("Produce: The message failed to be sent:" + throwable.getMessage()); } @Override public void onSuccess(SendResult<String, Object> stringObjectSendResult) { //TODO 业务处理 logger.info("Produce: The message was sent successfully:"); logger.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString()); } }); } }
@KafkaListener
注解监听 topics 消息,此处的topics
必须和 send 函数中的 一致@Header(KafkaHeaders.RECEIVED_TOPI
直接获取 topic缓存
/** * 监听kafka.tut 的 topic * * @param record * @param topic topic */ @KafkaListener(id = "tut", topics = "kafka.tut") public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { //判断是否NULL Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { //获取消息 Object message = kafkaMessage.get(); logger.info("Receive: +++++++++++++++ Topic:" + topic); logger.info("Receive: +++++++++++++++ Record:" + record); logger.info("Receive: +++++++++++++++ Message:" + message); } }
spring: application: name: kafka-tutorial kafka: # 指定kafka 代理地址,能够多个 bootstrap-servers: 192.168.10.100:9092 producer: retries: 0 # 每次批量发送消息的数量 batch-size: 16384 # 缓存容量 buffer-memory: 33554432 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: # 指定默认消费者group id group-id: consumer-tutorial auto-commit-interval: 100 auto-offset-reset: earliest enable-auto-commit: true # 指定消息key和消息体的编解码方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 指定listener 容器中的线程数,用于提升并发量 listener: concurrency: 3
@Autowired private KafkaSender<User> kafkaSender; @Test public void kafkaSend() throws InterruptedException { //模拟发消息 for (int i = 0; i < 5; i++) { User user = new User(); user.setId(System.currentTimeMillis()); user.setMsg(UUID.randomUUID().toString()); user.setSendTime(new Date()); kafkaSender.send(message); Thread.sleep(3000); } }
控制台能够看到执行成功:
在服务器执行 bin/kafka-topics.sh --list --zookeeper localhost:2181
能够看到topic
1.生产者数据的不丢失
源码 github:https://github.com/jarvisqi/java-tutorial/tree/master/kafka-tutorial
参考: