Spring Cloud Stream是什么:html
Spring Cloud Stream是Spring Cloud的一个子项目,是一个能让咱们更加方便操做MQ的框架,其目的用于构建与消息中间件链接的高度可伸缩的消息事件驱动的微服务java
简单来讲Spring Cloud Stream就是一个简化了MQ操做的框架,其架构图以下:node
Spring Cloud Stream编程模型:web
关于图中的概念:spring
如今有一个微服务项目:content-center,该微服务做为生产者,咱们来为这个微服务集成Spring Cloud Stream,第一步添加stream依赖:apache
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency>
第二步,在启动类上添加@EnableBinding
注解,以下:编程
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; @EnableBinding(Source.class) ...
第三步,在配置文件中,添加与stream相关的配置项:json
spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: # 生产者为output output: # 用于指定topic destination: stream-test-topic
完成以上步骤后,项目就已经集成了Spring Cloud Stream,如今咱们来使用Spring Cloud Stream编写生产者,具体代码以下:架构
package com.zj.node.contentcenter.controller.content; import lombok.RequiredArgsConstructor; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * 生产者 * * @author 01 * @date 2019-08-10 **/ @RestController @RequiredArgsConstructor public class TestProducerController { private final Source source; @GetMapping("/test-stream") public String testStream(){ Message<String> message = MessageBuilder .withPayload("消息体") .build(); source.output() .send(message); return "send message success!"; } }
启动项目,测试该接口是否能成功执行:app
而后为另外一个做为消费者的微服务项目:user-center,集成Spring Cloud Stream,因为依赖配置是同样的,这里就不进行重复了,可是配置和注解里的类须要更改一下。首先是配置以下:
spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: # 消费者为input input: # 用于指定topic destination: stream-test-topic # rocketmq必须配置group,不然启动会报错 # 若是使用的是其余MQ,则不是必须配置的 group: binder-group
启动类的注解以下:
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; @EnableBinding(Sink.class) ...
完成集成后,使用Spring Cloud Stream编写消费者,具体代码以下:
package com.zj.node.usercenter.rocketmq; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Service; /** * 消费者 * * @author 01 * @date 2019-08-10 **/ @Slf4j @Service public class TestStreamConsumer { @StreamListener(Sink.INPUT) public void receive(String messageBody) { log.info("经过stream收到了消息,messageBody = {}", messageBody); } }
完成代码的编写后启动项目,因为先前咱们已经经过生产者往RocketMQ投递了消息,因此此时控制台会输出接收到的消息,以下:
经过以上小节的学习,咱们已经了解了Spring Cloud Stream的基本使用。从以上示例能够得知,input用于绑定一个topic消费消息,output则反之,用于绑定一个topic投递消息。
但在实际的项目中,可能会有多个topic,甚至在极端场景下,不一样的topic可能使用不一样的MQ实现,而stream默认提供的input和output都只能绑定一个topic,因此这个时候就须要用到stream的自定义接口来实现多个“input”和“output”绑定不一样的topic了。
在以上小节的示例中能够得知,生产者发送消息时使用的是Source
接口里的output
方法,而消费者发送消息时使用的是Sink
接口里的input
方法,而且都须要配置到启动类的@EnableBinding
注解里。因此实际上咱们须要自定义接口的源码与这两个接口的源码几乎一致,只是名称有所不一样而已,使用上也只是将Source
和Sink
改成自定义的接口便可。
接下来简单演示一下如何自定义接口并使用,咱们基于上一小节的例子进行改造。首先是生产者,定义一个用于发送消息的接口,具体代码以下:
package com.zj.node.contentcenter.rocketmq; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定义发送消息接口,与stream默认提供的Source源码是相似的 * * @author 01 * @date 2019-08-10 **/ public interface MySource { /** * Name of the output channel. */ String MY_OUTPUT = "my-output"; /** * @return output channel */ @Output(MY_OUTPUT) MessageChannel output(); }
而后在启动类的@EnableBinding
中,添加这个接口:
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; @EnableBinding({Source.class, MySource.class}) ...
在配置文件中添加以下配置:
spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: # 生产者为output output: # 用于指定topic destination: stream-test-topic # 自定义的”output“,这里的名称须要与MySource接口里的MY_OUTPUT相对应 my-output: # 绑定不一样的topic destination: stream-my-topic
修改生产者的代码以下便可:
package com.zj.node.contentcenter.controller.content; import com.zj.node.contentcenter.rocketmq.MySource; import lombok.RequiredArgsConstructor; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * 生产者 * * @author 01 * @date 2019-08-03 **/ @RestController @RequiredArgsConstructor public class TestProducerController { private final MySource mySource; @GetMapping("/test-stream") public String testStream(){ Message<String> message = MessageBuilder .withPayload("消息体") .build(); mySource.output() .send(message); return "send message success!"; } }
而后启动项目访问该接口,测试消息是否能正常发送:
改造完生产者后接着改造消费者,首先定义一个用于消费消息的接口,具体代码以下:
package com.zj.node.usercenter.rocketmq; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 自定义消费消息接口,与stream默认提供的Sink源码是相似的 * * @author 01 * @date 2019-08-10 **/ public interface MySink { /** * Input channel name. */ String MY_INPUT = "my-input"; /** * @return input channel. */ @Input(MY_INPUT) SubscribableChannel input(); }
一样须要在启动类的@EnableBinding
中,添加这个接口:
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; @EnableBinding({Sink.class, MySink.class}) ...
在配置文件中添加以下配置:
spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: # 消费者为input input: # 用于指定topic destination: stream-test-topic # rocketmq必须配置group,不然启动会报错 # 若是使用的是其余MQ,则不是必须配置的 group: binder-group # 自定义的”input“,这里的名称须要与MySink接口里的MY_INPUT相对应 my-input: # 绑定不一样的topic destination: stream-my-topic group: my-group
修改消费者的代码以下:
package com.zj.node.usercenter.rocketmq; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Service; /** * 消费者 * * @author 01 * @date 2019-08-10 **/ @Slf4j @Service public class TestStreamConsumer { @StreamListener(MySink.MY_INPUT) public void receive(String messageBody) { log.info("自定义接口 - 经过stream收到了消息,messageBody = {}", messageBody); } }
启动项目,因为先前咱们已经经过生产者往RocketMQ投递了消息,因此此时控制台会输出接收到的消息,以下:
咱们都知道Spring Boot Actuator组件用于暴露监控端点,不少监控工具都须要依赖该组件的监控端点实现监控。而项目集成了Stream及Actuator后也会暴露相应的监控端点,首先须要在项目里集成Actuator,添加依赖以下:
<!-- actuator --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
在配置文件中添加以下配置:
management: endpoints: web: exposure: # 暴露全部监控端点 include: '*' endpoint: health: # 显示健康检测详情 show-details: always
访问http://127.0.0.1:{项目端口}/actuator
能够获取全部暴露出来的监控端点,Stream的相关监控端点也在其列,以下图:
/actuator/bindings
端点能够用于查看bindings相关信息:
/actuator/channels
端点用于查看channels的相关信息,而“input”和“output”就是所谓的channel,能够认为这些channel是topic的抽象:
在/actuator/health
端点中能够查看binder及RocketMQ的状态,主要是用于查看MQ的链接状况,若是链接不上其status则为DOWN:
先前在Spring Cloud Alibaba RocketMQ - 构建异步通讯的微服务一文的末尾中,咱们介绍了RocketMQ的事务消息而且也演示了如何编码实现。在本文学习了Spring Cloud Stream以后,咱们来结合Stream对以前实现事务消息的代码进行重构。
首先修改配置文件以下:
spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: output: producer: # 开启事务消息,这样经过output这个channel发送的消息都是半消息 transactional: true # 生产者所在的事务组名称 group: tx-test-producer-group bindings: # 生产者为output output: # 用于指定topic destination: stream-test-topic
而后重构TestProducerService
,具体代码以下:
package com.zj.node.contentcenter.service.test; import com.alibaba.fastjson.JSON; import com.zj.node.contentcenter.dao.content.NoticeMapper; import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper; import com.zj.node.contentcenter.domain.entity.content.Notice; import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog; import lombok.RequiredArgsConstructor; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.UUID; /** * @author 01 * @date 2019-08-08 **/ @Service @RequiredArgsConstructor public class TestProducerService { private final NoticeMapper noticeMapper; private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper; private final Source source; public String testSendMsg(Notice notice) { // 生成事务id String transactionId = UUID.randomUUID().toString(); // 经过stream发送消息,这里实际发送的就是半消息 source.output().send( MessageBuilder.withPayload("消息体") // header是消息的头部分,能够用做传参 .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .setHeader("notice_id", notice.getId()) // 对象须要转换成json,不然默认是调用对象的toString方法转换为字符串 .setHeader("notice", JSON.toJSONString(notice)) .build() ); return "send message success"; } @Transactional(rollbackFor = Exception.class) public void updateNotice(Integer noticeId, Notice notice) { Notice newNotice = new Notice(); newNotice.setId(noticeId); newNotice.setContent(notice.getContent()); noticeMapper.updateByPrimaryKeySelective(newNotice); } @Transactional(rollbackFor = Exception.class) public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) { updateNotice(noticeId, notice); // 写入事务日志 rocketmqTransactionLogMapper.insertSelective( RocketmqTransactionLog.builder() .transactionId(transactionId) .log("updateNotice") .build() ); } }
最后是重构TestTransactionListener
,具体代码以下:
package com.zj.node.contentcenter.rocketmq; import com.alibaba.fastjson.JSON; import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper; import com.zj.node.contentcenter.domain.entity.content.Notice; import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog; import com.zj.node.contentcenter.service.test.TestProducerService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; /** * 本地事务监听器 * * @author 01 * @date 2019-08-08 **/ @Slf4j @RequiredArgsConstructor // 这里的txProducerGroup须要与配置文件里配置的一致 @RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group") public class TestTransactionListener implements RocketMQLocalTransactionListener { private final TestProducerService service; private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper; /** * 用于执行本地事务的方法 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); log.info("执行本地事务方法. 事务id: {}", transactionId); Integer noticeId = Integer.parseInt((String) headers.get("notice_id")); // 因为从header里获取的对象是json格式因此须要进行转换 Notice notice = JSON.parseObject((String) headers.get("notice"), Notice.class); try { // 执行带有事务注解的方法 service.updateNoticeWithRocketMQLog(noticeId, notice, transactionId); // 正常执行向MQ Server发送commit消息 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("本地事务方法发生异常,消息将被回滚", e); // 发生异常向MQ Server发送rollback消息 return RocketMQLocalTransactionState.ROLLBACK; } } /** * 用于回查本地事务的执行结果 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); log.warn("回查本地事务状态. 事务id: {}", transactionId); // 按事务id查询日志数据 RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne( RocketmqTransactionLog.builder() .transactionId(transactionId) .build() ); // 若是能按事务id查询出来数据表示本地事务执行成功,没有数据则表示本地事务执行失败 if (transactionLog == null) { log.warn("本地事务执行失败,事务日志不存在,消息将被回滚. 事务id: {}", transactionId); return RocketMQLocalTransactionState.ROLLBACK; } return RocketMQLocalTransactionState.COMMIT; } }
扩展文章: