Spring Boot integration with Kafka

1. Start ZooKeeper
After unpacking, start ZooKeeper first. Edit zoo_sample.cfg under zookeeper/conf and set dataDir=/opt/zkdata (as an example), then rename the file to zoo.cfg or make a copy of it named zoo.cfg.
Start ZooKeeper:
./bin/zkServer.sh start
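
For reference, a minimal zoo.cfg sketch that matches the dataDir above; the remaining values are the ZooKeeper defaults from zoo_sample.cfg, not anything specific to this setup:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zkdata
clientPort=2181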

2. Start Kafka

Edit the server.properties file under kafka/config and set:
log.dirs=/opt/kadata
advertised.listeners=PLAINTEXT://192.168.113.136:9092 (this setting is important: it advertises an externally reachable address, so the host machine can reach the broker running in the VM)
Start Kafka:
./bin/kafka-server-start.sh -daemon config/server.properties

Be sure to pass -daemon, which makes the process run in the background.
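
Because -daemon detaches the process from the terminal, a quick way to confirm the broker actually started (an optional check, not part of the original steps) is to inspect the server log in the Kafka directory and make sure the last lines report a successful startup rather than an error:

tail -n 50 logs/server.log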

3. Create a topic

    Create a topic named test:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
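
Optionally, the topic and the broker can be smoke-tested with the console tools that ship with Kafka; the broker address below assumes the advertised.listeners value from step 2:

./bin/kafka-topics.sh --list --zookeeper localhost:2181
./bin/kafka-console-producer.sh --broker-list 192.168.113.136:9092 --topic test
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.113.136:9092 --topic test --from-beginning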

4. Add Spring dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.0.0</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.9.RELEASE</version>
</dependency>

5. Edit the Spring Boot configuration file

Add the following settings to application.properties:

spring.kafka.bootstrap-servers=192.168.113.136:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.batch-size=4096
spring.kafka.producer.buffer-memory=40960
spring.kafka.consumer.group-id=test
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.bootstrap-servers=192.168.113.136:9092
spring.kafka.consumer.auto-offset-reset=earliest
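
Spring Boot's auto-configuration builds the KafkaTemplate and the listener container from these properties, so no extra configuration code is needed. Purely for illustration, a rough sketch of what an equivalent hand-written producer configuration would look like (the class name KafkaProducerConfig is made up for this example):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // same values as the spring.kafka.producer.* properties above
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.113.136:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}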

6. Write a ProducerController
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class ProducerController {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String sendKafka(@RequestParam("message") String message) {
        try {
            logger.info("kafka message={}", message);
            // send the message to the "test" topic with a fixed key
            kafkaTemplate.send("test", "key", message);
            logger.info("sent to kafka successfully.");
            return "success";
        } catch (Exception e) {
            logger.error("failed to send to kafka", e);
            return "failure";
        }
    }
}
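
With the application running, the endpoint can be called with a plain GET request, for example curl "http://localhost:8080/kafka/send?message=hello" (port 8080 is the Spring Boot default and an assumption here). Note that kafkaTemplate.send() is asynchronous and returns immediately; if you also want to log the broker acknowledgement, a hedged sketch using the ListenableFuture that spring-kafka 2.1 returns from send() could replace the plain send call in the controller:

// extra imports needed in ProducerController:
// import org.springframework.kafka.support.SendResult;
// import org.springframework.util.concurrent.ListenableFutureCallback;
kafkaTemplate.send("test", "key", message)
        .addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // logs the offset assigned by the broker once the send is acknowledged
                logger.info("kafka ack, offset={}", result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("kafka send failed", ex);
            }
        });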

7. Write a TestConsumer


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Auth justinniu
 * @Date 2018/9/3
 * @Desc
 */
@Component
public class TestConsumer {
    // consumes every record published to the "test" topic
    @KafkaListener(topics = "test")
    public void listen(ConsumerRecord<?, ?> record) throws Exception {
        System.out.printf("topic = %s, offset = %s, value = %s \n", record.topic(), record.offset(), record.value());
    }
}
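
To verify the whole chain, start the application, call /kafka/send as shown in step 6, and the listener should print something like topic = test, offset = <n>, value = hello to the console; the exact offset depends on how many messages the topic already contains.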
