下面分析 get 请求的处理逻辑：
//根据key获取相应的值,并使用解码器进行解码 public Object get(String key) { return get(key, transcoder); } //异步获取future,并经过get方法设置超时时间,获取结果 public <T> T get(String key, Transcoder<T> tc) { try { return asyncGet(key, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { if(e.getCause() instanceof CancellationException) { throw (CancellationException) e.getCause(); } else { throw new RuntimeException("Exception waiting for value", e); } } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value: " + buildTimeoutMessage(operationTimeout, TimeUnit.MILLISECONDS), e); } } public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) { final CountDownLatch latch = new CountDownLatch(1); final GetFuture<T> rv = new GetFuture<T>(latch, operationTimeout, key, executorService); //读取到END/n/r的时候前后调用receivedStatus,complete //获取到数据后调用gotData //执行顺序是gotData,receivedStatus,complete Operation op = opFact.get(key, new GetOperation.Callback() { private Future<T> val; @Override public void receivedStatus(OperationStatus status) { rv.set(val, status); } @Override public void gotData(String k, int flags, byte[] data) { assert key.equals(k) : "Wrong key returned"; val = tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize())); } @Override public void complete() { latch.countDown(); rv.signalComplete(); } }); rv.setOperation(op); mconn.enqueueOperation(key, op); ==》 return rv; } protected void addOperation(final String key, final Operation o) { MemcachedNode placeIn = null; //根据key路由,默认负载算法是根据key hash取模 MemcachedNode primary = locator.getPrimary(key); //节点可用或者失败处理模式为重试机制 if (primary.isActive() || failureMode == FailureMode.Retry) { placeIn = primary; } else if (failureMode == FailureMode.Cancel) { //节点不可用且重试模式为当即中止 o.cancel(); } else { //若是primary不可用,且FailureMode为Redistribute则选择一个能够节点处理 Iterator<MemcachedNode> i = 
locator.getSequence(key); while (placeIn == null && i.hasNext()) { MemcachedNode n = i.next(); if (n.isActive()) { placeIn = n; } } if (placeIn == null) { placeIn = primary; this.getLogger().warn("Could not redistribute to another node, " + "retrying primary node for %s.", key); } } assert o.isCancelled() || placeIn != null : "No node found for key " + key; if (placeIn != null) { //将op添加到该节点的inputQ中,同时也把op放入到addedQueue中 addOperation(placeIn, o); } else { assert o.isCancelled() : "No node found for " + key + " (and not " + "immediately cancelled)"; } }
调用完 asyncGet 后获取到 GetFuture，然后再调用 GetFuture 的 get 方法：
可以看出，首先根据 rv 获取到 Future 对象，然后通过 Future 的 get 方法获取最终结果：
/**
 * Waits up to the given duration for the wrapped operation future, then
 * unwraps the inner future it yields.
 *
 * @param duration maximum time to wait on the wrapped future
 * @param units unit of {@code duration}
 * @return the inner future's value, or {@code null} when the wrapped future yielded no inner future
 * @throws InterruptedException if the waiting thread is interrupted
 * @throws TimeoutException if the wrapped future does not finish in time
 * @throws ExecutionException if either future completed exceptionally
 */
public T get(long duration, TimeUnit units)
    throws InterruptedException, TimeoutException, ExecutionException {
  final Future<T> decoded = rv.get(duration, units);
  if (decoded == null) {
    return null;
  }
  // The timeout applies only to the outer wait; the inner future is read
  // with a plain get().
  return decoded.get();
}
再看 rv.get(duration, units) 方法：
public T get(long duration, TimeUnit units) throws InterruptedException, TimeoutException, ExecutionException { //阻塞,等待GetOperation.Callback()调用complete方法 if (!latch.await(duration, units)) { // whenever timeout occurs, continuous timeout counter will increase by 1. //记录某个节点操做超时次数+1 MemcachedConnection.opTimedOut(op); if (op != null) { // op can be null on a flush //标识超时 op.timeOut(); } throw new CheckedOperationTimeoutException( "Timed out waiting for operation", op); } else { // continuous timeout counter will be reset //重置某个节点操做超时次数为0 MemcachedConnection.opSucceeded(op); } if (op != null && op.hasErrored()) { throw new ExecutionException(op.getException()); } if (isCancelled()) { throw new ExecutionException(new CancellationException("Cancelled")); } if (op != null && op.isTimedOut()) { throw new ExecutionException(new CheckedOperationTimeoutException( "Operation timed out.", op)); } /* TODO: re-add assertion that op.getState() == OperationState.COMPLETE */ //获取GetOperation中Callbac中gotData方法中 //tcService.decode(tc, new CachedData(flags, data,tc.getMaxSize()))生成的Future return objRef.get(); }