建立完连接后,就开始监听并处理请求了。查看MemcachedConnection的run方法,可以看到它在一个死循环中处理IO请求,下面分析handleIO方法:
/** * Handle all IO that flows through the connection. * * This method is called in an endless loop, listens on NIO selectors and * dispatches the underlying read/write calls if needed. */ public void handleIO() throws IOException { //若是链接关闭,则当即中止 if (shutDown) { getLogger().debug("No IO while shut down."); return; } //处理客户端的全部请求,下面会分析 handleInputQueue(); ==》 getLogger().debug("Done dealing with queue."); long delay = 1000; if (!reconnectQueue.isEmpty()) { long now = System.currentTimeMillis(); long then = reconnectQueue.firstKey(); delay = Math.max(then - now, 1); } getLogger().debug("Selecting with delay of %sms", delay); //检查selectors是否正确 assert selectorsMakeSense() : "Selectors don't make sense."; int selected = selector.select(delay); if (shutDown) { return; } else if (selected == 0 && addedQueue.isEmpty()) { //没有请求的时候调用,空方法。 handleWokenUpSelector(); } else if (selector.selectedKeys().isEmpty()) { //注册事件为空时,检测次数超过DOUBLE_CHECK_EMPTY时可能会释放链接,放入重连队列 handleEmptySelects(); } else { getLogger().debug("Selected %d, selected %d keys", selected, selector.selectedKeys().size()); emptySelects = 0; Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while(iterator.hasNext()) { SelectionKey sk = iterator.next(); //处理io请求 handleIO(sk); ==》 iterator.remove(); } } handleOperationalTasks(); }
处理客户端请求,主要的操作是把inputQ中的ops转移到writeQ中:
/** * Handle any requests that have been made against the client. */ private void handleInputQueue() { if (!addedQueue.isEmpty()) { getLogger().debug("Handling queue"); Collection<MemcachedNode> toAdd = new HashSet<MemcachedNode>(); Collection<MemcachedNode> todo = new HashSet<MemcachedNode>(); MemcachedNode qaNode; while ((qaNode = addedQueue.poll()) != null) { todo.add(qaNode); } for (MemcachedNode node : todo) { boolean readyForIO = false; if (node.isActive()) { //判断当前是否有可用的write operation if (node.getCurrentWriteOp() != null) { readyForIO = true; getLogger().debug("Handling queued write %s", node); } } else { toAdd.add(node); } //get set操做的时候会向inputQueue中放入op将node中 //inputQueue中部分op转移到writeQ中, //op的数量根据writeQ可接受大小而定,若是writeQ不为空, //在下面 node.fixupOps()中则会注册write事件 node.copyInputQueue(); if (readyForIO) { try { if (node.getWbuf().hasRemaining()) { //处理写操做,后面分析 handleWrites(node); } } catch (IOException e) { getLogger().warn("Exception handling write", e); lostConnection(node); } } //若是readQ不为空,则注册read事件 //若是writeQ不为空,则注册write事件 node.fixupOps(); } addedQueue.addAll(toAdd); } } /** * Handle IO for a specific selector. * * Any IOException will cause a reconnect. Note that this code makes sure * that the corresponding node is not only able to connect, but also able to * respond in a correct fashion (if verifyAliveOnConnect is set to true * through a property). This is handled by issuing a dummy * version/noop call and making sure it returns in a correct and timely * fashion. * * @param sk the selector to handle IO against. 
*/ private void handleIO(final SelectionKey sk) { MemcachedNode node = (MemcachedNode) sk.attachment(); try { getLogger().debug("Handling IO for: %s (r=%s, w=%s, c=%s, op=%s)", sk, sk.isReadable(), sk.isWritable(), sk.isConnectable(), sk.attachment()); //是否能够链接 if (sk.isConnectable() && belongsToCluster(node)) { getLogger().debug("Connection state changed for %s", sk); final SocketChannel channel = node.getChannel(); if (channel.finishConnect()) { //链接成功后,将node添加到addedQueue finishConnect(sk, node); } else { assert !channel.isConnected() : "connected"; } } else { //处理读写事件 handleReadsAndWrites(sk, node); } } catch (ClosedChannelException e) { if (!shutDown) { getLogger().info("Closed channel and not shutting down. Queueing" + " reconnect on %s", node, e); lostConnection(node); } } catch (ConnectException e) { getLogger().info("Reconnecting due to failure to connect to %s", node, e); queueReconnect(node); } catch (OperationException e) { node.setupForAuth(); getLogger().info("Reconnection due to exception handling a memcached " + "operation on %s. This may be due to an authentication failure.", node, e); lostConnection(node); } catch (Exception e) { node.setupForAuth(); getLogger().info("Reconnecting due to exception on %s", node, e); lostConnection(node); } node.fixupOps(); } private void handleReadsAndWrites(final SelectionKey sk, final MemcachedNode node) throws IOException { if (sk.isValid()) { if (sk.isReadable()) { handleReads(node); } if (sk.isWritable()) { handleWrites(node); } } }
首先看 handleWrites:
/** * Handle pending writes for the given node. * * @param node the node to handle writes for. * @throws IOException can be raised during writing failures. */ private void handleWrites(final MemcachedNode node) throws IOException { //填充writeBuffer,shouldOptimize为false node.fillWriteBuffer(shouldOptimize); ==> //判断toWrite的值,大于零说明有数据须要write boolean canWriteMore = node.getBytesRemainingToWrite() > 0; while (canWriteMore) { //想memcache server发送消息 int wrote = node.writeSome(); metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, wrote); //试图再次填充writeBuffer node.fillWriteBuffer(shouldOptimize); canWriteMore = wrote > 0 && node.getBytesRemainingToWrite() > 0; } } //不断从writeQ中获取op,将op中的cmd buffer转移到wbuf,直到writeQ没有元素或者wbuf填满 public final void fillWriteBuffer(boolean shouldOptimize) { if (toWrite == 0 && readQ.remainingCapacity() > 0) { getWbuf().clear(); //op的初始状态是OperationState.WRITE_QUEUED,便可写状态 //1.op是中止的,则会从writeQ中移除,从writeQ中获取下一个op //2.op是超时的,则会从writeQ中移除,从writeQ中获取下一个op //3.op正常,状态转换成OperationState.WRITING,即开始发送数据状态,并将o加入readQ中,准备读取响应 Operation o=getNextWritableOp(); while(o != null && toWrite < getWbuf().capacity()) { synchronized(o) { assert o.getState() == OperationState.WRITING; //获取write buffer ByteBuffer obuf = o.getBuffer(); assert obuf != null : "Didn't get a write buffer from " + o; int bytesToCopy = Math.min(getWbuf().remaining(), obuf.remaining()); byte[] b = new byte[bytesToCopy]; obuf.get(b); getWbuf().put(b); getLogger().debug("After copying stuff from %s: %s", o, getWbuf()); //cmd中没有数据则说明所有转移完成 if (!o.getBuffer().hasRemaining()) { //OperationState.WRITING状态转换成OperationState.READING o.writeComplete(); //从writeQ中删除此op transitionWriteItem(); //准备即将发生的操做,将inputQ中的op转移到writeQ中,并删除cancel状态的op preparePending(); if (shouldOptimize) { //优化操做,TODO optimize(); } o=getNextWritableOp(); } toWrite += bytesToCopy; } } getWbuf().flip(); assert toWrite <= getWbuf().capacity() : "toWrite exceeded capacity: " + this; assert toWrite == getWbuf().remaining() : "Expected 
" + toWrite + " remaining, got " + getWbuf().remaining(); } else { getLogger().debug("Buffer is full, skipping"); } }
下面分析spymemcached如何处理memcached的响应。如果channel可读,则会触发handleReads方法:
private void handleReads(final MemcachedNode node) throws IOException { //从readQ中获取op Operation currentOp = node.getCurrentReadOp(); if (currentOp instanceof TapAckOperationImpl) { node.removeCurrentReadOp(); return; } ByteBuffer rbuf = node.getRbuf(); final SocketChannel channel = node.getChannel(); //读取请求响应 int read = channel.read(rbuf); metrics.updateHistogram(OVERALL_AVG_BYTES_READ_METRIC, read); //没有数据可读了 TODO if (read < 0) { currentOp = handleReadsWhenChannelEndOfStream(currentOp, node, rbuf); } while (read > 0) { getLogger().debug("Read %d bytes", read); rbuf.flip(); while (rbuf.remaining() > 0) { if (currentOp == null) { throw new IllegalStateException("No read operation."); } long timeOnWire = System.nanoTime() - currentOp.getWriteCompleteTimestamp(); metrics.updateHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC, (int)(timeOnWire / 1000)); metrics.markMeter(OVERALL_RESPONSE_METRIC); synchronized(currentOp) { readBufferAndLogMetrics(currentOp, rbuf, node); ==》 } currentOp = node.getCurrentReadOp(); } rbuf.clear(); read = channel.read(rbuf); node.completedRead(); } } /** * Read from the buffer and add metrics information. * * @param currentOp the current operation to read. * @param rbuf the read buffer to read from. * @param node the node to read from. * @throws IOException if reading was not successful. 
*/ private void readBufferAndLogMetrics(final Operation currentOp, final ByteBuffer rbuf, final MemcachedNode node) throws IOException { //读取数据并进行协议解析 currentOp.readFromBuffer(rbuf); //读取成功 if (currentOp.getState() == OperationState.COMPLETE) { getLogger().debug("Completed read op: %s and giving the next %d " + "bytes", currentOp, rbuf.remaining()); //移除当前的readOps Operation op = node.removeCurrentReadOp(); assert op == currentOp : "Expected to pop " + currentOp + " got " + op; if (op.hasErrored()) { metrics.markMeter(OVERALL_RESPONSE_FAIL_METRIC); } else { metrics.markMeter(OVERALL_RESPONSE_SUCC_METRIC); } } else if (currentOp.getState() == OperationState.RETRY) { handleRetryInformation(currentOp.getErrorMsg()); getLogger().debug("Reschedule read op due to NOT_MY_VBUCKET error: " + "%s ", currentOp); ((VBucketAware) currentOp).addNotMyVbucketNode( currentOp.getHandlingNode()); Operation op = node.removeCurrentReadOp(); assert op == currentOp : "Expected to pop " + currentOp + " got " + op; retryOps.add(currentOp); metrics.markMeter(OVERALL_RESPONSE_RETRY_METRIC); } } # 下面分析下memcache的get协议: get <键>*/r/n <键>* - key key是一个不为空的字符串组合,发送这个指令之后,等待服务器的返回。 若是服务器端没有任何数据,则是返回: END/r/n 证实没有不存在这个key,没有任何数据,若是存在数据,则 返回指定格式: VALUE <键> <标记> <数据长度>/r/n <数据块>/r/n END/r/n 返回的数据是以VALUE开始的,后面跟着key和flags,以及数据长度,第二行跟着数据块。 <键> -key 是发送过来指令的key内容 <标记> - flags 是调用set指令保存数据时候的flags标记 <数据长度> - bytes 是保存数据时候定位的长度 <数据块> - data block 数据长度下一行就是提取的数据块内容 END 结束 public void readFromBuffer(ByteBuffer data) throws IOException { // Loop while there's data remaining to get it all drained. 
while (getState() != OperationState.COMPLETE && data.remaining() > 0) { if (readType == OperationReadType.DATA) { //读取数据块数据,并返回客户端数据 handleRead(data); } else { //提取命令,直到/r/n结束,获取命令行line int offset = -1; for (int i = 0; data.remaining() > 0; i++) { byte b = data.get(); if (b == '\r') { foundCr = true; } else if (b == '\n') { assert foundCr : "got a \\n without a \\r"; offset = i; foundCr = false; break; } else { assert !foundCr : "got a \\r without a \\n"; byteBuffer.write(b); } } if (offset >= 0) { String line = new String(byteBuffer.toByteArray(), CHARSET); byteBuffer.reset(); OperationErrorType eType = classifyError(line); if (eType != null) { errorMsg = line.getBytes(); handleError(eType, line); } else { //处理命令行,咱们分析下BaseGetOpImpl handleLine(line); ==》 } } } } } //BaseGetOpImpl public final void handleLine(String line) { if (line.equals("END")) { //get响应结束 getLogger().debug("Get complete!"); if (hasValue) { //有返回值,即数据块有值 getCallback().receivedStatus(END); } else { //没有获取到数据,key不存在等 getCallback().receivedStatus(NOT_FOUND); } //处理完成 transitionState(OperationState.COMPLETE); data = null; } else if (line.startsWith("VALUE ")) { //读取命令行VALUE。。。 getLogger().debug("Got line %s", line); String[] stuff = line.split(" "); assert stuff[0].equals("VALUE"); currentKey = stuff[1]; currentFlags = Integer.parseInt(stuff[2]); data = new byte[Integer.parseInt(stuff[3])]; if (stuff.length > 4) { casValue = Long.parseLong(stuff[4]); } readOffset = 0; //设置有数据库标识 hasValue = true; getLogger().debug("Set read type to data"); setReadType(OperationReadType.DATA); } else if (line.equals("LOCK_ERROR")) { getCallback().receivedStatus(LOCK_ERROR); transitionState(OperationState.COMPLETE); } else { assert false : "Unknown line type: " + line; } }