限于篇幅关系,在观察源码的时候,只列举了部分源代码html
TSimpleServer/TThreadPoolServer是阻塞服务模型
TNonblockingServer/THsHaServer/TThreadedSelectotServer是非阻塞服务模型(NIO)git
内部静态类Args的定义, 用于TServer类用于串联软件栈(传输层, 协议层, 处理层)github
public abstract class TServer { public static class Args extends AbstractServerArgs<Args> { public Args(TServerTransport transport) { super(transport); } } public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> { public AbstractServerArgs(TServerTransport transport); public T processorFactory(TProcessorFactory factory); public T processor(TProcessor processor); public T transportFactory(TTransportFactory factory); public T protocolFactory(TProtocolFactory factory); } }
TServer类定义的抽象类数据库
public abstract class TServer { public abstract void serve(); public void stop(); public boolean isServing(); public void setServerEventHandler(TServerEventHandler eventHandler); }
评注:服务器
抽象函数serve由具体的TServer实例来实现, 而并不是全部的服务都须要优雅的退出, 所以stop没有被定义为抽象网络
TSimpleServer的工做模式采用最简单的阻塞IO,实现方法简洁明了,便于理解,可是一次只能接收和处理一个socket链接,效率比较低,主要用于演示Thrift的工做过程,在实际开发过程当中不多用到它。并发
工做方式如图:负载均衡
抽象的代码可简单描述以下:socket
// *) server socket进行监听 serverSocket.listen(); while ( isServing() ) { // *) 接受socket连接 client = serverSocket.accept(); // *) 封装处理器 processor = factory.getProcess(client); while ( true ) { // *) 阻塞处理rpc的输入/输出 if ( !processor.process(input, output) ) { break; } } }
TThreadPoolServer模式采用阻塞socket方式工做,,主线程负责阻塞式监听“监听socket”中是否有新socket到来,业务处理交由一个线程池来处async
工做模式图:
ThreadPoolServer解决了TSimple不支持并发和多链接的问题, 引入了线程池. 实现的模型是One Thread Per Connection
线程池代码片断:
private static ExecutorService createDefaultExecutorService(Args args) { SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<Runnable>(); return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, args.stopTimeoutVal, TimeUnit.SECONDS, executorQueue); }
评注:
采用同步队列(SynchronousQueue), 线程池采用能线程数可伸缩的模式.
主线程循环简单描述代码:
setServing(true); while (!stopped_) { try { TTransport client = serverTransport_.accept(); WorkerProcess wp = new WorkerProcess(client); executorService_.execute(wp); } catch (TTransportException ttx) { } }
TThreadPoolServer模式优势:
线程池模式中,拆分了监听线程(accept)和处理客户端链接的工做线程(worker),数据读取和业务处理都交由线程池完成,主线程只负责监听新链接,所以在并发量较大时新链接也可以被及时接受。线程池模式比较适合服务器端能预知最多有多少个客户端并发的状况,这时每一个请求都能被业务线程池及时处理,性能也很是高。
TThreadPoolServer模式缺点:
线程池模式的处理能力受限于线程池的工做能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待
TNonblockingServer该模式也是单线程工做,可是采用NIO的模式, 借助Channel/Selector机制, 采用IO事件模型来处理.
全部的socket都被注册到selector中,在一个线程中经过seletor循环监控全部的socket,每次selector结束时,处理全部的处于就绪状态的socket,对于有数据到来的socket进行数据读取操做,对于有数据发送的socket则进行数据发送,对于监听socket则产生一个新业务socket并将其注册到selector中。
工做原理图:
nio部分关键代码以下:
private void select() { try { // wait for io events. selector.select(); // process the io events we received Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped_ && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); // skip if not valid if (!key.isValid()) { cleanupSelectionKey(key); continue; } // if the key is marked Accept, then it has to be the server // transport. if (key.isAcceptable()) { handleAccept(); } else if (key.isReadable()) { // deal with reads handleRead(key); } else if (key.isWritable()) { // deal with writes handleWrite(key); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException e) { LOGGER.warn("Got an IOException while selecting!", e); } }
TNonblockingServer模式优势:
相比于TSimpleServer效率提高主要体如今IO多路复用上,TNonblockingServer采用非阻塞IO,对accept/read/write等IO事件进行监控和处理,同时监控多个socket的状态变化;
TNonblockingServer模式缺点:
TNonblockingServer模式在业务处理上仍是采用单线程顺序来完成,在业务处理比较复杂、耗时的时候,例如某些接口函数须要读取数据库执行时间较长,会致使整个服务被阻塞住,此时该模式效率也不高,由于多个调用请求任务依然是顺序一个接一个执行
鉴于TNonblockingServer的缺点, THsHaServer继承TNonblockingServer,引入了线程池去处理, 其模型把读写任务放到线程池去处理.THsHaServer是Half-sync/Half-async的处理模式, Half-aysnc是在处理IO事件上(accept/read/write io), Half-sync用于handler对rpc的同步处理上.
工做模式图:
/** * Helper to create an invoker pool */ protected static ExecutorService createInvokerPool(Args options) { int minWorkerThreads = options.minWorkerThreads; int maxWorkerThreads = options.maxWorkerThreads; int stopTimeoutVal = options.stopTimeoutVal; TimeUnit stopTimeoutUnit = options.stopTimeoutUnit; LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue); return invoker; }
THsHaServer的优势:
与TNonblockingServer模式相比,THsHaServer在完成数据读取以后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操做,效率大大提高;
THsHaServer的缺点:
主线程须要完成对全部socket的监听以及数据读写的工做,当并发请求数较大时,且发送数据量较多时,监听socket上新链接请求不能被及时接受。
TThreadedSelectorServer是对以上NonblockingServer的扩充, 其分离了Accept和Read/Write的Selector线程, 同时引入Worker工做线程池. 它也是种Half-sync/Half-async的服务模型
TThreadedSelectorServer模式是目前Thrift提供的最高级的模式,它内部有若是几个部分构成:
(1) 一个AcceptThread线程对象,专门用于处理监听socket上的新链接;
(2) 若干个SelectorThread对象专门用于处理业务socket的网络I/O操做,全部网络数据的读写均是有这些线程来完成;
(3) 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket链接请求时,决定将这个新链接请求分配给哪一个SelectorThread线程。
(4) 一个ExecutorService类型的工做线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求读取以后,交个ExecutorService线程池中的线程完成这次调用的具体执行;主要用于处理每一个rpc请求的handler回调处理(这部分是同步的).
工做模式图:
TThreadedSelectorServer模式中有一个专门的线程AcceptThread用于处理新链接请求,所以可以及时响应大量并发链接请求;另外它将网络I/O操做分散到多个SelectorThread线程中来完成,所以可以快速对网络I/O进行读写操做,可以很好地应对网络I/O较多的状况
从accpect线程到selectorThreads关键代码
protected boolean startThreads() { try { for (int i = 0; i < args.selectorThreads; ++i) { selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));//创建事件选择线程池 } acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_, createSelectorThreadLoadBalancer(selectorThreads));//创建accept接受请求线程 for (SelectorThread thread : selectorThreads) { thread.start(); } acceptThread.start(); return true; } catch (IOException e) { LOGGER.error("Failed to start threads!", e); return false; } }
负载均衡器SelectorThreadLoadBalancer对象部分关键代码:
protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) { return new SelectorThreadLoadBalancer(threads); } /** * A round robin load balancer for choosing selector threads for new * connections. */ protected static class SelectorThreadLoadBalancer { private final Collection<? extends SelectorThread> threads; private Iterator<? extends SelectorThread> nextThreadIterator; public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) { if (threads.isEmpty()) { throw new IllegalArgumentException("At least one selector thread is required"); } this.threads = Collections.unmodifiableList(new ArrayList<T>(threads)); nextThreadIterator = this.threads.iterator(); } //根据循环负载均衡策略获取一个SelectorThread public SelectorThread nextThread() { // Choose a selector thread (round robin) if (!nextThreadIterator.hasNext()) { nextThreadIterator = threads.iterator(); } return nextThreadIterator.next(); } }
从SelectorThread线程中,监听到有业务socket中有调用请求,转到业务工做线程池关键代码
private void handleAccept() { final TNonblockingTransport client = doAccept();//取得客户端的链接 if (client != null) { // Pass this connection to a selector thread final SelectorThread targetThread = threadChooser.nextThread();//获取目标SelectorThread if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) { doAddAccept(targetThread, client); } else { // FAIR_ACCEPT try { invoker.submit(new Runnable() {// 提交client的业务给到工做线程 public void run() { doAddAccept(targetThread, client); } }); } catch (RejectedExecutionException rx) { LOGGER.warn("ExecutorService rejected accept registration!", rx); // close immediately client.close(); } } } }
demo地址:
码云:http://git.oschina.net/shunyang/thrift-all/tree/master/thrift-demo
github:https://github.com/shunyang/thrift-all/tree/master/thrift-demo
本文参考文章: