上节学习了基础的RabbitMQ,本次继续学习MQ相关API---SpringCloud Stream前端
看到Stream大体会联想到流,input,output等信息,官网解释SpringCloud Stream是给微服务应用构建消息队列驱动的能力的框架java
服务利用input,output与SpringCloud Stream中Binger交互,Binger与中间件交互,Binger是服务与消息中间件的桥梁,SpringCloud Stream对消息中间件的近一步封装,可动态切换中间件,目前给出的只有支持RabbitMQ、kafkamysql
继续上篇在order服务中测试web
第一步,走套路,maven引入依赖redis
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
第二步用原来的,yml配置+SpringCloud Config配置中心配置spring
spring: application: name: order cloud: config: discovery: enabled: true service-id: CONFIG profile: dev eureka: client: service-url: defaultZone: http://localhost:8761/eureka/
码云中的SpringCloud Config配置中心配置sql
server: port: 8083 spring: application: name: order datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/beginner?useSSL=false&characterEncoding=utf-8 username: root password: 123456 jpa: show-sql: true rabbitmq: host: 192.168.99.100 port: 5672 username: guest password: guest virtual-host: / PRODUCT: ribbon: NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule hello: jerry
第三步,建立一个接口StreamClient数据库
public interface StreamClient { String MSG="myFirstStream"; @Input(StreamClient.MSG) SubscribableChannel input(); @Output(StreamClient.MSG) MessageChannel output(); }
第四步,开启Binging以及监听上面接口,用于接收消息json
@Component @EnableBinding(StreamClient.class) @Slf4j public class StreamListen { @StreamListener(StreamClient.MSG) public void stream(Object msg){ log.info("【SpringCloud Stream】StreamListen.stream={}",msg); } }
第五步,测试,写在Controller中,用于推送消息后端
import com.cloud.order.MQmsg.StreamClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RefreshScope public class StreamController { @Autowired private StreamClient streamClient; @GetMapping("/stream") public void setStreamClient() { streamClient.output().send(MessageBuilder.withPayload("hello,stream").build()); } }
在微服务中重要的服务确定会多部署几个,实现高可用,负载均衡,当启动两个order服务的时候,发送请求消息的时候,几个实例都有接收到,在yml配置stream的分组来实现只给一个实例接收消息,以下:
spring: application: name: order cloud: config: discovery: enabled: true service-id: CONFIG profile: dev #new 新加部分 stream: bindings: myFirstStream: group: orderStream content: application/json eureka: client: service-url: defaultZone: http://localhost:8761/eureka/
group是服务分组 , content是将对象以json方式显示在RabbitMQ管理控制台,方便查看
正常状况下都会传对象,RabbitMQ管理控制台会打印base64加密后的数据,看不出结构,所以使用上面的content的json方式
还可使用如下注解,指定发送方
@StreamListener(StreamClient.MSG) @SendTo("otherStream") public void stream(Object msg){ log.info("【SpringCloud Stream】StreamListen.stream={}",msg); }
有注意的点是如今有order服务,product服务,在微服务中天然的库都是分开存储的,事务就有疑问存在
在单体应用中,保持数据的一致性很容易实现,切换成分布式容易出现数据不一致的状况,数据库回滚是自动完成的,就有消费方出异常,消息多发的累积的状况,就须要从新考虑设计
下面给一个order与product下订单操做的简单流程,假设一个用户购买N个商品
①查询商品信息->调用product服务
②获取商品信息,计算价格,生成商品对于的订单详情->调用order服务
③扣减用户购买商品的库存数量->调用product服务
④生成总订单->调用order服务
如今中间使用消息队列,有如下方式 ↓↓↓
假设④放入MQ变成异步处理,①②③依旧,在高并发状况下,会提升效率,由于②④都是操做order服务的数据库的,④转为不等待处理,①②③执行完就回复前端订单完成,后面经过消息通知④异步完成,若是④失败了,能够选择重试下单后续处理,MQ中的消息是一直存在,能够回去找,重试生产订单,这步是能够肯定的,可保证完成,处理并不复杂;
假设③④放入MQ变成异步处理,须要注意的点,问题变得复杂了,扣减库存是调用product服务,生成总订单是调用order服务,若是④成功,③失败了,就须要回滚④的订单,有多种方法解决此类问题,好比在订单完成后,订单状态是排队中,而后推送一个建立订单的消息到MQ中,就是order向其余服务通知本服务已经建立了订单,由MQ转发给订阅该消息的服务,③接收到消息,就开始执行,也存在③成功和失败的状况,执行③,而后向MQ推送一个执行结果的消息,order订阅③推送的消息,根据消息,进行④操做,就是若是③的消息是成功,则④就改状态为下单成功,不然④状态改成不成功或者取消订单其余操做
以上的操做都须要确保MQ消息推送接收是可靠的,否则半路丢了就尴尬了,就须要考虑MQ选型, 好比以可靠稳定著称的RabbitMQ,再者④经过本身的检测监听机制(好比定时任务),看product服务执行③失败的状况,而后进行后续操做(重试,取消等等),用户体验方面是不能立刻知道本身的的订单是否成功,最多见过年前的买票,抢票的时候颇有多是等待中,额外须要前端配合修改体验,此思路方法能够承受很大的并发请求
product的单价库存等信息,都会存放在redis中,从redis中操做以及读取等操做,还要考虑咱们的分布式服务在高并发状况下,须要加上redis锁,加了锁,数据库的事务利用一个注解就搞定了,若是在这步中同步redis数据的时候出问题,数据库回滚了,redis是不会回滚的,须要手动代码redis数据还原,好比try..catch,保持数据的一致性
以上是为了学习MQ消息队列,在一个完成的应用上逆向去处理问题,为了使用MQ而操做的,正常操做是须要从业务下来,正常需求开始,再以上面的例子说,把③④都异步的操做是秒杀类的业务场景,另外秒杀也有大小分别,固然每一个老板都会认为本身的产品永远是最好的,做为开发的你都明白,其余业务流量不大的话,并不建议使用该方式,先后端都须要配置改动,得不偿失,根据本身真实的业务需求来实现
对于分布式中的事务仍是很重要的,常有CAP理论的取舍,如下是CAP不一样选择方式的体现
CAP: C数据一致性,A服务可用性,P服务对分区故障的容错性
Dubbo+Zookeeper : Zookeeper 保证的是CP
SpringCloud : Eureka保证的是AP
分享你们一张图,spring源码开发成员的分布
以上是简单粗略学习心得分享
----------------------------------------------------------------------