一、配置application.ymljava
spring: kafka: listener: ack-mode: manual bootstrap-servers: localhost:9092 #kafka的ip+post(若是是集群,用逗号分隔) producer: retries: 0 #当大于0时,会启用重试机制 batch-size: 16384 #发送前批处理的记录数 buffer-memory: 33554432 #生产者能够缓存的总字节数(待发送到服务器的数据) key-serializer: org.apache.kafka.common.serialization.StringSerializer #key的序列化 value-serializer: org.apache.kafka.common.serialization.StringSerializer #value的序列化 consumer: group-id: foo #消费者组(惟一) auto-offset-reset: earliest #当kafka上不存在偏移量或没有初始偏移量时,须要作什么 enable-auto-commit: false #若是为true,则在后台定时提交消费者偏移量 auto-commit-interval: 100 #若是enable-auto-commit=true,则每毫秒提交100偏移量到kafka key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #key的序列化 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value的序列化
二、配置Application.javaspring
@SpringBootApplication @EnableKafka public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
三、配置消费者apache
@Component public class Consumer { private Logger logger = LoggerFactory.getLogger(getClass()); @KafkaListener(topics = "myTopic") public void listener(ConsumerRecord<?, ?> cr) throws Exception { String key = (String) cr.key(); logger.info("key:{}", key); String value = (String) cr.value(); logger.info("value:{}", value); } @KafkaListener(topics = "myTopic2") public void listen(String data, Acknowledgment ack) { logger.info("data:{}", data); ack.acknowledge(); } }
四、配置生产者bootstrap
@RunWith(value = SpringJUnit4ClassRunner.class) @SpringBootTest(classes = Application.class) public class ProducerTest { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private KafkaTemplate<String, String> template; @Test public void send() { for (int i = 0; i < 100; i++) { this.template.send("myTopic2", "foo" + i); } logger.info("All received"); } }