Jedis源码分析

对于平常开发,Redis因为单线程的并发模型、丰富的数据结构和简单的API,深受广大程序员的喜好。Redis提供了多种语言的API,像java、c和python等。以前一直都是使用redis,可是没有多redis的API有一个系统的认识。忙里偷闲,撸一下Redis相关的API的实现,如今学习了一下jedis的源码,来分析一下Redis的读写流程。java

1、Jedis项目结构

python

 
 

代码是比较简单的,并且不少类也没有那么多的抽象和继承,实际上是比较好懂的。commands包里面主要是封装的redis支持的各类命令。
exception包主要是封装了一些redis的exception。
在jedis包下的是一些redis的Client。
jedis的代码结构大体就是上述这些,这里咱们就以最简单的jedis类来看一下读写的流程。程序员

2、Jedis继承结构

这里是jedis的UML图:redis


 
 
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands { 

Jedis 继承了BinaryJedis 同时实现了一系列的Commands接口,BinaryJedis里主要和redis Server进行交互,一系列Commands接口主要是对redis支持的接口进行分类,像BasicCommands主要包含了info、flush等操做,BinaryJedisCommands 主要包含了get、set等操做,MultiKeyBinaryCommands主要包含了一些批量操做的接口例如mset等。安全

3、用set方法分析Redis的请求流程

因为Jedis实现了各类接口,致使它内部的方法十分的多,这里咱们使用一个简单的Demo来学习一下Jedis:数据结构

Jedis jed = new Jedis("locahost",6379); jed.set("hello","123"); String out = jed.get("hello"); 

首先看Jedis的实例化过程:多线程

public Jedis(final String host, final int port) { super(host, port);} public BinaryJedis(final String host, final int port) { client = new Client(host, port);} 

Jedis由于继承了BinaryJedis,大部分的操做都是在BinaryJedis中实现的,在BinaryJedis的构造方法中就实例化了Client。
Client的继承结构以下:并发

 
 

BinaryJedis中的方法主要是对Client作了代理,Client继承了BinaryClient,BinaryClient继承了Connection,实现了Commands接口。Client主要作了一些编解码的工做,BinaryClient作了Command的发送操做,而全部与redisServer交互的工做由Connection完成。app

首先看Set方法:socket

/** * Set the string value as value of the key. The string can't be longer than 1073741824 bytes (1 * GB). * <p> * Time complexity: O(1) * @param key * @param value * @return Status code reply */ @Override public String set(final String key, String value) { checkIsInMultiOrPipeline(); client.set(key, value); return client.getStatusCodeReply(); } 

这里主要委托给Client进行处理。

@Override public void set(final String key, final String value) { set(SafeEncoder.encode(key), SafeEncoder.encode(value)); } 

这里主要是调用了BinaryClient的set方法。

public void set(final byte[] key, final byte[] value) { sendCommand(Command.SET, key, value); } 

这里主要是委托了Connection的sendCommand方法。接下来到了关键部分:

public Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) { try { connect(); Protocol.sendCommand(outputStream, cmd, args); return this; } catch (JedisConnectionException ex) { /* * When client send request which formed by invalid protocol, Redis send back error message * before close connection. We try to read it to provide reason of failure. */ try { String errorMessage = Protocol.readErrorLineIfPossible(inputStream); if (errorMessage != null && errorMessage.length() > 0) { ex = new JedisConnectionException(errorMessage, ex.getCause()); } } catch (Exception e) { /* * Catch any IOException or JedisConnectionException occurred from InputStream#read and just * ignore. This approach is safe because reading error message is optional and connection * will eventually be closed. */ } // Any other exceptions related to connection? broken = true; throw ex; } } 
  1. 调用connect()方法进行链接:
public void connect() { if (!isConnected()) { try { socket = new Socket(); // ->@wjw_add socket.setReuseAddress(true); socket.setKeepAlive(true); // Will monitor the TCP connection is // valid socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to // ensure timely delivery of data socket.setSoLinger(true, 0); // Control calls close () method, // the underlying socket is closed // immediately // <-@wjw_add socket.connect(new InetSocketAddress(host, port), connectionTimeout); socket.setSoTimeout(soTimeout); if (ssl) { if (null == sslSocketFactory) { sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault(); } socket = (SSLSocket) sslSocketFactory.createSocket(socket, host, port, true); if (null != sslParameters) { ((SSLSocket) socket).setSSLParameters(sslParameters); } if ((null != hostnameVerifier) && (!hostnameVerifier.verify(host, ((SSLSocket) socket).getSession()))) { String message = String.format( "The connection to '%s' failed ssl/tls hostname verification.", host); throw new JedisConnectionException(message); } } outputStream = new RedisOutputStream(socket.getOutputStream()); inputStream = new RedisInputStream(socket.getInputStream()); } catch (IOException ex) { broken = true; throw new JedisConnectionException("Failed connecting to host " + host + ":" + port, ex); } } } 

这里主要使用Socket通讯来实现命令的发送,链接使用长链接来减少创建链接的开销。并实例化了RedisOutputStream和RedisInputStream。在每一次进行query的时候都会调用connect方法来保证以前链接失效以后能新建链接并操做成功。

  1. 调用Protocol的sendCommand方法进行发送:
public static void sendCommand(final RedisOutputStream os, final ProtocolCommand command, final byte[]... args) { sendCommand(os, command.getRaw(), args); } private static void sendCommand(final RedisOutputStream os, final byte[] command, final byte[]... args) { try { os.write(ASTERISK_BYTE); os.writeIntCrLf(args.length + 1); os.write(DOLLAR_BYTE); os.writeIntCrLf(command.length); os.write(command); os.writeCrLf(); for (final byte[] arg : args) { os.write(DOLLAR_BYTE); os.writeIntCrLf(arg.length); os.write(arg); os.writeCrLf(); } } catch (IOException e) { throw new JedisConnectionException(e); } } 

这里代码比较清晰,利用了Protocol提供的一些请求头来构造一个请求。这里具体的协议内容就不细解析了,发送完请求以后返回。

以后调用client.getStatusCodeReply();进行返回状态的获取:

public String getStatusCodeReply() { flush(); final byte[] resp = (byte[]) readProtocolWithCheckingBroken(); if (null == resp) { return null; } else { return SafeEncoder.encode(resp); } } 

首先调用了flush方法,保证以前的写入能发送出去,以后调用了readProtocolWithCheckingBroken来获取响应。

protected Object readProtocolWithCheckingBroken() { try { return Protocol.read(inputStream); } catch (JedisConnectionException exc) { broken = true; throw exc; } } 

调用Protocol.read进行对RedisInputStream进行读取,在这过程当中可能会抛出链接异常。

public static Object read(final RedisInputStream is) { return process(is);} private static Object process(final RedisInputStream is) { final byte b = is.readByte(); if (b == PLUS_BYTE) { return processStatusCodeReply(is); } else if (b == DOLLAR_BYTE) { return processBulkReply(is); } else if (b == ASTERISK_BYTE) { return processMultiBulkReply(is); } else if (b == COLON_BYTE) { return processInteger(is); } else if (b == MINUS_BYTE) { processError(is); return null; } else { throw new JedisConnectionException("Unknown reply: " + (char) b); } } 

最后在read的时候对返回的响应进行了判断,枚举出了几种响应方式,对不一样的响应进行不一样的处理。
这里能够看出,整个交互过程就是一个Socket通讯过程。按照必定的协议发送请求,以后读取返回结果。可是这里也有一个问题就是线程安全问题,显然Jedis实例是线程不安全的,对于多线程共享jedis实例是会有问题的。同时直接使用jedis不能避免的须要反复的建立和销毁Socket,开销很大。因此就引出了后面的jedisPool的使用。

4、JedisPool的使用和实现

JedisPool是Jedis提供的一种对Redis的链接池,利用链接池能够很好的对Jedis的链接作一个很好的掌控,能避免建立和销毁的开销,同时能够进行按期的保活,能避免反复的建立链接。
下面是一个JedisPool例子:

JedisPoolConfig config = new JedisPoolConfig(); config.setTestOnBorrow(true); JedisPool pool = new JedisPool(config, hnp.getHost(), hnp.getPort(), 2000, "foobared"); Jedis jedis = pool.getResource(); jedis.set("foo", "bar"); jedis.close(); 

能够看到新建立了一个JedisPoolConfig,用于对JedisPool的配置。这里没有使用以前JedisPool的returnResource。由于jedis.close()已经作了相关的returnResource方法。
咱们先看一下JedisPoolConfig是什么:

public class JedisPoolConfig extends GenericObjectPoolConfig { public JedisPoolConfig() { // defaults to make your life with connection pool easier :) setTestWhileIdle(true); setMinEvictableIdleTimeMillis(60000); setTimeBetweenEvictionRunsMillis(30000); setNumTestsPerEvictionRun(-1); } } 

JedisPoolConfig继承了GenericObjectPoolConfig,GenericObjectPoolConfig是ApacheCommons pool提供的一个对象池的配置。JedisPool使用了ApacheCommons pool来进行链接池的实现。GenericObjectPoolConfig提供了不少的参数,咱们可使用JedisPoolConfig也可使用GenericObjectPoolConfig。下面列出一些关键的参数:

maxActive:控制一个pool可分配多少个jedis实例,经过pool.getResource()来获取;若是赋值为-1,则表示不限制;若是pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted。
maxIdle:控制一个pool最多有多少个状态为idle(空闲)的jedis实例;
whenExhaustedAction:表示当pool中的jedis实例都被allocated完时,pool要采起的操做;默认有三种。
WHEN_EXHAUSTED_FAIL --> 表示无jedis实例时,直接抛出NoSuchElementException;
WHEN_EXHAUSTED_BLOCK --> 则表示阻塞住,或者达到maxWait时抛出JedisConnectionException;
WHEN_EXHAUSTED_GROW --> 则表示新建一个jedis实例,也就说设置的maxActive无用;
maxWait:表示当borrow一个jedis实例时,最大的等待时间,若是超过等待时间,则直接抛出JedisConnectionException;
testOnBorrow:在borrow一个jedis实例时,是否提早进行alidate操做;若是为true,则获得的jedis实例均是可用的;
testOnReturn:在return给pool时,是否提早进行validate操做;
testWhileIdle:若是为true,表示有一个idle object evitor线程对idle object进行扫描,若是validate失败,此object会被从pool中drop掉;这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义;
timeBetweenEvictionRunsMillis:表示idle object evitor两次扫描之间要sleep的毫秒数;
numTestsPerEvictionRun:表示idle object evitor每次扫描的最多的对象数;
minEvictableIdleTimeMillis:表示一个对象至少停留在idle状态的最短期,而后才能被idle object evitor扫描并驱逐;这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义;
softMinEvictableIdleTimeMillis:在minEvictableIdleTimeMillis基础上,加入了至少minIdle个对象已经在pool里面了。若是为-1,evicted不会根据idle time驱逐任何对象。若是minEvictableIdleTimeMillis>0,则此项设置无心义,且只有在timeBetweenEvictionRunsMillis大于0时才有意义;

配置比较多,这里我不打算详细的写Commons Pool的实现机制,只是说说JedisPool是怎么实现的。

JedisPool的实例化过程以下:

public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port, int timeout, final String password) { this(poolConfig, host, port, timeout, password, Protocol.DEFAULT_DATABASE, null); } public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port, int timeout, final String password, final int database, final String clientName) { this(poolConfig, host, port, timeout, timeout, password, database, clientName, false, null, null, null); } public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port, final int connectionTimeout, final int soTimeout, final String password, final int database, final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { super(poolConfig, new JedisFactory(host, port, connectionTimeout, soTimeout, password, database, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier)); } 

这里实例化了一个JedisFactory,这个工厂类十分关键,这个工厂类是Commons pool来进行多对象池对象进行管理的一个工厂,对于全部对象的建立、销毁、激活和有效性校验都是在JedisFactory中进行的:

class JedisFactory implements PooledObjectFactory<Jedis> { private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>(); private final int connectionTimeout; private final int soTimeout; private final String password; private final int database; private final String clientName; private final boolean ssl; private final SSLSocketFactory sslSocketFactory; private SSLParameters sslParameters; private HostnameVerifier hostnameVerifier; public JedisFactory(final String host, final int port, final int connectionTimeout, final int soTimeout, final String password, final int database, final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { this.hostAndPort.set(new HostAndPort(host, port)); this.connectionTimeout = connectionTimeout; this.soTimeout = soTimeout; this.password = password; this.database = database; this.clientName = clientName; this.ssl = ssl; this.sslSocketFactory = sslSocketFactory; this.sslParameters = sslParameters; this.hostnameVerifier = hostnameVerifier; } 

JedisFactory实现了PooledObjectFactory接口,PooledObjectFactory是Commons Pool提供的接口。PooledObjectFactory提供了不少的方法:

public interface PooledObjectFactory<T> { PooledObject<T> makeObject() throws Exception; void destroyObject(PooledObject<T> var1) throws Exception; boolean validateObject(PooledObject<T> var1); void activateObject(PooledObject<T> var1) throws Exception; void passivateObject(PooledObject<T> var1) throws Exception; } 

makeObject为建立对象的方法。
destroyObject为销毁对象的方法。
validateObject为校验对象有消息的方法。
activateObject为激活对象的方法。
passivateObject为钝化对象的方法。

对于对象池对对象的管理使用了PooledObjectFactory中的方法,也算作到了“解耦”,本身的东西本身管,Commons Pool 不侵入任何逻辑。

在建立好了JedisPool以后呢,在使用的时候利用getResource来获取jedis的客户端:

public Jedis getResource() { Jedis jedis = super.getResource(); jedis.setDataSource(this); return jedis; } public T getResource() { try { return internalPool.borrowObject(); } catch (NoSuchElementException nse) { throw new JedisException("Could not get a resource from the pool", nse); } catch (Exception e) { throw new JedisConnectionException("Could not get a resource from the pool", e); } } 

internalPool是一个Commons pool。咱们在获取jedis的时候调用了Commons pool的borrowObject。表面的意思就是借一个连接。同时将JedisPool的引用交给jedis,便于在close的时候进行连接的返还:

@Override public void close() { if (dataSource != null) { if (client.isBroken()) { this.dataSource.returnBrokenResource(this); } else { this.dataSource.returnResource(this); } } else { client.close(); } } 

在jedis调用close方法时候,调用dataSource.returnResource进行连接的返还。

这样jedis和JedisPool的实现就分析完了,可是对于Commons Pool对咱们仍是黑盒的,接下来会写一个对Commons pool的实现原理的笔记。同时呢对于jedis Pool只能进行单实例的连接操做,可是对于数据量大的时候,单实例不能知足需求。这个时候就须要对实例进行“分片”。Jedis也是提供了分片的支持,后面也会总结一个jedis分片的实现。

相关文章
相关标签/搜索