当业务的数据量和访问量急剧增长的状况下,咱们须要对数据进行水平拆分,从而下降单库的压力,而且数据的水平拆分须要对业务透明,屏蔽掉水平拆分的细节。而且,前端业务的高并发会致使后端的数据库链接过多,从而DB的性能低下。前端
Cobar就是解决这些问题的一款分库分表中间件,Cobar以proxy的形式位于前端应用和后端数据库之间,Cobar对前端暴露的接口是MySQL通讯协议,其将前端传输过来的SQL语句按照sharding规则路由到后端的数据库实例上,再合并多个实例返回的结果,从而模拟单库下的数据库行为。react
Cobar的使用方法就很少介绍了,本文的主要内容是剖析Cobar的源代码。sql
结构图以下:数据库
咱们先来看CobarServer的代码:编程
private CobarServer() { this.config = new CobarConfig(); SystemConfig system = config.getSystem(); MySQLLexer.setCStyleCommentVersion(system.getParserCommentVersion()); this.timer = new Timer(NAME + "Timer", true); this.initExecutor = ExecutorUtil.create("InitExecutor", system.getInitExecutor()); this.timerExecutor = ExecutorUtil.create("TimerExecutor", system.getTimerExecutor()); this.managerExecutor = ExecutorUtil.create("ManagerExecutor", system.getManagerExecutor()); this.sqlRecorder = new SQLRecorder(system.getSqlRecordCount()); this.isOnline = new AtomicBoolean(true); this.startupTime = TimeUtil.currentTimeMillis(); }
上面是CobarServer的构造函数,它的限定是private的。segmentfault
private static final CobarServer INSTANCE = new CobarServer(); public static final CobarServer getInstance() { return INSTANCE; }
而CobarServer又有一个私有的静态变量INSTANCE
,以及获取这个私有静态变量的静态方法,显然,这是一个单例设计模式,使程序运行的时候全局只有一个CobarServer对象。后端
咱们再来看CobarServer的startup()方法,此方法中构造了一个NIOAcceptor(绑定服务器端口,接受客户端的链接),设计模式
server = new NIOAcceptor(NAME + "Server", system.getServerPort(), sf);
构造了一个接收前端链接的非阻塞Acceptor,让咱们在来看NIOAcceptor类的代码。数组
public final class NIOAcceptor extends Thread { private static final Logger LOGGER = Logger.getLogger(NIOAcceptor.class); private static final AcceptIdGenerator ID_GENERATOR = new AcceptIdGenerator(); private final int port; private final Selector selector; private final ServerSocketChannel serverChannel; private final FrontendConnectionFactory factory; private NIOProcessor[] processors; private int nextProcessor; private long acceptCount; public NIOAcceptor(String name, int port, FrontendConnectionFactory factory) throws IOException { super.setName(name); this.port = port; this.selector = Selector.open(); # 生成选择器 this.serverChannel = ServerSocketChannel.open(); this.serverChannel.socket().bind(new InetSocketAddress(port)); # 绑定服务器端口 this.serverChannel.configureBlocking(false); # 设置非阻塞模式 this.serverChannel.register(selector, SelectionKey.OP_ACCEPT); # 监听ACCEPT事件, this.factory = factory; # 设置前端链接的工厂 } }
以上的代码都是NIO编程中很常见的操做。下面咱们看run()方法,服务器
@Override public void run() { final Selector selector = this.selector; for (;;) { ++acceptCount; try { selector.select(1000L); # select操做是阻塞的,若没有监听到相应的事件,则一直阻塞,直到超过1000毫秒,则返回 Set<SelectionKey> keys = selector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { accept(); # 接受链接,这个方法很关键 } else { key.cancel(); } } } finally { keys.clear(); } } catch (Throwable e) { LOGGER.warn(getName(), e); } } }
以上的run方法也是常见的NIO中监听事件的套路,其中accept()方法是定义的私有函数,accept方法是为了将channel与selector绑定,代码以下,
private void accept() { SocketChannel channel = null; try { channel = serverChannel.accept(); # 为新的链接分配socket channel.configureBlocking(false); # 设置为非阻塞模式 # factory将channel进行封装,进行相应的设置,返回一个FrontendConnection,connection本质上就是一个封装好的channel FrontendConnection c = factory.make(channel); c.setAccepted(true); c.setId(ID_GENERATOR.getId()); # 为链接设置ID NIOProcessor processor = nextProcessor(); # 为链接分配processor,NIOAcceptor中包含了一个NIOProcessor数组,分配的策略即根据下标不断后移,到达数组末尾后又从数组的起始位置开始分配 c.setProcessor(processor); # 回调NIOProcessor的postRegister方法,而processor的postRegister调用的是NIOReactor类的postRegister方法 processor.postRegister(c); } catch (Throwable e) { closeChannel(channel); LOGGER.warn(getName(), e); } }
让我来看NIOProcessor的postRegister方法,
public void postRegister(NIOConnection c) { reactor.postRegister(c); }
NIOProcessor类中定义了一个NIOReactor类的成员变量reactor,而postRegister调用的是NIOReactor的postRegister方法。下面让咱们来看NIOReactor的postRegister代码,
final void postRegister(NIOConnection c) { # 只是先将前端链接插入R线程的阻塞队列中,并无马上将channel与selector进行绑定 reactorR.registerQueue.offer(c); # 唤醒R线程的selector,若以前的select操做没有返回的话则当即返回 reactorR.selector.wakeup(); }
既然channel与selector没有马上进行绑定,那它们是何时绑定的呢?咱们来看NIOReactor中内部类R的run()方法,
@Override public void run() { final Selector selector = this.selector; for (;;) { ++reactCount; try { selector.select(1000L); # 将connection与selector进行绑定 register(selector); Set<SelectionKey> keys = selector.selectedKeys(); try { for (SelectionKey key : keys) { Object att = key.attachment(); if (att != null && key.isValid()) { int readyOps = key.readyOps(); if ((readyOps & SelectionKey.OP_READ) != 0) { read((NIOConnection) att); } else if ((readyOps & SelectionKey.OP_WRITE) != 0) { write((NIOConnection) att); } else { key.cancel(); } } else { key.cancel(); } } } finally { keys.clear(); } } catch (Throwable e) { LOGGER.warn(name, e); } } }
在run方法中,当select方法返回的时候,就会进行channel和selector的绑定,由于当connection插入到阻塞队列中的时候,会对selector进行wakeup(),即select(1000L)方法会当即返回,因此没必要担忧channel会卡一秒钟才会和selector进行绑定。
咱们再来看R线程的register方法,
private void register(Selector selector) { NIOConnection c = null; # 将R线程阻塞队列中的全部链接都轮询取出,与selector进行绑定 while ((c = registerQueue.poll()) != null) { try { c.register(selector); } catch (Throwable e) { c.error(ErrorCode.ERR_REGISTER, e); } } }
关于NIOAcceptor为什么先将connection放入Reactor的阻塞队列,而不是直接绑定。笔者的观点是,若是由NIOAcceptor负责绑定则会形成锁竞争,selector的register方法会争用锁,会致使NIOAcceptor线程和R、W线程竞争selector的锁,若acceptor中处理绑定connection的逻辑,则NIOAcceptor就不能快速地处理大量的链接,整个系统的吞吐就会下降。因此Cobar中的设计是将connection的绑定放到R线程的阻塞队列中去,让R线程来完成connection的绑定工做。
图就随意看看吧-.-,有点丑。
以上。