Spring Cloud Stream使用细节

上篇文章咱们看了Spring Cloud Stream的基本使用,小伙伴们对Spring Cloud Stream应该也有了一个基本的了解,可是上篇文章中的消息咱们是从RabbitMQ的web管理页面发来的,若是咱们想要从代码中发送消息呢?本文咱们就来看看Spring Cloud Stream的一些使用细节。web


本文是Spring Cloud系列的第三十篇文章,了解前二十九篇文章内容有助于更好的理解本文: spring

1.使用Spring Cloud搭建服务注册中心
2.使用Spring Cloud搭建高可用服务注册中心
3.Spring Cloud中服务的发现与消费
4.Eureka中的核心概念
5.什么是客户端负载均衡
6.Spring RestTemplate中几种常见的请求方式
7.RestTemplate的逆袭之路,从发送请求到负载均衡
8.Spring Cloud中负载均衡器概览
9.Spring Cloud中的负载均衡策略
10.Spring Cloud中的断路器Hystrix
11.Spring Cloud自定义Hystrix请求命令
12.Spring Cloud中Hystrix的服务降级与异常处理
13.Spring Cloud中Hystrix的请求缓存
14.Spring Cloud中Hystrix的请求合并
15.Spring Cloud中Hystrix仪表盘与Turbine集群监控
16.Spring Cloud中声明式服务调用Feign
17.Spring Cloud中Feign的继承特性
18.Spring Cloud中Feign配置详解
19.Spring Cloud中的API网关服务Zuul
20.Spring Cloud Zuul中路由配置细节
21.Spring Cloud Zuul中异常处理细节
22.分布式配置中心Spring Cloud Config初窥
23.Spring Cloud Config服务端配置细节(一)
24.Spring Cloud Config服务端配置细节(二)之加密解密
25.Spring Cloud Config客户端配置细节
26.Spring Cloud Bus之RabbitMQ初窥
27.Spring Cloud Bus整合RabbitMQ
28.Spring Cloud Bus整合Kafka
29.Spring Cloud Stream初窥缓存


自定义消息通道

上篇文章咱们提到了Sink和Source两个接口,这两个接口中分别定义了输入通道和输出通道,而Processor经过继承Source和Sink,同时具备输入通道和输出通道。这里咱们就模仿Sink和Source,来定义一个本身的消息通道。 负载均衡

仍是在上文的基础上,首先咱们定义一个接口叫作MySink,以下:分布式

public interface MySink {
    String INPUT = "mychannel";

    @Input(INPUT)
    SubscribableChannel input();
}

这里咱们定义了一个名为mychannel的消息输入通道,@Input注解的参数则表示了消息通道的名称,同时咱们还定义了一个方法返回一个SubscribableChannel对象,该对象用来维护消息通道订阅者。而后,咱们再定义一个名为MySource的接口,以下:微服务

public interface MySource {
    @Output(MySink.INPUT)
    MessageChannel output();
}

@Output注解中描述了消息通道的名称,仍是mychannel,而后这里咱们也定义了一个返回MessageChannel对象的方法,该对象中有一个向消息通道发送消息的方法。 单元测试

最后咱们定义一个消息接收类,以下:测试

@EnableBinding(value = {MySink.class})
public class SinkReceiver2 {
    private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);

    @StreamListener(MySink.INPUT)
    public void receive(Object playload) {
        logger.info("Received:" + playload);
    }
}

OK,咱们在这里绑定消息通道,而后监听自定义的消息通道,最后来一个单元测试测试一下,以下:ui

@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@SpringBootTest(classes = StreamHelloApplication.class)
@EnableBinding(MySource.class)
public class StreamHelloApplicationTests {

    @Autowired
    private MySource mySource;

    @Test
    public void contextLoads() {
        mySource.output().send(MessageBuilder.withPayload("hello 123").build());
    }
}

运行单元测试,咱们能够看到以下日志,表示消息发送成功了: 加密

图片描述

若是想要发送对象也能够直接发送,不用进行对象转换,以下:

发送:

Book book = new Book(1l, "三国演义", "罗贯中");
mySource.output().send(MessageBuilder.withPayload(book).build());

接收:

@StreamListener(MySink.INPUT)
public void receive(Book playload) {
    logger.info("Received:" + playload);
}

若是咱们想要在接收成功后给一个回执,也是OK的,以下:

@StreamListener(MySink.INPUT)
@SendTo(Source.OUTPUT)//定义回执发送的消息通道
public String receive(Book playload) {
    logger.info("Received:" + playload);
    return "receive msg :" + playload;
}

方法的返回值就是回执消息,回执消息在系统默认的output通道中,咱们若是想要接收这个消息,固然就要监听这个通道,以下:

@StreamListener(Source.OUTPUT)
public void receive2(String msg) {
    System.out.println("msg:"+msg);
}

固然要记得Source类也要在@EnableBinding注解中进行绑定。此时运行结果以下:

图片描述

消费组

因为咱们的服务可能会有多个实例同时在运行,若是不作任何设置,此时发送一条消息将会被全部的实例接收到,可是有的时候咱们可能只但愿消息被一个实例所接收,这个需求咱们能够经过消息分组来解决。方式很简单,给项目配置消息组和主题,以下:

spring.cloud.stream.bindings.mychannel.group=g1
spring.cloud.stream.bindings.mychannel.destination=dest1

这里咱们设置该工程都属于g1消费组,输入通道的主题名则为dest1。这里配置完成以后,咱们在消息发送方作以下配置:

spring.cloud.stream.bindings.mychannel.destination=dest1

也配置消息主题名为dest1(若是发送和接收就在同一个应用中,则这里能够不配置)。OK,此时咱们将咱们的项目启动两个实例,注意两个实例的端口不同,此时若是咱们再发送消息,则只会被两个实例中的一个接收到,另一个应用则接收不到,可是究竟是两个实例中的哪个接收,则是不肯定的。

消息分区

有的时候,咱们可能须要相同特征的消息可以老是被发送到同一个消费者上去处理,若是咱们只是单纯的使用消费组则没法实现功能,此时咱们须要借助于消息分区,消息分区以后,具备相同特征的消息就能够老是被同一个消费者处理了,配置方式以下(这里的配置都是在消费组的配置基础上完成的):

在消费者上添加以下配置:

spring.cloud.stream.bindings.mychannel.consumer.partitioned=true
spring.cloud.stream.instance-count=2
spring.cloud.stream.instance-index=0

关于这个配置我说三点:

1.第一行表示开启消息分区
2.第二行表示当前消息者的总的实例个数
3.第三行表示当前实例的索引,从0开始,当咱们启动多个实例时,须要在启动时在命令行配置索引

而后在消息生产者上添加以下配置:

spring.cloud.stream.bindings.mychannel.producer.partitionKeyExpression=payload
spring.cloud.stream.bindings.mychannel.producer.partitionCount=2

第一行配置设置了分区键的表达式规则,第二行则设置了消息分区数量。

OK,此时咱们再次启动多个消费者实例,而后重复发送多条消息,这些消息都将被同一个消费者处理掉。

Spring Cloud Stream使用细节咱们就先说到这里,有问题欢迎留言讨论。

参考资料:
1.《Spring Cloud微服务实战》

更多JavaEE资料请关注公众号:

图片描述

相关文章
相关标签/搜索