Spring Boot 集成 Message

32.Messaging

32.1.JMS(TODO)

 

32.2.amqp(TODO)

 

32.3.Apache Kafka Support (集成kafka)

    32.3.1 Sending a Message

    Spring的KafkaTemplate是自动配置的,你能够直接在你的bean里自动装配,以下示例:html

/**
 * description:kafka 配置文件
 * @author manji
 * @Date 2018年08月21日14:04:45
 */
@Component
public class KafkaConf {
    @Autowired
    private  KafkaTemplate kafkaTemplate;
    /**
     * product message
     * @param topic : topic name
     * @param msg : topic msg
     */
    public void product(String topic, String msg){
        kafkaTemplate.send(topic, msg);
    }
}

 

若是定义了RecordMessageConverter bean,它会自动关联到自动配置的KafkaTemplate.java

    32.3.2 Receiving a Message

当Apache Kafka 相关引用存在,能够用@KafkaListener注解在任何bean上面建立监听端点. 若是不手动定义KafkaListenerContainerFactory的话,默认值经过spring.kafka.listener.* 键进行自动配置. 固然,若是自定义了一个RecordMessageConverter bean,它会自动关联到默认工厂。spring

下面Component 在"manjixx_test" topic上建立一个切点:json

/**
 * description:kafka 配置文件
 * @author manji
 * @Date 2018年08月21日14:04:45
 */
@Component
@Slf4j
public class KafkaConf {
    @Autowired
    private  KafkaTemplate kafkaTemplate;

    @KafkaListener(topics = {"manjixx_test"})
    public String consumer(String message){
        log.info("test topic message : {}" , message    );
        return message;
    }
}

    32.3.3 Additional Kafka Properties

支持自动配置的属性显示在 Appendix A, Common application properties. 注意, 大多数状况下, 这些属性直接映射Apache Kafka 点状属性. 有关详细信息请查看 Apache Kafka文档. app

这些属性中的前几个同时适用于生产者和消费者,可是若是你想生产者和消费者属性使用不一样的值,则能够在生产者和消费者级别指定. Apache Kafka指定属性的重要性经过HIGH, MEDIUM, or LOW. Spring Boot 自动配置支持全部HIGH等级重要性的属性,一些选择的MEDIUM 和LOW属性,以及任何没有默认值的属性. spring-boot

KafkaProperties类只提供Kafka支持的属性的子集. 若是你想配置一些额外的不直接支持的属性在生产者和消费者上,使用如下properties:spa

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth

设置的第一个kafka属性 prop.one 值为one(适用于 生产者,消费者,管理员), prop.two = second 适用于管理员,prop.three = three 消费者属性,prop.four = fourth 适用于生产者.code

你也能够按照以下方式设置kafka 的Json反序列化方式:htm

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

一样的, 你能够禁用JsonSerializer在头文件中发送类信息的行为:three

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false

TIPS: 以上面这种方式设置的属性会把Spring Boot 明确支持的配置项覆盖. 

相关文章
相关标签/搜索