项目中通常会请求第三方的接口,也会对外提供接口,多是RPC,也多是HTTP等方式。在对外提供接口时,有必要提供相应的批量接口,好的批量实现可以提高性能。前端
高并发场景中,调用批量接口相比调用非批量接口有更大的性能优点。但有时候,请求更多的是单个接口,不可以直接调用批量接口,若是这个接口是高频接口,对其作请求合并就颇有必要了。好比电影网站的获取电影详情接口,APP的一次请求是单个接口调用,用户量少的时候请求也很少,彻底没问题;但同一时刻每每有大量用户访问电影详情,是个高并发的高频接口,若是都是单次查询,后台就不必定能hold住了。为了优化这个接口,后台能够将相同的请求进行合并,而后调用批量的查询接口。以下图所示java
合并请求前,咱们通常是调用服务层的单次建立方法。看起来都比较简单,且易于理解。spring
以建立设备接口为例。缓存
@Reference(check = false) private DeviceService deviceService; /** * 注册设备 * * @param productKey 产品key * @param deviceName 设备名 * @return 设备ID */ public R<Long> registDevice(String productKey, String deviceName) { log.debug("开始注册: {}, {}", productKey, deviceName); DeviceRequestDto deviceCreateQuery = new DeviceRequestDto() .setProductKey(productKey) .setName(deviceName); Long deviceId = deviceService.createDevice(deviceCreateQuery); return deviceId != null ? R.ok(deviceId) : R.error(DEVICE_CREATE_ERROR); }
请求合并的好处前面有提到,那不能每次写接口就作请求合并吧?咱们要明白,技术无好坏,要在特定的业务场景下衡量利弊,采用与否须要深思熟虑。合并请求会令代码变得复杂,也会增长必定的接口延迟,其中还可能存在各类未知的风险。多线程
合并请求是针对高并发场景的一种手段,咱们实现请求合并以前,要结合业务场景思考一番,是否值得承受的合并带来的访问延迟?用户体验是否会打折扣?自身的技术是否足够hold住请求合并带来的未知风险?并发
思路:收到前端的请求时,先存起来,隔段时间批量请求第三方服务批量接口,而后分别通知存起来的请求,而且响应前端。框架
仍是针对上述设备注册接口,咱们对其进行改造,来实现一个简单的请求合并。dom
首先,咱们须要有可以批量调用的接口。在对外提供接口时,也很是有必要提供相应的批量接口,且内部实现应该是优化过的。高并发
此处咱们在服务层模拟了一个批量建立设备的接口, 以下:性能
/** * 批量建立设备接口 * * @param deviceRequestDtoList 入参信息 * @return 建立结果 */ R<List<DeviceCreateResp>> batchCreateDevice(List<DeviceCreateQuery> deviceList);
@Data public class DeviceCreateQuery implements Serializable { /** * 产品标识 */ private String productKey; /** * 设备名称 */ private String name; /** * 请求源,一次批量请求保证惟一 */ private String requestSource; }
@Data public class DeviceCreateResp implements Serializable { /** * 设备ID */ private Long deviceId; /** * 请求源,一次批量请求保证惟一 */ private String requestSource; }
private LinkedBlockingDeque<DeviceCreateRequest> deviceCreateQueue = new LinkedBlockingDeque<>();
@Data static class DeviceCreateRequest { /** 产品key */ private String productKey; /** 设备名 */ private String deviceName; /** 请求源,需保证惟一 */ private String requestSource; /** CompletableFuture接口 */ private CompletableFuture<Long> completedFuture; }
public R<Long> registDevice(String productKey, String deviceName) { log.debug("开始注册: {}, {}", productKey, deviceName); // 缓存请求 ====== start CompletableFuture<Long> completedFuture = new CompletableFuture<>(); DeviceCreateRequest deviceCreateRequest = new DeviceCreateRequest(); deviceCreateRequest.setProductKey(productKey); deviceCreateRequest.setDeviceName(deviceName); deviceCreateRequest.setRequestSource(UUID.randomUUID().toString()); deviceCreateRequest.setCompletedFuture(completedFuture); deviceCreateQueue.add(deviceCreateRequest); // 缓存请求 ====== end Long deviceId = null; try { deviceId = completedFuture.get(); } catch (Exception e) { log.error("设备注册失败", e); } return deviceId != null ? R.ok(deviceId) : R.error(DEVICE_CREATE_ERROR); }
此处使用了spring,在init方法中利用定时任务线程池批量分发请求。同时使用了newScheduledThreadPool
,其中线程池大小和定时间隔时长须要根据业务量作权衡
/** 积攒请求的阻塞队列 */ private LinkedBlockingDeque<DeviceCreateRequest> deviceCreateQueue = new LinkedBlockingDeque<>(); /** 线程池数量 */ @Value("${iot.register.merge.device.request.num:100}") private int createDeviceMergeNum; /** 定时间隔时长 */ @Value("${iot.register.merge.device.request.period:30}") private long createDeviceMergePeriod; @Reference(check = false) private DeviceService deviceService; @PostConstruct public void init() { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(createDeviceMergeNum); scheduledExecutorService.scheduleAtFixedRate(() -> { // 把出queue的请求存储一次 List<DeviceCreateRequest> questBak = new ArrayList<>(); // 批量建立设备的入参 List<DeviceCreateQuery> deviceCreateQueryList = new ArrayList<>(); int size = deviceCreateQueue.size(); for (int i = 0; i < size; i++) { DeviceCreateRequest deviceCreateRequest = deviceCreateQueue.poll(); if (Objects.nonNull(deviceCreateRequest)) { questBak.add(deviceCreateRequest); deviceCreateQueryList.add(buildDeviceCreateQuery(deviceCreateRequest)); } } if (!deviceCreateQueryList.isEmpty()) { try { List<DeviceCreateResp> response = deviceService.batchCreateDevice(deviceCreateQueryList); Map<String, Long> collect = response.stream() .collect(Collectors.toMap( DeviceCreateResp::getRequestSource, DeviceCreateResp::getDeviceId )); // 通知请求的线程 for (DeviceCreateRequest deviceCreateRequest : questBak) { deviceCreateRequest.getCompletedFuture().complete(collect.get(deviceCreateRequest.getRequestSource())); } } catch (Throwable throwable) { log.error("批量注册设备异常", throwable); // 通知请求的线程-异常 questBak.forEach(deviceCreateRequest -> deviceCreateRequest.getCompletedFuture().obtrudeException(throwable)); } } }, 0, createDeviceMergePeriod, TimeUnit.MILLISECONDS); }
请求合并是解决高并发场景下某些问题的一种思路,本文只作了一个简单的实现,算是对这块知识的一次实践吧。用到了BlockingDeque
、CompletableFuture
接口,涉及Java多线程相关的知识,实现方式比较野蛮。业界有不少优秀的开源框架作请求合并,好比hystrix
,须要花时间好好学习哈哈。