spring kafka开发

一、配置application.ymljava

spring:
  kafka:
    listener:
      ack-mode: manual
    bootstrap-servers: localhost:9092 #kafka的ip+post(若是是集群,用逗号分隔)
    producer:
      retries: 0 #当大于0时,会启用重试机制
      batch-size: 16384 #发送前批处理的记录数
      buffer-memory: 33554432 #生产者能够缓存的总字节数(待发送到服务器的数据)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #key的序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #value的序列化
    consumer:
      group-id: foo #消费者组(惟一)
      auto-offset-reset: earliest #当kafka上不存在偏移量或没有初始偏移量时,须要作什么
      enable-auto-commit: false #若是为true,则在后台定时提交消费者偏移量
      auto-commit-interval: 100 #若是enable-auto-commit=true,则每毫秒提交100偏移量到kafka
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #key的序列化
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value的序列化

二、配置Application.javaspring

@SpringBootApplication
@EnableKafka
public class Application {
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
}

三、配置消费者apache

@Component
public class Consumer {
	private Logger logger = LoggerFactory.getLogger(getClass());

	@KafkaListener(topics = "myTopic")
	public void listener(ConsumerRecord<?, ?> cr) throws Exception {
		String key = (String) cr.key();
		logger.info("key:{}", key);
		String value = (String) cr.value();
		logger.info("value:{}", value);
	}

	@KafkaListener(topics = "myTopic2")
	public void listen(String data, Acknowledgment ack) {
		logger.info("data:{}", data);
		ack.acknowledge();
	}
}

四、配置生产者bootstrap

@RunWith(value = SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class ProducerTest {
	private Logger logger = LoggerFactory.getLogger(getClass());

	@Autowired
	private KafkaTemplate<String, String> template;

	@Test
	public void send() {
		for (int i = 0; i < 100; i++) {
			this.template.send("myTopic2", "foo" + i);
		}
		logger.info("All received");
	}
}
相关文章
相关标签/搜索