Spring的KafkaTemplate是自动配置的,你能够直接在你的bean里自动装配,以下示例:html
/** * description:kafka 配置文件 * @author manji * @Date 2018年08月21日14:04:45 */ @Component public class KafkaConf { @Autowired private KafkaTemplate kafkaTemplate; /** * product message * @param topic : topic name * @param msg : topic msg */ public void product(String topic, String msg){ kafkaTemplate.send(topic, msg); } }
若是定义了RecordMessageConverter bean,它会自动关联到自动配置的KafkaTemplate.java
当Apache Kafka 相关引用存在,能够用@KafkaListener注解在任何bean上面建立监听端点. 若是不手动定义KafkaListenerContainerFactory的话,默认值经过spring.kafka.listener.* 键进行自动配置. 固然,若是自定义了一个RecordMessageConverter bean,它会自动关联到默认工厂。spring
下面Component 在"manjixx_test" topic上建立一个切点:json
/** * description:kafka 配置文件 * @author manji * @Date 2018年08月21日14:04:45 */ @Component @Slf4j public class KafkaConf { @Autowired private KafkaTemplate kafkaTemplate; @KafkaListener(topics = {"manjixx_test"}) public String consumer(String message){ log.info("test topic message : {}" , message ); return message; } }
支持自动配置的属性显示在 Appendix A, Common application properties. 注意, 大多数状况下, 这些属性直接映射Apache Kafka 点状属性. 有关详细信息请查看 Apache Kafka文档. app
这些属性中的前几个同时适用于生产者和消费者,可是若是你想生产者和消费者属性使用不一样的值,则能够在生产者和消费者级别指定. Apache Kafka指定属性的重要性经过HIGH, MEDIUM, or LOW. Spring Boot 自动配置支持全部HIGH等级重要性的属性,一些选择的MEDIUM 和LOW属性,以及任何没有默认值的属性. spring-boot
KafkaProperties类只提供Kafka支持的属性的子集. 若是你想配置一些额外的不直接支持的属性在生产者和消费者上,使用如下properties:spa
spring.kafka.properties.prop.one=first spring.kafka.admin.properties.prop.two=second spring.kafka.consumer.properties.prop.three=third spring.kafka.producer.properties.prop.four=fourth
设置的第一个kafka属性 prop.one 值为one(适用于 生产者,消费者,管理员), prop.two = second 适用于管理员,prop.three = three 消费者属性,prop.four = fourth 适用于生产者.code
你也能够按照以下方式设置kafka 的Json反序列化方式:htm
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
一样的, 你能够禁用JsonSerializer在头文件中发送类信息的行为:three
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.properties.spring.json.add.type.headers=false
TIPS: 以上面这种方式设置的属性会把Spring Boot 明确支持的配置项覆盖.