Kafka是一种高吞吐量的分布式发布订阅消息系统,有以下特性: 经过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即便数以TB的消息存储也可以保持长时间的稳定性能。高吞吐量:即便是很是普通的硬件Kafka也能够支持每秒数百万的消息。支持经过Kafka服务器和消费机集群来分区消息。支持Hadoop并行数据加载。java
Springboot的基本搭建和配置我在以前的文章已经给出代码示例了,若是还不了解的话能够先按照 SpringMVC配置太多?试试SpringBoot 进行学习哦。 那么现在很火的Springboot与kafka怎么完美的结合呢?多说无宜,放码过来 (talk is cheap,show me your code)!git
由于安装kafka须要zookeeper的支持,因此Windows安装时须要将zookeeper先安装上,而后将kafka安装好就能够了。 下面我给出Mac安装的步骤以及须要注意的点吧,windows的配置除了所在位置不太同样其余几乎没什么不一样。github
brew install kafka
web
对,就是这么简单,mac上一个命令就能够搞定了,这个安装过程可能须要等一下子,应该是和网络情况有关系。安装提示信息可能有错误消息,如"Error: Could not link: /usr/local/share/doc/homebrew" 这个不要紧,自动忽略掉了。 最终咱们看到下面的样子就成功咯。spring
==> Summary ðº/usr/local/Cellar/kafka/1.1.0: 157 files, 47.8MB
apache
安装的配置文件位置以下,根据本身的须要修改端口号什么的就能够了。bootstrap
安装的zoopeeper和kafka的位置 /usr/local/Cellar/
windows
配置文件 /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties
浏览器
启动zookeeper缓存
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
启动kafka
./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
为kafka建立Topic,topic 名为test,能够配置成本身想要的名字,回头再代码中配置正确就能够了。
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.2.RELEASE</version> <relativePath/> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>RELEASE</version> </dependency> </dependencies>
server: servlet: context-path: / port: 8080 spring: kafka: bootstrap-servers: 127.0.0.1:9092 #生产者的配置,大部分咱们可使用默认的,这里列出几个比较重要的属性 producer: #每批次发送消息的数量 batch-size: 16 #设置大于0的值将使客户端从新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不一样。容许重试将潜在的改变数据的顺序,若是这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。 retries: 0 #producer能够用来缓存数据的内存大小。若是数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来代表。这项设置将和producer可以使用的总内存相关,但并非一个硬性的限制,由于不是producer使用的全部内存都是用于缓存。一些额外的内存会用于压缩(若是引入压缩机制),一样还有一些用于维护请求。 buffer-memory: 33554432 #key序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer #消费者的配置 consumer: #Kafka中没有初始偏移或若是当前偏移在服务器上再也不存在时,默认区最新 ,有三个选项 【latest, earliest, none】 auto-offset-reset: latest #是否开启自动提交 enable-auto-commit: true #自动提交的时间间隔 auto-commit-interval: 100 #key的解码方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value的解码方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #在/usr/local/etc/kafka/consumer.properties中有配置 group-id: test-consumer-group
@Component public class Producer { @Autowired private KafkaTemplate kafkaTemplate; private static Gson gson = new GsonBuilder().create(); //发送消息方法 public void send() { Message message = new Message(); message.setId("KFK_"+System.currentTimeMillis()); message.setMsg(UUID.randomUUID().toString()); message.setSendTime(new Date()); kafkaTemplate.send("test", gson.toJson(message)); } }
public class Message { private String id; private String msg; private Date sendTime; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public Date getSendTime() { return sendTime; } public void setSendTime(Date sendTime) { this.sendTime = sendTime; } }
public class Consumer { @KafkaListener(topics = {"test"}) public void listen(ConsumerRecord<?, ?> record){ Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("---->"+record); System.out.println("---->"+message); } } }
这里咱们用一个接口来测试咱们的消息发送会不会被消费者接收。
@RestController @RequestMapping("/kafka") public class SendController { @Autowired private Producer producer; @RequestMapping(value = "/send") public String send() { producer.send(); return "{\"code\":0}"; } }
在Springboot启动类启动后在浏览器访问http://127.0.0.1:8080/kafka/send,咱们能够再IDE控制台中看到输出的结果,这时候咱们的整合基本上就完成啦。 具体代码能够在SpringBootKafkaDemo@github获取哦。