目录html
部署环境:阿里云ECS服务器java
端口映射信息:python
eureka1:8761 | eureka2:8762mysql
config-server:8888git
shopping-product:11100github
shopping-order:11110web
api-gateway:8080redis
open-api:8081算法
https://github.com/lizzie2008/spring-cloud-app.gitspring
<groupId>tech.lancelot</groupId> <artifactId>spring-cloud-app</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-cloud-app</name> <description>Demo project for Spring Cloud</description> <packaging>pom</packaging>
由于Module做为子项目,咱们改写下对应的POM文件。
<parent> <groupId>tech.lancelot</groupId> <artifactId>spring-cloud-app</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <groupId>tech.lancelot</groupId> <artifactId>eureka-server</artifactId> <version>0.0.1-SNAPSHOT</version> <name>eureka-server</name> <description>Registry Center</description> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> </dependencies>
从新Build一下项目,能正常编译。可是此时Eureka Server是不能正常启动工做的,须要在application类增长
@EnableEurekaServer
。
此时,咱们再运行Eureka Server,发现能够正常启动服务注册服务器,服务端口8080,注册地址:http://localhost:8761/eureka/。
eureka: client: fetch-registry: false #设置不从注册中心获取注册信息 register-with-eureka: false #设置自身不做为客户端注册到注册中心 spring: application: name: eureka-server #应用名称 server: port: 8761 #应用服务端口
一样,咱们修改POM文件,依赖于父项目,注意这里须要引入eureka-client
和spring-boot-starter-web
依赖。
<parent> <groupId>tech.lancelot</groupId> <artifactId>spring-cloud-app</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <groupId>tech.lancelot</groupId> <artifactId>shopping-provider</artifactId> <version>0.0.1-SNAPSHOT</version> <name>shopping-provider</name> <description>shopping service provider</description> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> </dependencies>
须要在application类增长@EnableDiscoveryClient
,同时修改配置文件。
eureka: client: serviceUrl: defaultZone: http://localhost:8761/eureka/ #指定服务注册地址 spring: application: name: shopping-provider #应用名称
重启Eureka Client,启动后再次访问Eureka Server管理界面,能够发现order-provider服务已注册。
以前咱们的Eureka Server是单点服务,实际生产中,常常是多台注册中心,所以咱们尝试下配置2台注册中心。
启动服务器实例1:
eureka: client: # fetch-registry: false #设置不从注册中心获取注册信息 # register-with-eureka: false #设置自身不做为客户端注册到注册中心 defaultZone: http://localhost:8762/eureka/ #指定服务注册地址 spring: application: name: eureka-server1 #应用名称 server: port: 8761 #应用服务端口
启动服务器实例2:
eureka: client: # fetch-registry: false #设置不从注册中心获取注册信息 # register-with-eureka: false #设置自身不做为客户端注册到注册中心 defaultZone: http://localhost:8761/eureka/ #指定服务注册地址 spring: application: name: eureka-server2 #应用名称 server: port: 8762 #应用服务端口
重启2台注册中心,启动后分别访问2台的管理界面,能够看到2台注册中心已经相互注册。
项目增长2个服务模块,并向Eureka Server注册:shopping-product(商品服务)、shopping-order(订单服务),实现相应业务逻辑,这部分详细实现再也不阐述。
总体项目结构以下:
spring-cloud-app
--eureka-server(服务注册中心)
--shopping-common(购物公共模块)
--shopping-product(商品服务模块)
--shopping-order(订单服务模块)
系统架构如图,比较简单,一个集群服务中心,目前有2个服务提供并注册:
Spring Cloud Ribbon 是一个客户端的负载均衡器,它提供对大量的HTTP和TCP客户端的访问控制。
客户端负载均衡便是当浏览器向后台发出请求的时候,客户端会向 Eureka Server 读取注册到服务器的可用服务信息列表,而后根据设定的负载均衡策略(没有设置即用默认的),抉择出向哪台服务器发送请求。
假设有如下业务场景,shopping-order模块须要调用shopping-product提供的API接口。咱们看如何实现。
第一种方法使用构造RestTemplate,调用远程API,这种方法url是写死,若是启动多台shopping-product服务的话,那又该如何?
@Test void getProductByRestTemplate() { //1.第一种方法 RestTemplate restTemplate = new RestTemplate(); String response = restTemplate.getForObject("http://localhost:11100/api/products", String.class); Assert.hasLength(response,"未获取内容"); }
第二种方法:咱们启动2个shopping-product服务实例,分别是11100端口和9001端口,运行测试发现,会根据loadBalancerClient负载均衡机制帮咱们选择一个服务地址,进行访问调用。
@Autowired private LoadBalancerClient loadBalancerClient; @Test void getProductByLoadBalance(){ //2.第二种方法,先获取负载均衡的地址再调用API ServiceInstance instance = loadBalancerClient.choose("shopping-product"); String url=String.format("http://%s:%s/api/products",instance.getHost(),instance.getPort()); RestTemplate restTemplate = new RestTemplate(); String response = restTemplate.getForObject(url, String.class); log.info("port:"+instance.getPort()+response); }
但这样依旧非常麻烦,接下来看第三种方法。第三种方法屏蔽了API的具体url信息,只用ServerId,并根据负载均衡规则,自动路由到对应的地址。
由于eureka包中已经添加了对Ribbon的依赖,咱们能够增长断点,调试程序,发现进入RibbonLoadBalancerClient-->choose方法,返回负载均衡策略选择的ServiceInstance。
@Component public class RestTemplateConfiguration { @Bean @LoadBalanced public RestTemplate restTemplate() { return new RestTemplate(); } } @SpringBootTest @Slf4j class OrderServiceTest { @Autowired private RestTemplate restTemplate; @Test void getProductByServerId() { String response = restTemplate.getForObject("http://shopping-product/api/products", String.class); log.info(response); } }
固然,咱们也能够指定应用服务的负载均衡策略:
shopping-order: ribbon: NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
目前系统架构如图,实现shopping-product和shopping-order集群化部署,调用方式经过客户端负载均衡,来路由消费端的请求。
Feign是一个声明式的Web Service客户端,它的目的就是让Web Service调用更加简单。Feign提供了HTTP请求的模板,经过编写简单的接口和插入注解,就能够定义好HTTP请求的参数、格式、地址等信息。
而Feign则会彻底代理HTTP请求,咱们只须要像调用方法同样调用它就能够完成服务请求及相关处理。Feign整合了Ribbon和Hystrix(关于Hystrix咱们后面再讲),可让咱们再也不须要显式地使用这两个组件。
总起来讲,Feign具备以下特性:
shopping-product服务提供端暴露API。
@GetMapping("/productInfos") public List<ProductInfoOutput> findProductInfosByIds(@RequestParam(required = false) String productIds) throws Exception { //若是传入商品id参数 if (StringUtils.isNotEmpty(productIds)) { List<String> ids = Arrays.asList(productIds.split(",")); List<ProductInfo> productInfos = productService.findProductInfosByIds(ids); List<ProductInfoOutput> productInfoOutputs = ListUtils.copyProperties(productInfos, ProductInfoOutput.class); return productInfoOutputs; }else{ List<ProductInfo> productInfos = productService.findProductInfos(); List<ProductInfoOutput> productInfoOutputs = ListUtils.copyProperties(productInfos, ProductInfoOutput.class); return productInfoOutputs; } }
shopping-order模块须要调用shopping-product接口,首先咱们在服务调用端增长Maven依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-feign</artifactId> </dependency>
启动类标注开启Feign服务
@SpringBootApplication @EnableDiscoveryClient @EnableFeignClients public class ShoppingOrderApplication { public static void main(String[] args) { SpringApplication.run(ShoppingOrderApplication.class,args); } }
/** * 声明式服务 */ @FeignClient("shopping-product/api/v1") public interface ProductClient { @GetMapping("/productInfos") List<ProductInfoOutput> findProductInfosByIds(@RequestParam(required = false) String productIds); }
@FeignClient(“服务名称”)映射服务调用,本质仍是http请求,只不过Feign帮咱们屏蔽了底层的请求路由,对开发者彻底透明,使得调用远程服务感受跟调用本地服务一致的编码体验。
本地调用测试,能够正常返回接口数据。
@GetMapping("/orders/findProductInfosByIds") public List<ProductInfoOutput> findProductInfosByIds(){ List<ProductInfoOutput> productInfoOutputs = productClient .findProductInfosByIds("157875196366160022, 157875227953464068"); return productInfoOutputs; }
在实现负载均衡基础上,封装声明式服务调用。实现shopping-order对shopping-product的透明调用,系统架构如图以下。
上个环境中,咱们有2个服务提供者,首先看下各自的配置,能够发现很大一部分都是重复的。
若是微服务架构中没有使用统一配置中心时,所存在的问题:
eureka: client: serviceUrl: defaultZone: http://localhost:8761/eureka/ #指定服务注册地址 spring: application: name: shopping-order #应用名称 datasource: driver-class-name: com.mysql.cj.jdbc.Driver username: root password: 123456 url: jdbc:mysql://localhost:3306/spring_cloud_app?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC jpa: show-sql: true database-platform: org.hibernate.dialect.MySQLDialect server: port: 11110
对于一些简单的项目来讲,咱们通常都是直接把相关配置放在单独的配置文件中,以 properties 或者 yml 的格式出现,更省事儿的方式是直接放到 application.properties 或 application.yml 中。在集群部署状况下,咱们尝试来实现配置的集中管理,并支持配置的动态刷新。
一样,咱们做为子项目,修改相关依赖,加入对spring-cloud-config-server依赖
<modelVersion>4.0.0</modelVersion> <parent> <groupId>tech.lancelot</groupId> <artifactId>spring-cloud-app</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <artifactId>config-server</artifactId> <packaging>jar</packaging> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- spring cloud config 服务端包 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-server</artifactId> </dependency> </dependencies>
spring: application: name: config-server # 应用名称 cloud: config: server: git: uri: https://github.com/lizzie2008/Central-Configuration.git #配置文件所在仓库 username: 'Github username' password: 'Github password' default-label: master #配置文件分支 search-paths: spring-cloud-app #配置文件所在根目录 server: port: 8888
@EnableConfigServer
@EnableConfigServer @SpringBootApplication public class ConfigServerApplication { public static void main(String[] args) { SpringApplication.run(ConfigServerApplication.class, args); } }
Spring Cloud Config 有它的一套访问规则,咱们经过这套规则在浏览器上直接访问就能够。
/{application}/{profile}[/{label}] /{application}-{profile}.yml /{label}/{application}-{profile}.yml /{application}-{profile}.properties /{label}/{application}-{profile}.properties
{application} 就是应用名称,对应到配置文件上来,就是配置文件的名称部分,例如我上面建立的配置文件。
{profile} 就是配置文件的版本,咱们的项目有开发版本、测试环境版本、生产环境版本,对应到配置文件上来就是以 application-{profile}.yml 加以区分,例如application-dev.yml、application-sit.yml、application-prod.yml。
{label} 表示 git 分支,默认是 master 分支,若是项目是以分支作区分也是能够的,那就能够经过不一样的 label 来控制访问不一样的配置文件了。
咱们在git项目中,新建spring-cloud-app/config-eureka-server.yml配置文件,而后访问配置中心服务器,看看能正常获取配置文件。
config-server自己做为一个服务,也能够做为服务提供方,向服务中心注册,其余的服务想要获取配置文件,只须要经过服务名称就会访问。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
@EnableDiscoveryClient
注解@EnableConfigServer @EnableDiscoveryClient @SpringBootApplication public class ConfigServerApplication { public static void main(String[] args) { SpringApplication.run(ConfigServerApplication.class, args); } }
eureka: client: serviceUrl: defaultZone: http://eureka1:8761/eureka/,http://eureka2:8762/eureka/ #指定服务注册地址
eureka: client: serviceUrl: defaultZone: http://eureka1:8761/eureka/,http://eureka2:8762/eureka/ #指定服务注册地址 spring: application: name: shopping-product #应用名称 cloud: config: discovery: enabled: true service-id: config-server
spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver username: root password: 123456 url: jdbc:mysql://localhost:3306/spring_cloud_app?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC jpa: show-sql: true database-platform: org.hibernate.dialect.MySQLDialect server: port: 11100
shopping-product.yml
增长一个配置属性来进行测试env: dev
@RestController @RefreshScope @RequestMapping("api/env") public class EnvController { @Value("${env}") private String env; @RequestMapping public String printEnv() { return env; } }
访问http://localhost:11100/api/env,返回当前的值dev。
Spring Cloud Config 在项目启动时加载配置内容这一机制,可是若是咱们修改配置文件内容后,不会自动刷新。例如咱们上面的项目,当服务已经启动的时候,去修改 github 上的配置文件内容,这时候,再次刷新页面,对不起,仍是旧的配置内容,新内容不会主动刷新过来。那应该怎么去触发配置信息的动态刷新呢?
它提供了一个刷新机制,可是须要咱们主动触发。那就是 @RefreshScope 注解并结合 actuator ,注意要引入 spring-boot-starter-actuator 包。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
@RefreshScope
注解management: endpoints: web: exposure: include: "*"
[ "config.client.version", "env" ]
每次改了配置后,就用 postman 访问一下 refresh 接口,仍是不够方便。 github 提供了一种 webhook 的方式,当有代码变动的时候,会调用咱们设置的地址,来实现咱们想达到的目的。
填上回调的地址
也就是上面提到的 actuator/refresh 这个地址,可是必须保证这个地址是能够被 github 访问到的。这样每当github上修改了配置文件,就自动通知对应的hook地址自动刷新。
总体项目结构以下:
spring-cloud-app
--config-server(统一配置中心)
--eureka-server(服务注册中心)
--shopping-common(购物公共模块)
--shopping-product(商品服务模块)
--shopping-order(订单服务模块)
更新系统架构,新建config-server节点,也向eureka-server注册,相关服务注册节点根据配置实例名称,路由到config-server节点,动态的加载配置。
一、异步处理
好比用户在电商网站下单,下单完成后会给用户推送短信或邮件,发短信和邮件的过程就能够异步完成。由于下单付款是核心业务,发邮件和短信并不属于核心功能,而且可能耗时较长,因此针对这种业务场景能够选择先放到消息队列中,有其余服务来异步处理。
二、应用解耦:
假设公司有几个不一样的系统,各系统在某些业务有联动关系,好比 A 系统完成了某些操做,须要触发 B 系统及 C 系统。若是 A 系统完成操做,主动调用 B 系统的接口或 C 系统的接口,能够完成功能,可是各个系统之间就产生了耦合。用消息中间件就能够完成解耦,当 A 系统完成操做将数据放进消息队列,B 和 C 系统去订阅消息就能够了。这样各系统只要约定好消息的格式就行了。
三、流量削峰
好比秒杀活动,一会儿进来好多请求,有的服务可能承受不住瞬时高并发而崩溃,因此针对这种瞬时高并发的场景,在中间加一层消息队列,把请求先入队列,而后再把队列中的请求平滑的推送给服务,或者让服务去队列拉取。
四、日志处理
kafka 最开始就是专门为了处理日志产生的。
当碰到上面的几种状况的时候,就要考虑用消息队列了。若是你碰巧使用的是 RabbitMQ 或者 kafka ,并且一样也是在使用 Spring Cloud ,那能够考虑下用 Spring Cloud Stream。Spring Cloud Stream 是消息中间件组件,它集成了 kafka 和 rabbitmq ,本文以rabbitmq 为例。
分析目前shopping-order项目中,建立订单的代码以下:
/** * 建立订单 * */ @Transactional public String Create(OrderInput orderInput) throws Exception { //扣库存 ResultVo result1=productClient.decreaseStock(orderInput.getOrderItemInputs()); if (result1.getCode() != 0) throw new Exception("调用订单扣减库存接口出错:" + result1.getMsg()); //构建订单主表 OrderMaster orderMaster = new OrderMaster(); BeanUtils.copyProperties(orderInput, orderMaster); //指定默认值 orderMaster.setOrderId(KeyUtil.genUniqueKey("OM")); orderMaster.setOrderStatus(OrderStatus.NEW); orderMaster.setPayStatus(PayStatus.WAIT); //构建订单明细 List<String> productIds = orderInput.getOrderItemInputs().stream().map(OrderItemInput::getProductId).collect(Collectors.toList()); ResultVo<List<ProductInfoOutput>> result2 = productClient.findProductInfosByIds(String.join(",", productIds)); if (result2.getCode() != 0) throw new Exception("调用订单查询接口出错:" + result2.getMsg()); List<ProductInfoOutput> productInfoOutputs = result2.getData(); //订单金额总计 BigDecimal total = new BigDecimal(BigInteger.ZERO); for (OrderItemInput orderItemInput : orderInput.getOrderItemInputs()) { OrderDetail orderDetail = new OrderDetail(); BeanUtils.copyProperties(orderItemInput, orderDetail); Optional<ProductInfoOutput> productInfoOutputOptional = productInfoOutputs.stream() .filter(s -> s.getProductId().equals(orderItemInput.getProductId())).findFirst(); if (!productInfoOutputOptional.isPresent()) throw new Exception(String.format("商品【%s】不存在", orderItemInput.getProductId())); ProductInfoOutput productInfoOutput = productInfoOutputOptional.get(); orderDetail.setDetailId(KeyUtil.genUniqueKey("OD")); orderDetail.setOrderId(orderMaster.getOrderId()); orderDetail.setProductName(productInfoOutput.getProductName()); orderDetail.setProductPrice(productInfoOutput.getProductPrice().multiply(new BigDecimal(orderDetail.getProductQuantity()))); orderDetail.setProductIcon(productInfoOutput.getProductIcon()); total = total.add(orderDetail.getProductPrice()); orderDetailRepository.save(orderDetail); } orderMaster.setOrderAmount(total); orderMasterRepository.save(orderMaster); return orderMaster.getOrderId(); }
建立订单的同时,先调用商品接口扣减库存,若是占用库存成功,再生成订单。这样的话,生成订单的操做和占用商品库存的操做实际上是耦合在一块儿的。在实际电商高并发、高流量的状况下,咱们不多这么作。因此,咱们要将业务解耦,实现订单和扣减库存的异步处理。
大致思路以下:生成订单==》通知商品调用库存==》商品占用库存==》通知订单占用成功==》更新订单占用库存状态
shopping-order、shopping-product项目中
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
spring: rabbitmq: host: aliyun.host port: 5672 username: guest password: guest
public interface StreamClient { String INPUT = "myMessage"; @Input(StreamClient.INPUT) SubscribableChannel input(); @Output(StreamClient.INPUT) MessageChannel output(); }
@Component @EnableBinding(StreamClient.class) @Slf4j public class StreamReceiver { @StreamListener(value = StreamClient.INPUT) public void process(OrderInput orderInput) { log.info("StreamReceiver: {}", orderInput); } }
@RestController @RequestMapping("api/v1/stream") @Slf4j public class StreamController { private final StreamClient streamClient; @Autowired public StreamController(StreamClient streamClient) { this.streamClient = streamClient; } @GetMapping("/sendMessage") public void sendMessage() { OrderInput orderInput=new OrderInput(); orderInput.setBuyerName("小王"); orderInput.setBuyerPhone("15011111111"); orderInput.setBuyerAddress("姥姥家"); orderInput.setBuyerOpenid("11111"); streamClient.output().send(MessageBuilder.withPayload(orderInput).build()); } }
启动应用程序,测试发送接口,发现spring-cloud-stream帮咱们自动建立了一个队列,消息发送到这个队列,而后被接收端消费。
此时,若是咱们启动多个shopping-product服务实例,会有个问题,若是发送端发送一条消息,会被2个实例同时消费,在正常的业务中,这种状况是应该避免的。因此咱们须要对消息进行分组,在application.yml中增长以下配置,保证只有一个服务实例来消费。
spring: rabbitmq: host: aliyun.host port: 5672 username: guest password: guest cloud: stream: bindings: myMessage: group: shopping-order content-type: application/json
shopping-order做为库存占用命令的消息发送者,首先向shopping-product发送消息stock_apply(占用库存申请),shopping-product接收此消息进行库存处理,而后将库存占用处理的结果做为消息stock_result(占用库存结果)发送,shopping-order端再收到结果消息对订单状态进行更新。
spring: cloud: stream: bindings: stock_apply_output: #占用库存申请 destination: stock.apply stock_result_input: #占用库存结果 destination: stock.result group: shopping-order
spring: cloud: stream: bindings: stock_apply_input: #占用库存申请 destination: stock.apply group: shopping-product stock_result_output: #占用库存结果 destination: stock.result
public interface OrderStream { String STOCK_APPLY_OUTPUT = "stock_apply_output"; @Output(OrderStream.STOCK_APPLY_OUTPUT) MessageChannel stockApplyOutput(); String STOCK_RESULT_INPUT = "stock_result_input"; @Input(OrderStream.STOCK_RESULT_INPUT) SubscribableChannel stockResultInput(); }
public interface ProductStream { String STOCK_APPLY_INPUT = "stock_apply_input"; @Input(ProductStream.STOCK_APPLY_INPUT) SubscribableChannel stockApplyInput(); String STOCK_RESULT_OUTPUT = "stock_result_output"; @Output(ProductStream.STOCK_RESULT_OUTPUT) MessageChannel stockResultOutput(); }
/** * 建立订单 */ @Transactional public String Create(OrderInput orderInput) throws Exception { //构建订单主表 OrderMaster orderMaster = new OrderMaster(); BeanUtils.copyProperties(orderInput, orderMaster); //指定默认值 orderMaster.setOrderId(KeyUtil.genUniqueKey("OM")); orderMaster.setOrderStatus(OrderStatus.NEW); orderMaster.setPayStatus(PayStatus.WAIT); //构建订单明细 List<String> productIds = orderInput.getOrderItemInputs().stream().map(OrderItemInput::getProductId).collect(Collectors.toList()); ResultVo<List<ProductInfoOutput>> result2 = productClient.findProductInfosByIds(String.join(",", productIds)); if (result2.getCode() != 0) throw new Exception("调用订单查询接口出错:" + result2.getMsg()); List<ProductInfoOutput> productInfoOutputs = result2.getData(); //订单金额总计 BigDecimal total = new BigDecimal(BigInteger.ZERO); for (OrderItemInput orderItemInput : orderInput.getOrderItemInputs()) { OrderDetail orderDetail = new OrderDetail(); BeanUtils.copyProperties(orderItemInput, orderDetail); Optional<ProductInfoOutput> productInfoOutputOptional = productInfoOutputs.stream() .filter(s -> s.getProductId().equals(orderItemInput.getProductId())).findFirst(); if (!productInfoOutputOptional.isPresent()) throw new Exception(String.format("商品【%s】不存在", orderItemInput.getProductId())); ProductInfoOutput productInfoOutput = productInfoOutputOptional.get(); orderDetail.setDetailId(KeyUtil.genUniqueKey("OD")); orderDetail.setOrderId(orderMaster.getOrderId()); orderDetail.setProductName(productInfoOutput.getProductName()); orderDetail.setProductPrice(productInfoOutput.getProductPrice().multiply(new BigDecimal(orderDetail.getProductQuantity()))); orderDetail.setProductIcon(productInfoOutput.getProductIcon()); total = total.add(orderDetail.getProductPrice()); orderDetailRepository.save(orderDetail); } orderMaster.setOrderAmount(total); orderMasterRepository.save(orderMaster); //扣库存 StockApplyInput stockApplyInput = new StockApplyInput(); stockApplyInput.setOrderId(orderMaster.getOrderId()); stockApplyInput.setOrderItemInputs(orderInput.getOrderItemInputs()); orderStream.stockApplyOutput().send(MessageBuilder.withPayload(stockApplyInput).build()); return orderMaster.getOrderId(); }
@Service @Slf4j @EnableBinding(ProductStream.class) public class ProductService { private final ProductInfoRepository productInfoRepository; private final ProductCategoryRepository productCategoryRepository; @Autowired public ProductService(ProductInfoRepository productInfoRepository, ProductCategoryRepository productCategoryRepository) { this.productInfoRepository = productInfoRepository; this.productCategoryRepository = productCategoryRepository; } /** * 扣减库存 * */ @Transactional @StreamListener(ProductStream.STOCK_APPLY_INPUT) @SendTo(ProductStream.STOCK_RESULT_OUTPUT) public StockResultOutput processStockApply(StockApplyInput stockApplyInput) throws Exception { log.info("占用库存消息被消费..."); StockResultOutput stockResultOutput = new StockResultOutput(); stockResultOutput.setOrderId(stockApplyInput.getOrderId()); try { for (OrderItemInput orderItemInput : stockApplyInput.getOrderItemInputs()) { Optional<ProductInfo> productInfoOptional = productInfoRepository.findById(orderItemInput.getProductId()); if (!productInfoOptional.isPresent()) throw new Exception("商品不存在."); ProductInfo productInfo = productInfoOptional.get(); int result = productInfo.getProductStock() - orderItemInput.getProductQuantity(); if (result < 0) throw new Exception("商品库存不知足."); productInfo.setProductStock(result); productInfoRepository.save(productInfo); } stockResultOutput.setIsSuccess(true); stockResultOutput.setMessage("OK"); return stockResultOutput; } catch (Exception e) { stockResultOutput.setIsSuccess(false); stockResultOutput.setMessage(e.getMessage()); return stockResultOutput; } } }
@StreamListener(OrderStream.STOCK_RESULT_INPUT) public void processStockResult(StockResultOutput stockResultOutput) { log.info("库存消息返回" + stockResultOutput); Optional<OrderMaster> optionalOrderMaster = orderMasterRepository.findById(stockResultOutput.getOrderId()); if (optionalOrderMaster.isPresent()) { OrderMaster orderMaster = optionalOrderMaster.get(); if (stockResultOutput.getIsSuccess()) { orderMaster.setOrderStatus(OrderStatus.OCCUPY_SUCCESS); } else { orderMaster.setOrderStatus(OrderStatus.OCCUPY_FAILURE); } orderMasterRepository.save(orderMaster); } }
执行调试结果,跟踪执行结果:生成订单同时发送库存申请命令,商品模块处理库存申请成功后,返回库存占用结果告知订单模块,从而实现订单生成和商品库存占用的逻辑的解耦。
在原有的架构基础上,咱们对商品和订单服务进行了应用解耦,库存占用逻辑异步化,经过消息队列传递消息,并结合spring cloud stream对消息input和output绑定,使得在程序中很方便的进行消息发送和接收处理。
Zuul是Netflix开源的微服务网关,能够和Eureka、Ribbon、Hystrix等组件配合使用,Spring Cloud对Zuul进行了整合与加强,Zuul默认使用的HTTP客户端是Apache HTTPClient,也可使用RestClient或okhttp3.OkHttpClient。 Zuul的主要功能是路由转发和过滤器。zuul默认和Ribbon结合实现了负载均衡的功能
zuul的核心是一系列的filters, 其做用类比Servlet框架的Filter,或者AOP。zuul把请求路由到用户处理逻辑的过程当中,这些filter参与一些过滤处理,好比Authentication,Load Shedding等
Zuul使用一系列不一样类型的过滤器,使咱们可以快速灵活地将功能应用于咱们的边缘服务。这些过滤器可帮助咱们执行如下功能:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-zuul</artifactId> </dependency> </dependencies>
EnableDiscoveryClient
和@EnableZuulProxy
注解。@EnableDiscoveryClient @EnableZuulProxy @SpringBootApplication public class ApiGatewayApplication { public static void main(String[] args) { SpringApplication.run(ApiGatewayApplication.class, args); } }
默认的路由规则是按照服务的名称来路由服务,固然咱们也能够自定义。在zuul中,路由匹配的路径表达式采用ant风格定义
通配符 | 说明 |
---|---|
? | 匹配任意单个字符 |
* | 匹配任意数量的字符 |
** | 匹配任意数量的字符,支持多级目录 |
zuul: routes: # 简洁写法 shopping-product: /product/**
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
management: endpoints: web: exposure: include: "*"
zuul: routes: # 简洁写法 shopping-product: /product/** # 排除某些路由 ignored-patterns: - /**/productInfos
这样咱们再访问这个接口时,就提示 Not Found 错误了
默认状况下,spring cloud zuul在请求路由时,会过滤掉http请求头信息中一些敏感信息,防止它们被传递到下游的外部服务器。默认的敏感头信息经过zuul.sensitiveHeaders参数定义,默认包括cookie,set-Cookie,authorization三个属性。因此,咱们在开发web项目时经常使用的cookie在spring cloud zuul网关中默认时不传递的,这就会引起一个常见的问题,若是咱们要将使用了spring security,shiro等安全框架构建的web应用经过spring cloud zuul构建的网关来进行路由时,因为cookie信息没法传递,咱们的web应用将没法实现登陆和鉴权。有时候,针对某些路由,咱们须要传递这个cookie。
zuul: routes: # 彻底写法 product-route: path: /product/** serviceId: shopping-product # 将指定路由的敏感头设置为空 sensitiveHeaders:
以前路由的配置都是写在配置文件中,若是路由规则变化之后,须要重启网关服务。可是实际生产环境,通常都须要动态的加载路由的配置,不能轻易重启网关服务。
eureka: client: serviceUrl: defaultZone: http://eureka1:8761/eureka/,http://eureka2:8762/eureka/ #指定服务注册地址 spring: application: name: api-gateway #应用名称 cloud: config: discovery: enabled: true service-id: config-server
@RefreshScope
注解@Component public class ZuulConfiguration { @ConfigurationProperties("zuul") @RefreshScope public ZuulProperties zuulProperties(){ return new ZuulProperties(); } }
设想如下场景:咱们须要判断用户请求的参数是否包含认证信息,若是包含token信息,则能够访问,不然禁止访问。能够用Zuul Filter很方便的实如今网关端,统一进行认证。
PRE_TYPE
PRE_DECORATION_FILTER_ORDER-1
true
/** * 验证token 过滤器 */ @Component public class TokenFilter extends ZuulFilter { @Override public String filterType() { return PRE_TYPE; } @Override public int filterOrder() { return 0; } @Override public boolean shouldFilter() { return true; } @Override public Object run() throws ZuulException { RequestContext currentContext = RequestContext.getCurrentContext(); HttpServletRequest request = currentContext.getRequest(); //测试在url参数中获取token String token = request.getParameter("token"); if(StringUtils.isEmpty(token)){ currentContext.setSendZuulResponse(false); currentContext.setResponseStatusCode(HttpStatus.UNAUTHORIZED.value()); } return null; } }
@Component public class AddResHeaderFilter extends ZuulFilter{ @Override public String filterType() { return POST_TYPE; } @Override public int filterOrder() { return SEND_RESPONSE_FILTER_ORDER - 1; } @Override public boolean shouldFilter() { return true; } @Override public Object run() { RequestContext requestContext = RequestContext.getCurrentContext(); HttpServletResponse response = requestContext.getResponse(); response.setHeader("X-Foo", UUID.randomUUID().toString()); return null; } }
这里介绍一种限流的设计方案:
对于不少应用场景来讲,除了要求可以限制数据的平均传输速率外,还要求容许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而若是请求须要被处理,则须要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
Google公司已经实现了上述的令牌桶的算法,直接使用 RateLimiter 就能够经过Zuul实现限流的功能:
@Component public class RateLimitFilter extends ZuulFilter { private static final RateLimiter RATE_LIMITER = RateLimiter.create(100); @Override public String filterType() { return PRE_TYPE; } @Override public int filterOrder() { return SERVLET_DETECTION_FILTER_ORDER - 1; } @Override public boolean shouldFilter() { return true; } @Override public Object run() { if (!RATE_LIMITER.tryAcquire()) { throw new RuntimeException("未能获取到令牌."); } return null; } }
总体项目结构以下:
spring-cloud-app
--api-gateway(服务网关)
--config-server(统一配置中心)
--eureka-server(服务注册中心)
--shopping-common(购物公共模块)
--shopping-product(商品服务模块)
--shopping-order(订单服务模块)
目前全部的客户端请求,首先被发送到统一网关服务处理,而后由网关进行限流、熔断、权限验证、记录日志等等,而后根据自定义的路由规则,再分发到不一样的应用服务中去,应用服务器返回处理结果后,由网关统一返回给客户端。
在分布式环境中,许多服务依赖项中的一些必然会失败。Hystrix是一个库,经过添加延迟容忍和容错逻辑,帮助你控制这些分布式服务之间的交互。Hystrix经过隔离服务之间的访问点、中止级联失败和提供回退选项来实现这一点,全部这些均可以提升系统的总体弹性。
在实际工做中,尤为是分布式、微服务愈来愈广泛的今天,一个服务常常须要调用其余的服务,即RPC调用,而调用最多的方式仍是经过http请求进行调用,这里面就有一个问题了,若是调用过程当中,由于网络等缘由,形成某个服务调用超时,若是没有熔断机制,此处的调用链路将会一直阻塞在这里,在高并发的环境下,若是许多个请求都卡在这里的话,服务器不得不为此分配更多的线程来处理源源不断涌入的请求。
更恐怖的是,若是这是一个多级调用,即此处的服务的调用结果还被其余服务调用了,这就造成了所谓的雪崩效应,后果将不堪设想。所以,须要某种机制,在必定的异常接口调用出现的时候,可以自动发现这种异常,并快速进行服务降级。
/** * Hystrix 测试 */ @RestController @RequestMapping("api/hystrix") public class HystrixController { @GetMapping("/getProductEnv") public String getProductEnv() { RestTemplate restTemplate = new RestTemplate(); return restTemplate.postForObject("http://localhost:11100/api/env", null, String.class); } }
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency>
@EnableCircuitBreaker
注解,或者将@SpringBootApplication
、@EnableDiscoveryClient
、@EnableCircuitBreaker
三个合并成一个@SpringCloudApplication
注解。@EnableFeignClients(basePackages = "tech.lancelot.shoppingorder.client") //@SpringBootApplication //@EnableDiscoveryClient //@EnableCircuitBreaker @SpringCloudApplication public class ShoppingOrderApplication { public static void main(String[] args) { SpringApplication.run(ShoppingOrderApplication.class, args); } }
@HystrixCommand
注解,并指定调用方法失败时的错误处理回调。也能够为整个类增长@DefaultProperties
注解,定义一个默认的返回方法/** * Hystrix 测试 */ @RestController @RequestMapping("api/hystrix") public class HystrixController { @HystrixCommand(fallbackMethod = "defaultFallback") @GetMapping("/getProductEnv") public String getProductEnv() { RestTemplate restTemplate = new RestTemplate(); return restTemplate.postForObject("http://localhost:11100/api/env", null, String.class); } // 默认服务不可达的返回信息 private String defaultFallback() { return "太拥挤了, 请稍后再试~~"; } }
若是咱们没有配置默认的超时时间,Hystrix 将取 default_executionTimeoutInMilliseconds(1秒)做为默认超时时间,也能够自定义超时时间。
@HystrixCommand(commandProperties = { @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "3000")})
这样的话,shopping-order调用远程服务,超过3s以后,马上返回错误处理,不会再阻塞。
hystrix: command: default: # 方法默认属性 execution: isolation: thread: timeoutInMilliseconds: 1000 getProductEnv: # 该名称方法属性 execution: isolation: thread: timeoutInMilliseconds: 3000
若是某个目标服务调用慢或者有大量超时,此时,熔断该服务的调用,对于后续调用请求,不在继续调用目标服务,直接返回,快速释放资源。若是目标服务状况好转则恢复调用。
熔断器有三个状态 CLOSED
、OPEN
、HALF_OPEN
熔断器默认关闭状态,当触发熔断(至少有 circuitBreaker.requestVolumeThreshold 个请求,错误率达到 circuitBreaker.errorThresholdPercentage)后状态变动为 OPEN
,在等待到指定的时间(circuitBreaker.sleepWindowInMilliseconds),Hystrix会放请求检测服务是否开启,这期间熔断器会变为HALF_OPEN
半开启状态,熔断探测服务可用则继续变动为 CLOSED
关闭熔断器。
@HystrixCommand(commandProperties = { @HystrixProperty(name = "circuitBreaker.enabled", value = "true"), //设置熔断 @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10"),//请求数达到后才计算 @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "10000"), //休眠时间窗 @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "60"), //错误率 })
Spring Coud 还给 Hytrix 提供了一个可视化的组件:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId> </dependency>
@EnableHystrixDashboard
注解@EnableFeignClients(basePackages = "tech.lancelot.shoppingorder.client") //@SpringBootApplication //@EnableDiscoveryClient //@EnableCircuitBreaker @SpringCloudApplication @EnableHystrixDashboard public class ShoppingOrderApplication { public static void main(String[] args) { SpringApplication.run(ShoppingOrderApplication.class, args); } }
经过以上容错方法的实现,就能够构建更加稳定、可靠的分布式系统:
微服务架构是一个分布式架构,它按业务划分服务单元,一个分布式系统每每有不少个服务单元。因为服务单元数量众多,业务的复杂性,若是出现了错误和异常,很难去定位。主要体如今,一个请求可能须要调用不少个服务,而内部服务的调用复杂性,决定了问题难以定位。因此微服务架构中,必须实现分布式链路追踪,去跟进一个请求到底有哪些服务参与,参与的顺序又是怎样的,从而达到每一个请求的步骤清晰可见,出了问题,很快定位。
OpenTracing 是一个轻量级的标准化层,它位于应用程序/类库和追踪或日志分析程序之间。
+-------------+ +---------+ +----------+ +------------+ | Application | | Library | | OSS | | RPC/IPC | | Code | | Code | | Services | | Frameworks | +-------------+ +---------+ +----------+ +------------+ | | | | | | | | v v v v +------------------------------------------------------+ | OpenTracing | +------------------------------------------------------+ | | | | | | | | v v v v +-----------+ +-------------+ +-------------+ +-----------+ | Tracing | | Logging | | Metrics | | Tracing | | System A | | Framework B | | Framework C | | System D | +-----------+ +-------------+ +-------------+ +-----------+
OpenTracing 的优点
OpenTracing 数据模型
OpenTracing 中的 Trace(调用链)经过归属于此调用链的 Span 来隐性的定义。
特别说明,一条 Trace(调用链)能够被认为是一个由多个 Span 组成的有向无环图(DAG图),Span 与 Span 的关系被命名为 References。
例如:下面的示例 Trace 就是由8个 Span 组成:
单个 Trace 中,span 间的因果关系 [Span A] ←←←(the root span) | +------+------+ | | [Span B] [Span C] ←←←(Span C 是 Span A 的孩子节点, ChildOf) | | [Span D] +---+-------+ | | [Span E] [Span F] >>> [Span G] >>> [Span H] ↑ ↑ ↑ (Span G 在 Span F 后被调用, FollowsFrom)
有些时候,使用下面这种,基于时间轴的时序图能够更好的展示 Trace(调用链):
单个 Trace 中,span 间的时间关系 ––|–––––––|–––––––|–––––––|–––––––|–––––––|–––––––|–––––––|–> time [Span A···················································] [Span B··············································] [Span D··········································] [Span C········································] [Span E·······] [Span F··] [Span G··] [Span H··]
每一个 Span 包含如下的状态:(译者注:因为这些状态会反映在 OpenTracing API 中,因此会保留部分英文说明)
键值对中,键必须为 string,值能够是任意类型。
可是须要注意,不是全部的支持 OpenTracing 的 Tracer,都须要支持全部的值类型。
每个 SpanContext 包含如下状态:
更多关于 OpenTracing 数据模型的知识,请参考 OpenTracing语义标准。
OpenTracing 实现
这篇文档列出了全部 OpenTracing 实现。在这些实现中,比较流行的为 Jaeger 和 Zipkin。
事件类型
cs ( Client Send ) :客户端发起请求的时间
cr ( Client Received ) :客户端收处处理完请求的时间。
ss ( Server Send ) :服务端处理完逻辑的时间。
sr ( Server Received ) :服务端收到调用端请求的时间。
客户端调用时间=cr-cs
服务端处理时间=sr-ss
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-sleuth</artifactId> </dependency>
目前,链路追踪组件有Google的Dapper,Twitter 的 Zipkin,以及阿里的Eagleeye (鹰眼)等,它们都是很是优秀的链路追踪开源组件。本文主要讲述如何在Spring Cloud Sleuth中集成Zipkin。
Zipkin Server主要包括四个模块:
Collector 接收或收集各应用传输的数据
Storage 存储接受或收集过来的数据,当前支持Memory,MySQL,Cassandra,ElasticSearch等,默认存储在内存中。
API(Query) 负责查询Storage中存储的数据,提供简单的JSON API获取数据,主要提供给web UI使用
Web 提供简单的web界面
首先,安装 zipkin,为了方便直接用 docker 进行安装,具体详见容器化部署章节,这里再也不详述。
引入sleuth-zipkin相关依赖,由于 starter-zipkin 已经包含 starter-sleuth 的依赖,因此能够把原先的 sleuth依赖去掉。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zipkin</artifactId> </dependency>
spring: zipkin: base-url: http://zipkin:9411/
spring: sleuth: sampler: rate: 100 zipkin: base-url: http://zipkin:9411/
在服务调用的过程当中,经过Sleuth将链路信息(通过抽样后的信息)统一上报给Zipkin,经过Zipkin就能够集中查看和管理微服务架构中的调用链路信息,便于开发人员与运维人员跟踪和调试问题。
[root@localhost ~]# yum install docker [root@localhost ~]# systemctrl enable docker #设置docker开机启动 [root@localhost ~]# systemctrl start docker #启动docker
{ "registry-mirrors": ["http://hub-mirror.c.163.com"], "registry-mirrors": ["https://njrds9qc.mirror.aliyuncs.com"] }
[root@localhost ~]# systemctl daemon-reload [root@localhost ~]# systemctl restart docker
[root@localhost ~]# docker -v Docker version 1.13.1, build 7f2769b/1.13.1
[root@localhost ~]# pip -V
[root@localhost ~]# yum -y install epel-release [root@localhost ~]# yum -y install python-pip [root@localhost ~]# pip install --upgrade pip
[root@localhost ~]# pip install docker-compose
[root@localhost ~]# pip install more-itertools==5.0.0
[root@localhost ~]# docker-compose -v docker-compose version 1.25.0, build b42d419
application.yml:
spring: application: name: eureka-server #应用名称 profiles: active: peer1
application-peer1.yml:
eureka: client: service-url: defaultZone: http://peer2:8762/eureka/ #指定服务注册地址 server: port: 8761 #应用服务端口
application-peer2.yml:
eureka: client: service-url: defaultZone: http://peer1:8761/eureka/ #指定服务注册地址 server: port: 8762 #应用服务端口
FROM hub.c.163.com/library/java:8-alpine ADD target/*.jar app.jar EXPOSE 8761 EXPOSE 8762 ENTRYPOINT ["java","-jar","/app.jar"]
mvn clean package -Dmaven.test.skip=true -U docker build -t spring-cloud-app/eureka-server:v1 .
[root@localhost ~]# docker pull mysql:5.7
mysql: image: docker.io/mysql:5.7 hostname: mysql networks: - eureka-net ports: - "3306:3306" environment: MYSQL_ROOT_PASSWORD: "123456" volumes: - "./mysql/conf:/etc/mysql" - "./mysql/logs:/var/log/mysql" - "./mysql/data:/var/lib/mysql"
-management
表示有管理界面的,能够浏览器访问。5672是访问端口,15672是管理端口。
[root@localhost ~]# docker run -d --hostname rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.2-management
访问端口管理界面,输入默认用户名/密码 :guest/guest
[root@localhost ~]# docker run -d -p 9411:9411 openzipkin/zipkin
docker-compose.yml:
version: "2" services: eureka1: image: spring-cloud-app/eureka-server:v1 hostname: eureka1 networks: - eureka-net ports: - "8761:8761" environment: - spring.profiles.active=peer1 eureka2: image: spring-cloud-app/eureka-server:v1 hostname: eureka2 networks: - eureka-net ports: - "8762:8762" environment: - spring.profiles.active=peer2 config-server: image: spring-cloud-app/config-server:v1 hostname: config-server networks: - eureka-net ports: - "8888:8888" mysql: image: docker.io/mysql:5.7 hostname: mysql networks: - eureka-net ports: - "3306:3306" environment: MYSQL_ROOT_PASSWORD: "123456" volumes: - "./mysql/conf:/etc/mysql" - "./mysql/logs:/var/log/mysql" - "./mysql/data:/var/lib/mysql" rabbitmq: image: docker.io/rabbitmq:3.8.2-management hostname: rabbitmq networks: - eureka-net ports: - "5672:5672" - "15672:15672" zipkin: image: docker.io/openzipkin/zipkin:2.19.2 hostname: zipkin networks: - eureka-net ports: - "9411:9411" networks: eureka-net: driver: bridge