接着上一篇:SpringCloud学习之Hystrix(一)java
上一篇写到可使用请求缓存来减轻高并发时的请求线程消耗、下降请求相应时间。请求合并又是什么东西呢?在微服务架构中,咱们将项目拆分红多个模块,每一个模块间经过远程调用进行通讯。远程调用最多见的问题是通讯消耗与链接数占用。在高并发状况下。随着通讯次数的增长,通讯时间会增长;由于依赖服务的线程池资源有限,将出现排队等待与响应延迟的状况。请求合并正是Hystrix为解决这两个问题而开发的,以减小通讯消耗和线程数的占用。<!--more-->git
Hystrix提供HystrixCollapser来实现请求转发,在HystrixCommand以前放置一个合并处理器,将处于一个很短的时间窗(默认为10毫秒)内对同一个依赖服务的多个请求进行整合并以批量方式发起请求,
![]()
HystrixCollapser是一个抽象类,进入源码能够看到,它指定了三个不一样的类。
对于这三个类型的使用:spring
//用来定义获取请求参数的方法 public abstract RequestArgumentType getRequestArgument(); //合并请求产生批量命令的具体实现 protected abstract HystrixCommand<BatchReturnType> createCommand(Collection<com.netflix.hystrix.HystrixCollapser.CollapsedRequest<ResponseType, RequestArgumentType>> requests); //批量命令结果返回后的处理,这里须要实现将批量命令结果拆分并传递给合并前各个原子请求命令的逻辑 protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<com.netflix.hystrix.HystrixCollapser.CollapsedRequest<ResponseType, RequestArgumentType>> requests);
修改服务提供者
在以前的代码基础上,在USER-SERVICE中添加一个接口,这里使用到的两个接口:express
@GetMapping("/user/{id}") public User findById(@PathVariable("id") Long id){ return userRepository.findOne(id); } @GetMapping("/users/ids") public List<User> findUserByIds(String ids){ System.out.println(">>>>>>>>>>"+ids); String[] split = ids.split(","); List<User> result = new ArrayList<>(); for (String s : split){ Long id = Long.valueOf(s); User user = userRepository.findOne(id); result.add(user); } return result; }
服务消费者缓存
public User getUserById(@CacheKey("id") Long id) { return restTemplate.getForObject("http://USER-SERVICE/user/{1}", User.class, id); } public List<User> findUserByIds(List<Long> ids){ System.out.println("findUserByIds---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName()); String str = StringUtils.join(ids,","); User[] users = restTemplate.getForObject("http://USER-SERVICE/users/ids?ids={1}", User[].class,str); return Arrays.asList(users); }
public class UserBatchCommand extends HystrixCommand<List<User>> { private final Logger logger = LoggerFactory.getLogger(UserCommand.class); private List<Long> ids; private ArticleService articleService; public UserBatchCommand(ArticleService articleService,List<Long> ids){ super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("userGroup"))); this.ids = ids; this.articleService = articleService; } @Override protected List<User> run() throws Exception { return articleService.findUserByIds(ids); } }
public class UserCollapdeCommand extends HystrixCollapser<List<User>,User,Long> { private ArticleService articleService; private Long id; public UserCollapdeCommand(ArticleService articleService,Long id){ //设置时间延迟属性,延迟时间窗为100毫秒 super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("userCollapdeCommand")).andCollapserPropertiesDefaults( HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100) )); this.articleService = articleService; this.id = id; } /** * 返回单个请求参数id * @return */ @Override public Long getRequestArgument() { return id; } /** * 这里经过获取单个请求的参数来组织批量请求命令UserBatchCommand的实例 * @param collapsedRequests 保存了延迟时间窗中收集到的全部获取单个User的请求。 * @return */ @Override protected HystrixCommand<List<User>> createCommand(Collection<CollapsedRequest<User, Long>> collapsedRequests) { ArrayList<Long> userIds = new ArrayList<>(collapsedRequests.size()); userIds.addAll(collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList())); return new UserBatchCommand(articleService,userIds); } /** * 该方法在批量请求命令UserBatchCommand执行完成以后执行 * 经过遍历batchResponse来为collapsedRequests设置请求结果。 * @param batchResponse 保存了createCommand中组织的批量请求返回结果 * @param collapsedRequests 每一个被合并的请求, */ @Override protected void mapResponseToRequests(List<User> batchResponse, Collection<CollapsedRequest<User, Long>> collapsedRequests) { int count = 0; for (CollapsedRequest<User,Long> collapsedRequest : collapsedRequests){ User user = batchResponse.get(count++); collapsedRequest.setResponse(user); } } }
@GetMapping("/testBathCommand") public List<User> testBathCommand() throws ExecutionException, InterruptedException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); UserCollapdeCommand u1 = new UserCollapdeCommand(articleService, 1L); UserCollapdeCommand u2 = new UserCollapdeCommand(articleService, 2L); UserCollapdeCommand u3 = new UserCollapdeCommand(articleService, 3L); UserCollapdeCommand u4 = new UserCollapdeCommand(articleService, 4L); Future<User> q1 = u1.queue(); Future<User> q2 = u2.queue(); Future<User> q3 = u3.queue(); Future<User> q4 = u4.queue(); User e1 = q1.get(); User e2 = q2.get(); User e3 = q3.get(); User e4 = q4.get(); List<User> res = new ArrayList<>(); res.add(e1); res.add(e2); res.add(e3); res.add(e4); System.out.println(res); return res; }
上面使用继承类的方式可能会有些繁琐,在Hystrix中一样提供了注解来优雅的实现请求合并。架构
@HystrixCollapser(batchMethod = "findAll",collapserProperties = { @HystrixProperty(name = "DelayInMilliseconds",value = "100") }) public User findOne(Long id){ return null; } @HystrixCommand public List<User> findAll(List<Long> ids){ System.out.println("findUserByIds---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName()); String str = StringUtils.join(ids,","); User[] users = restTemplate.getForObject("http://USER-SERVICE/users/ids?ids={1}", User[].class,str); return Arrays.asList(users); }
这里经过@HystrixCollapser注解建立合并请求器,经过batchMethod属性指定实现批量请求的findAll方法,经过HystrixProperty属性为合并请求器设置相关属性。 @HystrixProperty(name = "DelayInMilliseconds",value = "100")
设置时间窗为100毫秒。这里直接调用findOne方法便可,使用注解确实是简单。
测试接口并发
@GetMapping("/testBathCommandAnn") public List<User> testBathCommandAnn() throws ExecutionException, InterruptedException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); Future<User> q1 = articleService.findOne(1L); Future<User> q2 = articleService.findOne(2L); Future<User> q3 = articleService.findOne(3L); Future<User> q4 = articleService.findOne(4L); User e1 = q1.get(); User e2 = q2.get(); User e3 = q3.get(); User e4 = q4.get(); List<User> res = new ArrayList<>(); res.add(e1); res.add(e2); res.add(e3); res.add(e4); System.out.println(res); return res; }
Hystrix提供了很是灵活的配置方式,全部属性存在下面四个优先级的配置(优先级由低到高):app
Hystrix中主要的三个属性:异步
关于属性参数的更多详解能够查看《SpringCloud微服务实战》ide
仪表盘是Hystrix Dashboard提供的用来实时监控Hystrix的指标信息的组件。经过该组件反馈的实时信息,能够帮助咱们快速的发现系统存在的问题。项目结构图
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix-dashboard</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
@EnableHystrixDashboard
开启HystrixDashboard功能。spring: application: name: hystrix-dashboard server: port: 9999
spring-boot-starter-actuator
监控模块的以开启监控相关的端点。还有hystrix依赖是必定要的。而且确保服务以及使用@EnableCircuitBreaker
开启了断路器功能。<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
可使用Turbine实现集群监控,该端点为/trubine.stream。和上面同样,新建一个SpringBoot项目,这里命名为hystrix-turbine。添加如下依赖:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-turbine</artifactId> </dependency> </dependencies>
在主类使用@EnableTurbine
注解开启Trubine
@SpringBootApplication @EnableDiscoveryClient @EnableTurbine public class TurbineApplication { public static void main(String[] args) { SpringApplication.run(TurbineApplication.class, args); } }
修改配置文件
spring.application.name=hystrix-turbine server.port=9998 management.port=9000 eureka.client.service-url.defaultZone=http://localhost:8888/eureka/ turbine.app-config=article-service turbine.cluster-name-expression="default" turbine.combine-host-port=true
- turbine.app-config=ribbon-consumer指定了要监控的应用名字为ribbon-consumer
- turbine.cluster-name-expression=”default”,表示集群的名字为default
- turbine.combine-host-port=true表示同一主机上的服务经过host和port的组合来进行区分,默认状况下是使用host来区分,这样会使本地调试有问题
最后启动项目,并启动两个article-service,而后添加对 http://localhost:9998/turbine.stream的监控。
做为SpringCloud学习笔记,有不少地方很差。望指出!!!