spymemcached源码深刻分析

spymemcached深刻分析 html

author:智深 前端

version0.7 java

日志:http://my.oschina.net/astute node

QQ2548921609(技术交流) git


1、简介

spymemcached 是一个 memcache 的客户端, 使用 NIO 实现 github

分析 spymemcached 须要了解 NIOmemcached使用,memcached协议,参考资料中列出了有用的资源链接。 算法

NIONew I/O的缩写,Java里边你们通常称为异步IO,实际上对应Linux系统编程中的事件驱动IOevent-driven IO),是对 epoll 的封装。其它的IO模型还包括同步,阻塞,非阻塞,多路复用(selectpoll)。阻塞/非阻塞是 fd 的属性,同步会跟阻塞配合,这样的应用会一直 sleep,直到IO完成被内核唤醒;同步非阻塞的话,第一次读取时,若是没有数据,应用线程会马上返回,可是应用须要肯定以什么样的策略进行后面的系统调用,若是是简单的while循环会致使CPU 100%,复杂的相似自旋的策略增长了应用编程的难度,所以同步非阻塞不多使用。多路复用是Linux早期的一个进程监控多个fd的方式,性能比较低,每次调用涉及3次循环遍历,具体分析见 http://my.oschina.net/astute/blog/92433 event-driven IO,应用注册 感兴趣的socket IO事件(READ,WRITE),调用wait开始sleep,当条件成立时,如数据到达(可读),写缓冲区可用(可写),内核唤醒应用线程,应用线程根据获得的socket执行同步的调用读/写 数据。 编程


2、协议简介

memcachded服务器和客户端之间采用 TCP 的方式通讯,自定义了一套字节流的格式。本文分析的文本协议的构建和其它文本协议相似,mc里面分红命令行和数据块行,命令行里指明数据块的字节数目,命令行和数据块后都跟随\r\n。重要的一点是服务器在读取数据块时是根据命令行里指定的字节数目,所以数据块中含有\r\n并不影响服务器读块操做。数据块后必须跟随\r\n 数组

存储命令

发送 tomcat

<command name> <key> <flags> <exptime> <bytes> [noreply]\r\n

cas <key> <flags> <exptime> <bytes> <cas unique> [noreply]\r\n

<data block>\r\n

command name = "set", "add", "replace", "append" or "prepend"

flags - 32位整数 server并不操做这个数据 get时返回给客户端

exptime - 过时时间,能够是unix时间戳或偏移量,偏移量的话最大为30*24*60*60, 超过这个值,服务器会认为是unix时间戳

bytes - 数据块字节的个数

响应

<data block>\r\n

STORED\r\n - 成功

NOT_STORED\r\n - addreplace命令没有知足条件

EXISTS\r\n - cas命令 代表item已经被修改

NOT_FOUND\r\n - cas命令 item不存在

获取命令

发送

get <key>*\r\n

gets <key>*\r\n

<key>* - 空格分割的一个或多个字符串

响应

VALUE <key> <flags> <bytes> [<cas unique>]\r\n

<data block>\r\n

VALUE <key> <flags> <bytes> [<cas unique>]\r\n

<data block>\r\n

END\r\n

本文以 get 操做为例;key = someKey  value=abcdABC中文

以字节流的形式最终发送的数据

[103, 101, 116, 32, 115, 111, 109, 101, 75, 101, 121, 13, 10, 0]

103 101 116 - "get"

32 - "" 空格

115 111 109 101 75 101 121 - someKey

13 10 - \r\n

接收到的数据

VALUE someKey 0 13

61 62 63 64 41 42 43 E4 B8 AD E6 96 87\r\n

END\r\n

删除命令

发送

delete <key> [noreply]\r\n

响应

DELETED\r\n - 成功删除

NOT_FOUND\r\n - 删除的条目不存在

其它命令

详见参考资料 mc 协议

3、spymemcached中的重要对象
简介

spymc的客户端,所以spy中全部对象须要基于它要完成的 功能 和 mc服务器的通讯协议来进行设计。最重要的MemcachedClient表示mc集群的client,应用中单例便可。spy中的每个mc节点,用MemcachedNode表示,这个对象内部含有一个channel,网络链接到mc节点。要根据key的哈希值查找某个mc节点,spy中使用NodeLocator,默认locatorArrayModNodeLocator,这个对象内部含有全部的MemcachedNodespy使用的hash算法都在对象DefaultHashAlgorithm中,默认使用NATIVE_HASH,也就是String.hashCode()locatorclient中间还有一个对象,叫MemcachedConnection ,它表示到mc集群的链接,内部持有locatorclent内部持有MemcachedConnection(mconn)spy使用NIO实现,所以有一个selector,这个对象存在于mconn中。要和服务器进行各类操做的通讯,协议数据发送,数据解析,spy中抽象为Operation,文本协议的get操做最终实现为net.spy.memcached.protocol.ascii.GetOperationImpl。为了实现工做线程和IO线程之间的调度,spy抽象出了一个 GetFuture,内部持有一个OperationFuture

TranscodeService执行字节数据和对象之间的转换,spy中实现方式为任务队列+线程池,这个对象的实例在client中。

对象详解

SpyObject - spy中的基类 定义 Logger

MemcachedConnection 表示到多台 mc 节点的链接

MemcachedConnection 详细属性

    shouldOptimize - 是否须要优化多个连续的get操做 --> gets 默认true

    addedQueue - 用来记录排队到节点的操做

    selector - 监控到多个 mc 服务器的读写事件

    locator - 定位某个 mc 服务器

GetFuture 前端线程和工做线程交互的对象

    --> OperationFuture

ConnectionFactory 建立 MemcachedConnection 实例;建立操做队列;建立 OperationFactory;制定 Hash 算法。

DefaultConnectionFactory 默认链接工厂

DefaultHashAlgorithm - Hash算法的实现类

MemcachedNode 定义到 单个memcached 服务器的链接

    TCPMemcachedNodeImpl - 

        AsciiMemcachedNodeImpl - 

        BinaryMemcachedNodeImpl - 

TCPMemcachedNodeImpl 重要属性

    socketAddress - 服务器地址

    rbuf - 读缓冲区 默认大小 16384

    wbuf - 写缓冲区 默认大小 16384

    writeQ - 写队列

    readQ - 读队列

    inputQueue - 输入队列 memcachclient添加操做时先添加到 inputQueue

    opQueueMaxBlockTime - 操做的最大阻塞时间 默认10

    reconnectAttempt - 重连尝试次数 volatile

    channel - socket 通道

    toWrite - 要向socket发送的字节数

    optimizedOp - 优化后的Operation 实现类是OptimizedGetImpl

       sk - channel注册到selector后的key

    shouldAuth - 是否须要认证 默认 false

    authLatch - 认证须要的Latch

    reconnectBlocked - 

    defaultOpTimeout - 操做默认超时时间 默认值 2.5

    continuousTimeout - 连续超时次数

    opFact - 操做工厂

MemcachedClient 重要属性

    mconn - MemcachedConnection 

    opFact - 操做工厂

    transcoder - 解码器

    tcService - 解码线程池服务

    connFactory - 链接工厂

Operation 全部操做的基本接口

    BaseOperationImpl

        OperationImpl

            BaseGetOpImpl - initialize 协议解析 构建缓冲区

                GetOperationImpl

OperationFactory 为协议构建操做 好比生成 GetOperation

    BaseOperationFactory

        AsciiOperationFactory - 文本协议的操做工厂 默认的操做工厂

        BinaryOperationFactory - 二进制协议的操做工厂

OperationFactory 根据 protocol handlers 构建操做

    BaseOperationFactory

        AsciiOperationFactory - 支持 ascii protocol

        BinaryOperationFactory - 支持 binary operations

NodeLocator 根据 key hash 值查找节点

    ArrayModNodeLocator - hash 值和节点列表长度取模,做为下标,简单的数组查询

    KetamaNodeLocator - Ketama一致性hash的实现

Transcoder 对象和字节数组之间的转换接口

    BaseSerializingTranscoder

        SerializingTranscoder - 默认的transcoder

TranscodeService 异步的解码服务,含有一个线程池

FailureMode - node失效的模式

    Redistribute - 节点失效后移动到下一个有效的节点  默认模式

    Retry - 重试失效节点 直至恢复

    Cancel - 取消操做

4、总体流程
初始化

客户端执行new MemcachedClient(new InetSocketAddress("192.168.56.101", 11211))。初始化 MemcachedClient,内部初始化MemcachedConnection,建立selector,注册channelselector,启动IO线程。

线程模型

初始化完成后,把监听mc节点事件的线程,也就是调用select的线程,称为IO线程;应用执行 c.get("someKey"),把应用所在的线程称为工做线程。工做线程一般由tomcat启动,负责建立操做,加入节点的操做队列,工做线程一般有多个;IO线程负责从队列中拿到操做,执行操做。

工做线程

工做线程最终会调用asyncGet,方法内部会建立CountDownLatch(1), GetFutureGetOperationImpl(持有一个内部类,工做线程执行完成后,最终会调用 latch.countDown()),选择mc节点,操做op初始化(生成写缓冲区),把op放入节点等待队列inputQueue中,同时会把当前节点放入mc链接(mconn)addedQueue属性中,最后唤醒selector。最终工做线程在latch上等待(默认超时2.5秒)IO线程的执行结果。

IO线程

IO线程被唤醒后

一、handleInputQueue()。移动OperationinputQueuewriteQ中。对添加到addedQueue中的每个MemcachedNode分别进行处理。这个函数会处理全部节点上的全部操做,所有发送到mc服务器(以前节点上就有写操做的才这么处理,不然只是注册写事件)。

二、循环过程当中,若是当前node中没有写操做,则判断writeQreadQ中有操做,在SK上注册读/写事件;若是有写操做,须要执行handleWrites函数。这个函数内部首先作的是填充缓冲区fillWriteBuffer():从writeQ中取出一个可写的操做(remove掉取消的和超时的),改变操做的状态为WRITING,把操做的数据复制到写缓冲区(写缓冲区默认16K,操做的字节数从十几字节到1M,这个地方有复杂的处理,后面会详细分析,如今只考虑简单状况),复制完成后把操做状态变为READING,从writeQremove当前操做,把操做addreadQ当中,这个地方会再去复制pending的操做;、发送写缓冲区的内容,所有发送完成后,会再次去填充缓冲区fillWriteBuffer()(好比说一个大的命令,一个缓冲区不够)。循环,直到全部的写操做都处理完。ƒ、判断writeQreadQ是否有操做,更新sk注册的读写事件。get操做的话,如今已经注册了读事件。

三、selector.select()

四、数据到达时,执行handleIO(sk),处理读事件;执行channel.read(rbuf);执行readFromBuffer(),解析数据,读取到END\r\n将操做状态置为COMPLETE

5、初始化详细流程

一、默认链接工厂为 DefaultConnectionFactory。接着建立TranscodeService(解码的线程池,默认线程最多为10),建立AsciiOperationFactory(支持ascii协议的操做工厂,负责生成各类操做,好比 GetOperationImpl),建立MemcachedConnection,设置操做超时时间(默认2.5秒)。

二、DefaultConnectionFactory建立MemcachedConnection详细过程:建立reconnectQueueaddedQueue,设置shouldOptimizetrue,设置maxDelay30秒,设置opFact,设置timeoutExceptionThreshold1000(超过这个值,关闭到 mc node 的链接),打开 Selector,建立nodesToShutdown,设置bufSize16384字节,建立到每一个node的 MemcachedNode(默认是AsciiMemcachedNodeImpl,这一步建立SocketChannel,链接到mc节点,注册到selector,设置sk为刚注册获得的SelectionKey),最后启动 MemcachedConnection 线程,进入事件处理的循环代码 

while(running) handleIO()

6、核心流程代码
一、工做线程

一切从工做线程调用 c.get("someKey") 方法开始
基本流程是:建立操做(Operation),操做初始化,查找节点,把操做加入节点的等待队列,唤醒IO线程,而后工做线程在Future上等待IO线程的执行结果

// 默认等待2.5秒
return asyncGet(key, tc).get(2500, TimeUnit.MILLISECONDS)
// 内部类GetOperation.Callback,是工做线程和IO线程交互的类
// IO线程获得全部的操做响应数据后,调用gotData方法
// IO线程接收到END\r\n后,调用receivedStatus和complete方法
public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {
	final CountDownLatch latch = new CountDownLatch(1);
	final GetFuture<T> rv = new GetFuture<T>(latch, operationTimeout, key);
	Operation op = opFact.get(key, new GetOperation.Callback() {
		private Future<T> val = null;
		public void receivedStatus(OperationStatus status) {
			rv.set(val, status);
		}
		public void gotData(String k, int flags, byte[] data) {
			val = tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()));
		}
		public void complete() {
			latch.countDown();
		}
	});
	rv.setOperation(op);
	mconn.enqueueOperation(key, op);
	return rv;	// 最终会在rv上调用get方法
}
// 向节点中队列中添加操做 一、查找节点 二、放入队列
// 查找节点,根据key的hash值,对节点数取模
protected void addOperation(final String key, final Operation o) {
	MemcachedNode placeIn = null;
	MemcachedNode primary = locator.getPrimary(key);
	if (primary.isActive() || failureMode == FailureMode.Retry) {
		placeIn = primary;
	} else if (failureMode == FailureMode.Cancel) {
		o.cancel();
	} else {
		for (Iterator<MemcachedNode> i = locator.getSequence(key); placeIn == null
		&& i.hasNext();) {
			MemcachedNode n = i.next();
			if (n.isActive()) {
				placeIn = n;
			}
		}
		if (placeIn == null) {
			placeIn = primary;
		}
	}
	if (placeIn != null) {
		addOperation(placeIn, o);
	} else {
	}
}
// 最重要的方法 
protected void addOperation(final MemcachedNode node, final Operation o) {
	o.setHandlingNode(node);
	o.initialize();  // 操做初始化,生成要发送的字节流数据,放到缓冲区中
	node.addOp(o);   // 添加到节点的inputQueue中
	addedQueue.offer(node);   // 有操做的节点放入 addedQueue中
	Selector s = selector.wakeup(); // 唤醒IO线程
}
工做线程和IO线程之间传递的Future对象,结构以下
GetFuture ---> OperationFuture ---> latch
---> 表示依赖关系


// 最终工做线程在 OperationFuture的get方法上等待latch
public T get(long duration, TimeUnit units) {
	if (!latch.await(duration, units)) { // 等待2.5秒
		MemcachedConnection.opTimedOut(op);
		if (op != null) {
			op.timeOut(); //2.5秒后,操做没有执行完,设置超时(IO线程会判断,若是超时,就remove)
		}
		// throw exception
	}
	return objRef.get(); // objRef是一个原子引用,来保证对象的安全发布(线程安全)
}
// objRef引用的是一个TranscodeService.Task(自己是个FutureTask)对象,若是没有压缩和序列化的话,最终工做线程会调用tc.decode方法,获得返回值。


二、IO线程

IO线程的操做循环
处理输入队列,注册写事件;执行写操做,注册读事件;处理读操做,解析结果。

public void run() {
  while (running) {
  handleIO();
  }
}
public void handleIO() throws IOException {
	handleInputQueue();
	int selected = selector.select(delay);
	Set<SelectionKey> selectedKeys = selector.selectedKeys();

	if (selectedKeys.isEmpty() && !shutDown) {
		// some code
	} else {
		for (SelectionKey sk : selectedKeys) {
			handleIO(sk);
		}
		selectedKeys.clear();
	}
}
handleInputQueue
处理addedQueue中的全部节点,对每个节点复制inputQueue中的操做到writeQ中。注册读写事件。
private void handleInputQueue() {
	if (!addedQueue.isEmpty()) {
		Collection<MemcachedNode> toAdd = new HashSet<MemcachedNode>();
		Collection<MemcachedNode> todo = new HashSet<MemcachedNode>();
		MemcachedNode qaNode = null;
		while ((qaNode = addedQueue.poll()) != null) {
			todo.add(qaNode);
		}
		for (MemcachedNode qa : todo) {
			boolean readyForIO = false;
			if (qa.isActive()) {
				if (qa.getCurrentWriteOp() != null) {
					readyForIO = true;
				}
			} else {
				toAdd.add(qa);
			}
			qa.copyInputQueue();
			if (readyForIO) {
				try {
					if (qa.getWbuf().hasRemaining()) {
						handleWrites(qa.getSk(), qa);
					}
				} catch (IOException e) {
					lostConnection(qa);
				}
			}
			qa.fixupOps();
		}
		addedQueue.addAll(toAdd);
	}
}
spy中注册读写事件的函数
readQ不为空注册读事件;writeQ不为空注册写事件;网络没有链接上注册链接事件。
public final void fixupOps() {
	SelectionKey s = sk;
	if (s != null && s.isValid()) {
		int iops = getSelectionOps();
		s.interestOps(iops);
	} else {
	}
}
public final int getSelectionOps() {
	int rv = 0;
	if (getChannel().isConnected()) {
		if (hasReadOp()) {
			rv |= SelectionKey.OP_READ;
		}
		if (toWrite > 0 || hasWriteOp()) {
			rv |= SelectionKey.OP_WRITE;
		}
	} else {
		rv = SelectionKey.OP_CONNECT;
	}
	return rv;
}
public final boolean hasReadOp() {
	return !readQ.isEmpty();
}

public final boolean hasWriteOp() {
	return !(optimizedOp == null && writeQ.isEmpty());
}

三、handleWrites(SelectionKey sk, MemcachedNode qa)
我可以想到的一些场景,这个状态机代码必须处理的
⑴ 当前队列中有1个操做,操做要发送的字节数目小于16K
⑵ 当前队列中有1个操做,操做要发送的字节数目大于16K(很大的set操做)
⑶ 当前队列中有多个操做,操做要发送的字节数目小于16K
⑷ 当前队列中有多个操做,操做要发送的字节数目大于16K
⑸ 任意一次写操做wrote为0

summary:处理节点中writeQ和inputQueue中的全部操做。每次循环会尽可能填满发送缓冲区,而后将发送缓冲区的内容所有发送到网络上,循环往复,没有异常的状况下,直至发送完数据。操做中发送的内容只要放入到发送缓冲区后,就把操做加入到readQ(spy中根据writeQ和readQ中有没有数据,来注册读写事件)。

执行时机:IO线程在select上休眠,被工做线程唤醒后,处理输入队列,把操做复制到writeQ 中,注册写事件;再次调用select,返回后,就会调用handleWrites(),数据所有发送后,会注册读事件。处理输入队列时,若是wbuf还有东西没有发送,那么会在select调用前,调用handleWrites函数。

private void handleWrites(SelectionKey sk, MemcachedNode qa) throws IOException {
	qa.fillWriteBuffer(shouldOptimize); --->
	boolean canWriteMore = qa.getBytesRemainingToWrite() > 0;
	while (canWriteMore) {
		int wrote = qa.writeSome(); --->
		qa.fillWriteBuffer(shouldOptimize);
		canWriteMore = wrote > 0 && qa.getBytesRemainingToWrite() > 0;
	}
}

 -- 发送数据;执行一次后,wbuf可能还有数据未写完
public final int writeSome() throws IOException {
	int wrote = channel.write(wbuf);
	toWrite -= wrote;
	return wrote;
}
-- 填充缓冲区
toWrite=0 代表 写缓冲区之前的内容已经所有写入到网络中,这样才会进行下一次的填充写缓冲区
操做会尽可能填满16K的缓冲区(单一操做数据量很大好比500K;或多个操做数据量500K)
当一个操做中的数据彻底写入缓冲区后,操做的状态变成READING,从writeQ中移除当前操做。
public final void fillWriteBuffer(boolean shouldOptimize) {
	if (toWrite == 0 && readQ.remainingCapacity() > 0) {
		getWbuf().clear();
		Operation o=getNextWritableOp(); --->

		while(o != null && toWrite < getWbuf().capacity()) {
			synchronized(o) {
				ByteBuffer obuf = o.getBuffer();
				int bytesToCopy = Math.min(getWbuf().remaining(), obuf.remaining());
				byte[] b = new byte[bytesToCopy];
				obuf.get(b);
				getWbuf().put(b);
				if (!o.getBuffer().hasRemaining()) {
					o.writeComplete();
					transitionWriteItem();
					preparePending(); -- copyInputQueue()
					if (shouldOptimize) {
						optimize();
					}
					o=getNextWritableOp();
				}
				toWrite += bytesToCopy;
			}
		}
		getWbuf().flip();
	} else {
	}
}
-- 获取节点写队列中下一个可写的操做
若是操做已经取消(前端线程等待超时,取消操做),或超时(IO线程没有来得及执行操做,操做超时),那么把操做从队列中移除,继续查找下一个操做。把可写的操做的状态从WRITE_QUEUED变成WRITING,同时把操做放入读队列中。
private Operation getNextWritableOp() {
	Operation o = getCurrentWriteOp(); --->④
	while (o != null && o.getState() == OperationState.WRITE_QUEUED) {
		synchronized(o) {
			if (o.isCancelled()) {
				Operation cancelledOp = removeCurrentWriteOp();--->⑤
			} else if (o.isTimedOut(defaultOpTimeout)) {
				Operation timedOutOp = removeCurrentWriteOp();
			} else {
				o.writing();
				if (!(o instanceof TapAckOperationImpl)) {
					readQ.add(o);
				}
				return o;
			}
			o = getCurrentWriteOp();
		}
	}
	return o;
}
④ -- 拿到当前写操做(并不从队列中移除)
public final Operation getCurrentWriteOp() {
	return optimizedOp == null ? writeQ.peek() : optimizedOp;
}

⑤ -- remove当前写操做
public final Operation removeCurrentWriteOp() {
	Operation rv = optimizedOp;
	if (rv == null) {
		rv = writeQ.remove();
	} else {
		optimizedOp = null;
	}
	return rv;
}
四、handleReads
handleReads(SelectionKey sk, MemcachedNode qa)
从网络中读取数据,放入rbuf。解析rbuf,获得结果;
private void handleReads(SelectionKey sk, MemcachedNode qa) throws IOException {
	Operation currentOp = qa.getCurrentReadOp();
	if (currentOp instanceof TapAckOperationImpl) { // no response
		qa.removeCurrentReadOp();
		return;
	}
	ByteBuffer rbuf = qa.getRbuf();
	final SocketChannel channel = qa.getChannel();
	int read = channel.read(rbuf);
	if (read < 0) {
		// some code
	}
	while (read > 0) { // 从网络中读数据一直读到0为止
		rbuf.flip();
		while (rbuf.remaining() > 0) { // 只要缓冲区有数据 就去解析操做
			synchronized(currentOp) {
				currentOp.readFromBuffer(rbuf); // 从rbuf中解析响应
				if (currentOp.getState() == OperationState.COMPLETE) {
					Operation op = qa.removeCurrentReadOp(); // 操做解析成功,移除
				} else if (currentOp.getState() == OperationState.RETRY) {
					((VBucketAware) currentOp).addNotMyVbucketNode(currentOp.getHandlingNode());
					Operation op = qa.removeCurrentReadOp();
					retryOps.add(currentOp);
				}
			}
			currentOp=qa.getCurrentReadOp();
		}
		rbuf.clear();
		read = channel.read(rbuf);
	}
}
// 解析rbuf;readType有两种取值,LINE 和 DATA,用来区分正在操做的数据是命令行仍是数据块。解析过程当中,分别调用工做线程传入到操做中的回调对象的三个方法,分别是:receivedStatus,gotData,complete。
public void readFromBuffer(ByteBuffer data) throws IOException {
	while (getState() != OperationState.COMPLETE && data.remaining() > 0) {
		if (readType == OperationReadType.DATA) {
			handleRead(data);
		} else {
			int offset = -1;
			for (int i = 0; data.remaining() > 0; i++) {
				byte b = data.get();
				if (b == '\r') {
					foundCr = true;
				} else if (b == '\n') {
					assert foundCr : "got a \\n without a \\r";
					offset = i;
					foundCr = false;
					break;
				} else {
					assert !foundCr : "got a \\r without a \\n";
					byteBuffer.write(b);
				}
			}
			if (offset >= 0) {
				String line = new String(byteBuffer.toByteArray(), CHARSET);
				byteBuffer.reset();
				OperationErrorType eType = classifyError(line);
				if (eType != null) {
					handleError(eType, line);
				} else {
					handleLine(line); // 取到完整的一行后 调用这个函数 解析行数据
				}
			}
		}
	}
}
// 处理命令行和END行
// 解析命令行时,分析各类参数 data为数据块的字节数
public final void handleLine(String line) {
	if (line.equals("END")) {
		if (hasValue) {
			getCallback().receivedStatus(END);
		} else {
			getCallback().receivedStatus(NOT_FOUND);
		}
		transitionState(OperationState.COMPLETE);
		data = null;
	} else if (line.startsWith("VALUE ")) {
		String[] stuff = line.split(" ");
		currentKey = stuff[1];
		currentFlags = Integer.parseInt(stuff[2]);
		data = new byte[Integer.parseInt(stuff[3])];
		if (stuff.length > 4) {
			casValue = Long.parseLong(stuff[4]);
		}
		readOffset = 0;
		hasValue = true;
		setReadType(OperationReadType.DATA);
	} else if (line.equals("LOCK_ERROR")) {
		getCallback().receivedStatus(LOCK_ERROR);
		transitionState(OperationState.COMPLETE);
	} else {
		assert false : "Unknown line type: " + line;
	}
}

五、那个著名的bug

JAVA NIO bug 会致使 CPU 100%

http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933 

int selected = selector.select(delay);

    Set<SelectionKey> selectedKeys = selector.selectedKeys();

    if (selectedKeys.isEmpty() && !shutDown) {

      if (++emptySelects > DOUBLE_CHECK_EMPTY) {

        for (SelectionKey sk : selector.keys()) {

          if (sk.readyOps() != 0) {

            handleIO(sk);

          } else {

            lostConnection((MemcachedNode) sk.attachment());

          }

DOUBLE_CHECK_EMPTY = 256,当连续的select返回为空时,++emptySelects,超过256,链接到当前mc节点的socket channel关闭,放入重连队列。

7、调试 spymemcached

调试 spymemcached IO线程的过程当中,工做线程放入到节点队列的操做很容易超时,所以须要继承DefaultConnectionFactory 复写相关方法。

public class AstuteConnectionFactory extends DefaultConnectionFactory {
@Override
public boolean shouldOptimize() {
return false;
}
@Override
public long getOperationTimeout() {
return 3000000L; // 3000S
}
}

8、参考资料

NIOhttp://www.ibm.com/developerworks/cn/education/java/j-nio/index.html 

memcachedhttp://memcached.org/ 

protocolhttps://github.com/memcached/memcached/blob/master/doc/protocol.txt

相关文章
相关标签/搜索