一、group:java
组内只有1个实例消费。若是不设置group,则stream会自动为每一个实例建立匿名且独立的group——因而每一个实例都会消费spring
组内单次只有1个实例消费,而且会轮询负载均衡。一般,在将应用程序绑定到给定目标时,最好始终指定consumer group负载均衡
二、destination binder:ide
与外部消息系统通讯的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者。Binder使Spring Cloud Stream应用程序能够灵活地链接到中间件,目前spring为kafka、rabbitmq提供bindercode
三、destination binding:orm
Binding 是链接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder建立中间件
四、partitionblog
一个或多个生产者将数据发送到多个消费者,并确保有共同特征标识的数据由同一个消费者处理。默认是对消息进行hashCode,而后根据分区个数取余,因此对于相同的消息,总会落到同一个消费者上接口
注:严格来讲partition不属于概念,而是一种Stream提升伸缩性、吞吐量的一种方式rabbitmq
一、@Input,使用示例:
public interface MySink { @Input("my-input") SubscribableChannel input(); }
做用:
二、@Output,使用示例:
public interface MySource { @Output("my-output") MessageChannel output(); }
做用:
@Input
相似,只不过是用来生产消息三、@StreamListener,使用示例:
@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'") public void receive(String messageBody) { log.info("Received: {}", messageBody); }
做用:
四、@SendTo,使用示例:
// 接收INPUT这个channel的消息,并将返回值发送到OUTPUT这个channel @StreamListener(Sink.INPUT) @SendTo(Source.OUTPUT) public String receive(String receiveMsg) { return "handle..."; }
做用:
四、@InboundChannelAdapter,使用示例:
@Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1")) public MessageSource<String> producer() { return () -> new GenericMessage<>("Hello Spring Cloud Stream"); }
做用:
五、@ServiceActivator,使用示例:
@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT) public String transform(String payload) { return payload.toUpperCase(); }
做用:
六、@Transformer,使用示例:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(String message) { return message.toUpperCase(); }
做用:
@ServiceActivator
相似,标注该注解的方法可以转换消息,消息头,或消息有效内容PollableMessageSource容许消费者能够控制消费速率。举个例子简单演示一下,首先定义一个接口:
public interface PolledProcessor { @Input("pollable-input") PollableMessageSource input(); }
使用示例:
@Autowired private PolledProcessor polledProcessor; @Scheduled(fixedDelay = 5_000) public void poll() { polledProcessor.input().poll(message -> { byte[] bytes = (byte[]) message.getPayload(); String payload = new String(bytes); System.out.println(payload); }); }
参考:
https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers