Hystrix请求合并与请求缓存(二):请求合并

请求合并

前言

今日继续摸鱼Hystrix的请求合并部分,可能不如请求缓存分析的详细,可是我感受足够表达实现原理了。java

本文选择了较为简单的请求合并的用例进行切入并分析,即CommandCollapserGetValueForKey,而非ObservableCollapserGetWordForNumber,原理都是一致的,只是ObservableCollapserGetWordForNumber提供了更为丰富的接口,供业务实现。git

请求合并的使用

CommandCollapserGetValueForKey例子看,只要作以下3件事,就能实现请求合并。github

一、继承HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType>。 二、重写三个方法,分别为getRequestArgumentcreateCommandmapResponseToRequests。 三、写一个BatchCommand,即请求合并后的一个HystrixCommand缓存

接下来,能够从源码层面上看,如何经过这三步操做实现请求合并。app

HystrixCollapser

  • 须要注意<BatchReturnType, ResponseType, RequestArgumentType>泛型的含义,已经在代码块中添加了相应注释。
**
 * 根据设定时间参数以及合并请求数,将多个HystrixCommand合并成一次的HystrixCommand,从而将短期调用服务的次数减小。
 * <p>
 * 一般将时间窗口设为10ms左右
 * 
 * @param <BatchReturnType>
 *           合并后的HystrixCommand的返回类型,例如String变成List<String>。
 * @param <ResponseType>
 *           须要合并的HystrixCommand的返回类型。
 * @param <RequestArgumentType>
 *           须要合并的HystrixCommand的请求参数类型。
 */
public abstract class HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType>, HystrixObservable<ResponseType> {
复制代码

请求合并的过程

  • 请求合并的过程,从例子能够看出,合并后的BatchCommand的参数为Collection<CollapsedRequest<String, Integer>> requests,即请求合并的过程就是从单个请求的参数合并成Collection<CollapsedRequest<ResponseType, RequestArgumentType>>less

  • 所以,能够从getRequestArgument的调用入手,就找到了HystrixCollapser.toObservableide

// 提交请求,直接返回结果了。。。
Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());

/** * Submit a request to a batch. If the batch maxSize is hit trigger the batch immediately. * 和清楚了将时间窗口内的请求提交,若是到了设定的合并阈值,触发一次合并请求 * @param arg argument to a {@link RequestCollapser} * @return Observable<ResponseType> * @throws IllegalStateException * if submitting after shutdown */
    public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
        /* * 启动计时器,时间窗口阈值到了,则触发一次合并请求 */
        if (!timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false, true)) {
            /* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */
            timerListenerReference.set(timer.addListener(new CollapsedTask()));
        }

        // loop until succeed (compare-and-set spin-loop)
        // 等待-通知模型
        while (true) {
	        // 拿到RequestBatch
            final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get();
            if (b == null) {
                return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown"));
            }

            final Observable<ResponseType> response;
            // 添加到RequestBatch
            if (arg != null) {
                response = b.offer(arg);
            } else {
                response = b.offer( (RequestArgumentType) NULL_SENTINEL);
            }
            // it will always get an Observable unless we hit the max batch size
            // 添加成功,返回 Observable
            if (response != null) {
                return response;
            } else {
                // this batch can't accept requests so create a new one and set it if another thread doesn't beat us
                // 添加失败,执行 RequestBatch ,并建立新的 RequestBatch
                createNewBatchAndExecutePreviousIfNeeded(b);
            }
        }
    }
复制代码
  • offer方法
public Observable<ResponseType> offer(RequestArgumentType arg) {
 2:     // 执行已经开始,添加失败
 3:     /* short-cut - if the batch is started we reject the offer */
 4:     if (batchStarted.get()) {
 5:         return null;
 6:     }
 7: 
 8:     /* 9: * The 'read' just means non-exclusive even though we are writing. 10: */
11:     if (batchLock.readLock().tryLock()) {
12:         try {
13:             // 执行已经开始,添加失败
14:             /* double-check now that we have the lock - if the batch is started we reject the offer */
15:             if (batchStarted.get()) {
16:                 return null;
17:             }
18: 
19:             // 超过队列最大长度,添加失败
20:             if (argumentMap.size() >= maxBatchSize) {
21:                 return null;
22:             } else {
23:                 // 建立 CollapsedRequestSubject ,并添加到队列
24:                 CollapsedRequestSubject<ResponseType, RequestArgumentType> collapsedRequest = new CollapsedRequestSubject<ResponseType, RequestArgumentType>(arg, this);
25:                 final CollapsedRequestSubject<ResponseType, RequestArgumentType> existing = (CollapsedRequestSubject<ResponseType, RequestArgumentType>) argumentMap.putIfAbsent(arg, collapsedRequest);
26:                 /** 27: * If the argument already exists in the batch, then there are 2 options: 28: * A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses 29: * be hooked up to that argument 30: * B) If request caching is OFF: return an error to all duplicate argument requests 31: * 32: * This maintains the invariant that each batch has no duplicate arguments. This prevents the impossible 33: * logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser) 34: * of trying to figure out which argument of a set of duplicates should get attached to a response. 35: * 36: * See https://github.com/Netflix/Hystrix/pull/1176 for further discussion. 37: */
38:                 if (existing != null) {
39:                     boolean requestCachingEnabled = properties.requestCacheEnabled().get();
40:                     if (requestCachingEnabled) {
41:                         return existing.toObservable();
42:                     } else {
43:                         return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "] This is not supported. Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!"));
44:                     }
45:                 } else {
46:                     return collapsedRequest.toObservable();
47:                 }
48: 
49:             }
50:         } finally {
51:             batchLock.readLock().unlock();
52:         }
53:     } else {
54:         return null;
55:     }
56: }
复制代码
  • 第 38 至 47 行 :返回Observable。当argumentMap已经存在arg对应的Observable时,必须开启缓存 ( HystrixCollapserProperties.requestCachingEnabled = true ) 功能。缘由是,若是在相同的arg,而且未开启缓存,同时第 43 行实现的是collapsedRequest.toObservable(),那么相同的arg将有多个Observable执行命令,此时HystrixCollapserBridge.mapResponseToRequests 方法没法将执行(Response)赋值到arg对应的命令请求( CollapsedRequestSubject ) ,见 github.com/Netflix/Hys…oop

  • 回过头看HystrixCollapser#toObservable()方法的代码,这里也有对缓存功能,是否是重复了呢?argumentMap 针对的是RequestBatch级的缓存,HystrixCollapser : RequestCollapser : RequestBatch 是 1 : 1 : N 的关系,经过 HystrixCollapser#toObservable() 对缓存的处理逻辑,保证 RequestBatch 切换后,依然有缓存。fetch

  • CollapsedTask负责触发时间窗口内合并请求的处理,其实关键方法就是createNewBatchAndExecutePreviousIfNeeded,而且也调用了executeBatchIfNotAlreadyStartedthis

/** * Executed on each Timer interval execute the current batch if it has requests in it. */
    private class CollapsedTask implements TimerListener {
		        ...
                @Override
                public Void call() throws Exception {
                    try {
                        // we fetch current so that when multiple threads race
                        // we can do compareAndSet with the expected/new to ensure only one happens
                        // 拿到合并请求
                        RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> currentBatch = batch.get();
                        // 1) it can be null if it got shutdown
                        // 2) we don't execute this batch if it has no requests and let it wait until next tick to be executed
                        // 处理合并请求
                        if (currentBatch != null && currentBatch.getSize() > 0) {
                            // do execution within context of wrapped Callable
                            createNewBatchAndExecutePreviousIfNeeded(currentBatch);
                        }
                    } catch (Throwable t) {
                        logger.error("Error occurred trying to execute the batch.", t);
                        t.printStackTrace();
                        // ignore error so we don't kill the Timer mainLoop and prevent further items from being scheduled
                    }
                    return null;
                }

            });
        }
复制代码
  • executeBatchIfNotAlreadyStarted中对请求进行了合并及执行!!!

一、调用HystrixCollapserBridge.shardRequests 方法,将多个命令请求分片成 N 个【多个命令请求】。默认实现下,不进行分片。 二、循环 N 个【多个命令请求】。 三、调用 HystrixCollapserBridge.createObservableCommand 方法,将多个命令请求合并,建立一个 HystrixCommand 。点击 连接 查看代码。

...
  // shard batches
                Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
                // for each shard execute its requests 
                for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
                        try {
                        // create a new command to handle this batch of requests
                        Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);
           commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {
           ...
复制代码
  • 最后执行就很简单了,调用重写的mapResponseToRequests的方法将一个HystrixCommand的执行结果,映射回对应的命令请求们。

总结

  • 简单来说,就是讲必定时间窗口内的请求,放入“队列”,有一个定时触发的任务,将“队列”里的请求(“队列”里的请求也有可能被分片,成为“请求队列集合”,即Collection<Collection<>>),经过本身实现的BatchCommand执行,将最后结果再映射为合并前HystrixCommand的结果返回。

PS:又见到了好多concurrentHashMap,CAS,Atomic变量。。。

参考文献

相关文章
相关标签/搜索