本文简单介绍下spring-cloud-stream-binder-kafka的一些属性配置。html
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> <version>1.0.3.RELEASE</version> </dependency>
spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/ChannelBindingServiceProperties.javajava
spring: cloud: stream: instanceIndex: 0 ##支持环境变量INSTANCE_INDEX ## The instance index of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka instanceCount: 1 ## The number of deployed instances of an application. Must be set for partitioning and if using Kafka. ## used to partition data across different consumers.
Topic在逻辑上能够被认为是一个queue。每条消费都必须指定它的topic,能够简单理解为必须指明把这条消息放进哪一个queue里。为了使得 Kafka的吞吐率能够水平扩展,物理上把topic分红一个或多个partition,每一个partition在物理上对应一个文件夹,该文件夹下存储 这个partition的全部消息和索引文件。partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。spring
同一个partition内的消息只能被同一个组中的一个consumer消费。docker
当消费者数量多于partition的数量时,多余的消费者空闲。api
消费者少于和等于partition的数量时,会出现多个partition对应一个消费者的状况,个别消费者消费量会比其余的多。app
instanceCount主要是consumer用的,通常小于或等于topic的partition数量,主要用做消费者的消费分区用。框架
spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/BindingProperties.javamaven
spring: cloud: stream: bindings: output: destination: event-demo content-type: text/plain #group: test ##consumer属性 #producer: #consumer:
spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/ProducerProperties.java微服务
spring: cloud: stream: bindings: output: destination: event-demo content-type: text/plain producer: partitionCount: 1 headerMode partitionKeyExtractorClass: org.springframework.cloud.stream.partitioning.CustomPartitionKeyExtractorClass partitionSelectorClass: org.springframework.cloud.stream.partitioning.CustomPartitionSelectorClass headerMode: raw
spring: cloud: stream: bindings: output: destination: event-demo content-type: text/plain producer: bufferSize: 16384 maxRequestSize: 1048576 sync: true batchTimeout: 0
spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/ConsumerProperties.java性能
spring: cloud: stream: bindings: input: destination: event-demo content-type: text/plain consumer: concurrency: 1 ## The concurrency of the inbound consumer. partitioned: false ## Whether the consumer receives data from a partitioned producer.Default: false. headerMode: raw
spring: cloud: stream: bindings: input: destination: event-demo content-type: text/plain consumer: autoCommitOffset: false resetOffsets: true startOffset: earliest enableDlq: false recoveryInterval: 5000
ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props); ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, consumerCount); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector .createMessageStreams(topicCountMap);
这里头topicCountMap告诉Kafka咱们在Consumer中将用多少个线程来消费该topic。topicCountMap的key是topic name,value针对该topic是线程的数量。
总体的话,spring cloud stream本身抽象了一部分,可是有个硬伤就是spring.cloud.stream.instanceIndex这个不大友好,这样就形成服务的实例是有状态的了,在基于docker部署起来比较麻烦,还不如直接原生api。若是partition很少,或者每一个consumer性能强悍的话,那么至少部署两个,配置起来也还能够接受。