首先看下spymemcached的使用demo:
// Addresses of the memcached servers; more than one node may be listed.
List<InetSocketAddress> serverAddress = new LinkedList<>();
serverAddress.add(new InetSocketAddress("127.0.0.1", 11211));

// Builder that collects the settings for the connection factory.
ConnectionFactoryBuilder connectionFactoryBuilder = new ConnectionFactoryBuilder();
connectionFactoryBuilder.setOpTimeout(50);
connectionFactoryBuilder.setTimeoutExceptionThreshold(DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD);
connectionFactoryBuilder.setReadBufferSize(65535);

MemcachedClient client = new MemcachedClient(connectionFactoryBuilder.build(), serverAddress);
try {
  // set(...) returns a future; get() blocks until the server has answered.
  System.out.println(client.set("zhurui", 2, "zhurui").get());
} finally {
  // Creating the client starts a MemcachedConnection I/O thread; shut it
  // down so the thread does not outlive the demo.
  client.shutdown();
}
首先设置ip、port,client可以连接多个memcached节点。然后创建ConnectionFactoryBuilder,设置一些基础属性,然后调用build(),返回一个DefaultConnectionFactory,并实现了抽象方法。最后new MemcachedClient。
接下来看下MemcachedClient的构造方法:
public MemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs) throws IOException { if (cf == null) { throw new NullPointerException("Connection factory required"); } if (addrs == null) { throw new NullPointerException("Server list required"); } if (addrs.isEmpty()) { throw new IllegalArgumentException("You must have at least one server to" + " connect to"); } if (cf.getOperationTimeout() <= 0) { throw new IllegalArgumentException("Operation timeout must be positive."); } connFactory = cf; //初始化异步转换编码器,内部建立一个线程池 tcService = new TranscodeService(cf.isDaemon()); //返回默认的object-》byte array序列化qi器SerializingTranscoder transcoder = cf.getDefaultTranscoder(); //返回AsciiOperationFactory opFact = cf.getOperationFactory(); assert opFact != null : "Connection factory failed to make op factory"; //建立MemcachedConnection,核心,待会分析 mconn = cf.createConnection(addrs); assert mconn != null : "Connection factory failed to make a connection"; operationTimeout = cf.getOperationTimeout(); authDescriptor = cf.getAuthDescriptor(); executorService = cf.getListenerExecutorService(); if (authDescriptor != null) { addObserver(this); } }
下面看cf.createConnection(addrs):
/**
 * Builds a {@link MemcachedConnection} for the given server addresses from
 * this factory's read buffer size, initial observers, failure mode and
 * operation factory.
 *
 * @param addrs the addresses of the servers to connect to
 * @return the newly constructed connection
 * @throws IOException declared by the {@code MemcachedConnection} constructor
 */
public MemcachedConnection createConnection(List<InetSocketAddress> addrs)
    throws IOException {
  final int readBufferSize = getReadBufSize();
  return new MemcachedConnection(
      readBufferSize,
      this,
      addrs,
      getInitialObservers(),
      getFailureMode(),
      getOperationFactory());
}
getFailureMode()返回DEFAULT_FAILURE_MODE,即FailureMode.Redistribute,策略是当前节点处理失败时可以转移到其他节点处理。
getOperationFactory()返回AsciiOperationFactory对象。
下面分析MemcachedConnection对象是怎么构造的:
/** * Construct a {@link MemcachedConnection}. * * @param bufSize the size of the buffer used for reading from the server. * @param f the factory that will provide an operation queue. * @param a the addresses of the servers to connect to. * @param obs the initial observers to add. * @param fm the failure mode to use. * @param opfactory the operation factory. * @throws IOException if a connection attempt fails early */ public MemcachedConnection(final int bufSize, final ConnectionFactory f, final List<InetSocketAddress> a, final Collection<ConnectionObserver> obs, final FailureMode fm, final OperationFactory opfactory) throws IOException { connObservers.addAll(obs); //重连队列,key是过多久能够尝试重连的时间 reconnectQueue = new TreeMap<Long, MemcachedNode>(); //用来记录排队到节点的操做 addedQueue = new ConcurrentLinkedQueue<MemcachedNode>(); //请求失败策略 failureMode = fm; //是否须要优化多个连续的get操做,默认false,不作优化处理 shouldOptimize = f.shouldOptimize(); //重连最大等待时间,这里是30s maxDelay = TimeUnit.SECONDS.toMillis(f.getMaxReconnectDelay()); //clone,create operation工厂,这里是AsciiOperationFactory opFact = opfactory; //最大的链接超时致使异常的次数 996 timeoutExceptionThreshold = f.getTimeoutExceptionThreshold(); //选择器 selector = Selector.open(); //存放须要被重试的操做 retryOps = Collections.synchronizedList(new ArrayList<Operation>()); //存放须要被定时关闭的节点 nodesToShutdown = new ConcurrentLinkedQueue<MemcachedNode>(); //回调用的链接池 listenerExecutorService = f.getListenerExecutorService(); //从服务端读取数据的buffer size ,65535 this.bufSize = bufSize; //建立 MemcachedNode的工厂 this.connectionFactory = f; String verifyAlive = System.getProperty("net.spy.verifyAliveOnConnect"); if(verifyAlive != null && verifyAlive.equals("true")) { verifyAliveOnConnect = true; } else { verifyAliveOnConnect = false; } wakeupDelay = Integer.parseInt( System.getProperty("net.spy.wakeupDelay", Integer.toString(DEFAULT_WAKEUP_DELAY))); //根据提供的地址建立memcache链接 List<MemcachedNode> connections = createConnections(a); //建立locator,这边使用Native hash (String.hashCode())取模的方法进行多节点负载 locator = 
f.createLocator(connections); metrics = f.getMetricCollector(); metricType = f.enableMetrics(); registerMetrics(); setName("Memcached IO over " + this); setDaemon(f.isDaemon()); //启动MemcachedConnection的run方法 start(); }
接下来看下createConnections是怎么创建到各个memcached节点的连接的:
protected List<MemcachedNode> createConnections( final Collection<InetSocketAddress> addrs) throws IOException { List<MemcachedNode> connections = new ArrayList<MemcachedNode>(addrs.size()); for (SocketAddress sa : addrs) { //打开一个SocketChannel SocketChannel ch = SocketChannel.open(); //设置成非阻塞 ch.configureBlocking(false); //建立一个memcachedNode,实现类是AsciiMemcachedNodeImpl, //内部初始化了读操做、写操做、链接操做队列,最大阻塞操等待做完成时间(10s),操做(50ms)、验证超时设置 MemcachedNode qa = connectionFactory.createMemcachedNode(sa, ch, bufSize); qa.setConnection(this); int ops = 0; //设置NoDelay为true ch.socket().setTcpNoDelay(!connectionFactory.useNagleAlgorithm()); try { //链接服务端,若是当即链接成功则返回true if (ch.connect(sa)) { getLogger().info("Connected to %s immediately", qa); connected(qa); } else { getLogger().info("Added %s to connect queue", qa); ops = SelectionKey.OP_CONNECT; } //唤醒select()方法 selector.wakeup(); //注册CONNECT事件并把selection保存到node qa.setSk(ch.register(selector, ops, qa)); assert ch.isConnected() || qa.getSk().interestOps() == SelectionKey.OP_CONNECT : "Not connected, and not wanting to connect"; } catch (SocketException e) { getLogger().warn("Socket error on initial connect", e); queueReconnect(qa); } connections.add(qa); } return connections; }