1、首先 Spring Boot 对 Kafka 的支持也很好,同样是在配置文件中配置好参数,然后就能够直接使用。先说一下,整体很简单,不要怕。
2、我用的依赖是:
<!-- Spring for Apache Kafka starter. No explicit <version> is needed:
     the version is managed by the Spring Boot dependency BOM. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
配置文件:
# Kafka connection settings.
# NOTE(review): the Java config below reads "${spring.kafka.*}" properties,
# so this block must be nested under the "spring:" root key.
spring:
  kafka:
    # Broker list, shared by producer and consumer; the original snippet
    # repeated the same list under producer/consumer, which is redundant.
    bootstrap-servers: 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092
    producer:
      retries: 1
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      enable-auto-commit: true
      auto-offset-reset: latest
      auto-commit-interval: 1000
      group-id: gzj
然后在需要往 Kafka 发送数据的地方,也就是生产者,直接注入即可:
// Spring-managed producer template; connection and serializer settings
// come from the spring.kafka.producer.* entries in the application config.
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
消费者,监听:
/**
 * Listener-style consumer: Spring invokes this method for every record
 * published to topic "gzj", delivering the record value as a String.
 *
 * NOTE(review): System.err is demo output; a logger would be preferable
 * in production code.
 */
@KafkaListener(topics = {"gzj"})
public void receive(String content) {
    System.err.println("Receive:" + content);
}
消费者还有另外一种方法:
package com.gzj.demo.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * Description * <p> * </p> * DATE 2018/10/23. * * @author guozhenjiang. */ @Component public class KafkaConsumerTask implements Runnable,InitializingBean { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTask.class); private Thread thread; @Resource(name="kafkaConsumer") private KafkaConsumer<String,String> kafkaConsumer; @Override public void run() { logger.info("消费数据任务启动"); while(true){ try{ ConsumerRecords<String ,String > records=kafkaConsumer.poll(1000); if(records!=null){ for(ConsumerRecord<String ,String > record:records){ logger.error(record.key()); logger.error(record.topic()); logger.error(record.value()); } } }catch(Exception e){ // logger.error("我也不知道哪儿错了"); }finally { // logger.error("不放弃"); } } } @Override public void afterPropertiesSet() throws Exception { this.thread=new Thread(this); this.thread.start(); } }
package com.gzj.demo.config; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Arrays; import java.util.Properties; /** * Description * <p> * </p> * DATE 2018/10/23. * * @author guozhenjiang. */ @Configuration @ConfigurationProperties(prefix = "spring.kafka") public class KafkaConnectConfig { @Bean(name = "kafkaConsumer") public KafkaConsumer<String, String> kafkaConsumer() { Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("group.id", "ggg"); props.setProperty("enable.auto.commit", enableAutoCommit); props.setProperty("auto.offset.reset", autoOffsetReset); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("gzj")); return consumer; } @Value("${server.port}") private String port; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.enable-auto-commit}") private String enableAutoCommit; @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset; public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String getBootstrapServers() { return bootstrapServers; } public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } public String getEnableAutoCommit() { return enableAutoCommit; } public void setEnableAutoCommit(String enableAutoCommit) { 
this.enableAutoCommit = enableAutoCommit; } public String getAutoOffsetReset() { return autoOffsetReset; } public void setAutoOffsetReset(String autoOffsetReset) { this.autoOffsetReset = autoOffsetReset; } }
后一种暂未发现有什么优势。两种方式均可以实现监听 Kafka,充当消费者。
3、现在我有两个消费者。之前一直好奇如果有多个消费者,如何让它们重复消费或协同消费,据说是通过配置 group id 实现的。亲自试验了一下,确实如此:同一个 group id 内是协同消费,不同的 group id 之间是重复消费。
也没什么,挺简单的,有什么问题能够提问。开源中国的 App 我下好了,应该会经常登录。