Thrift RPC in Practice (2): Thrift Network Server Models

The TServer class hierarchy

TSimpleServer and TThreadPoolServer are blocking (BIO) server models.
TNonblockingServer, THsHaServer and TThreadedSelectorServer are non-blocking (NIO) server models.

1 Definition of the TServer abstract class

TServer defines an inner static class Args, which is used to wire together the software stack (transport layer, protocol layer, processor layer).

public abstract class TServer {
    protected TProcessorFactory processorFactory_;
    protected TServerTransport serverTransport_;
    protected TTransportFactory inputTransportFactory_;
    protected TTransportFactory outputTransportFactory_;
    protected TProtocolFactory inputProtocolFactory_;
    protected TProtocolFactory outputProtocolFactory_;
    private boolean isServing;
    protected TServerEventHandler eventHandler_;
    protected volatile boolean stopped_ = false;

    protected TServer(TServer.AbstractServerArgs args) {
        this.processorFactory_ = args.processorFactory;
        this.serverTransport_ = args.serverTransport;
        this.inputTransportFactory_ = args.inputTransportFactory;
        this.outputTransportFactory_ = args.outputTransportFactory;
        this.inputProtocolFactory_ = args.inputProtocolFactory;
        this.outputProtocolFactory_ = args.outputProtocolFactory;
    }

    // Start serving; implemented by the concrete subclasses
    public abstract void serve();

    // Stop serving
    public void stop() {
    }

    public abstract static class AbstractServerArgs<T extends TServer.AbstractServerArgs<T>> {
        final TServerTransport serverTransport;
        TProcessorFactory processorFactory;
        TTransportFactory inputTransportFactory = new TTransportFactory();
        TTransportFactory outputTransportFactory = new TTransportFactory();
        TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
        TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();

        public AbstractServerArgs(TServerTransport transport) {
            this.serverTransport = transport;
        }
    }
    
    // Inner static class Args: used to wire together the software stack (transport, protocol, processor)
    public static class Args extends TServer.AbstractServerArgs<TServer.Args> {
        public Args(TServerTransport transport) {
            super(transport);
        }
    }
}
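
As a minimal usage sketch (the Calculator service and CalculatorHandler class are hypothetical stand-ins for whatever your IDL generates), the caller fills in an Args instance with the three layers and hands it to a concrete server; processor(), transportFactory() and protocolFactory() are the builder-style setters that AbstractServerArgs provides in the real class (elided from the abridged listing above):

TServerTransport serverTransport = new TServerSocket(9090);
TServer.Args args = new TServer.Args(serverTransport)
    .processor(new Calculator.Processor<>(new CalculatorHandler())) // processing layer
    .transportFactory(new TTransportFactory())                      // transport layer
    .protocolFactory(new TBinaryProtocol.Factory());                // protocol layer
TServer server = new TSimpleServer(args);
server.serve(); // blocks the calling thread until stop() is invoked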

2 TSimpleServer

TSimpleServer uses the simplest blocking I/O. Its implementation is concise and easy to understand, but it can only accept and handle one socket connection at a time, so it is inefficient. It is mainly used to demonstrate how Thrift works and is rarely used in real projects.

serverTransport_.listen(); // start listening, i.e. new ServerSocket(port)
while (!stopped_) { // stopped_ is initially false
    TTransport client = null;
    TProcessor processor = null;
    TTransport inputTransport = null;
    TTransport outputTransport = null;
    TProtocol inputProtocol = null;
    TProtocol outputProtocol = null;
    ServerContext connectionContext = null;
    try {
      client = serverTransport_.accept(); // accept a client connection, i.e. ServerSocket.accept()
      if (client != null) {
        processor = processorFactory_.getProcessor(client);
        inputTransport = inputTransportFactory_.getTransport(client);
        outputTransport = outputTransportFactory_.getTransport(client);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        if (eventHandler_ != null) {
          connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
        }
        while (true) {
          if (eventHandler_ != null) {
            eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
          }
          if (!processor.process(inputProtocol, outputProtocol)) { // read the request and write the response
            break;
          }
        }
      }
    } catch (TTransportException ttx) {
      // Client died, just move on
    } catch (TException tx) {
      if (!stopped_) {
        LOGGER.error("Thrift error occurred during processing of message.", tx);
      }
    } catch (Exception x) {
      if (!stopped_) {
        LOGGER.error("Error occurred during processing of message.", x);
      }
    }

    if (eventHandler_ != null) {
      eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
    }

    if (inputTransport != null) {
      inputTransport.close();
    }

    if (outputTransport != null) {
      outputTransport.close();
    }

  }
  setServing(false);
}
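
On the client side the stack must mirror the server's: a plain blocking TSocket plus the same protocol. A minimal sketch, again using the hypothetical Calculator service:

TTransport transport = new TSocket("localhost", 9090);
transport.open();                                    // blocking connect
TProtocol protocol = new TBinaryProtocol(transport);
Calculator.Client client = new Calculator.Client(protocol);
int sum = client.add(1, 2);                          // synchronous RPC; add() is a hypothetical IDL method
transport.close();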

3 TThreadPoolServer

TThreadPoolServer also works with blocking sockets: the main thread blocks on the listening socket waiting for new connections to arrive, while business processing is handed off to a thread pool.

(Working-model diagram omitted.)

Simplified code for the main-thread loop:

// Create the thread-pool executor
private static ExecutorService createDefaultExecutorService(Args args) {
  SynchronousQueue<Runnable> executorQueue =
    new SynchronousQueue<Runnable>();
  return new ThreadPoolExecutor(args.minWorkerThreads,
                                args.maxWorkerThreads,
                                args.stopTimeoutVal,
                                args.stopTimeoutUnit,
                                executorQueue);
}
stopped_ = false;
serverTransport_.listen();
while (!stopped_) {
  try {
    TTransport client = serverTransport_.accept();
    WorkerProcess wp = new WorkerProcess(client);
    while (true) {
      try {
        executorService_.execute(wp); // ThreadPoolExecutor.execute(wp)
        break;
      } catch (Throwable t) {
        // pool rejected the task (saturated); keep retrying until it is accepted
      }
    }
  } catch (TTransportException ttx) {
    // accept failed, e.g. the server socket was closed; re-check stopped_ and continue
  }
}

executorService_.shutdown();

Advantages of TThreadPoolServer:
The accept (listening) thread is split from the worker threads that serve client connections. Data reading and business processing are both done by the thread pool, while the main thread only listens for new connections, so even under heavy concurrency new connections are accepted promptly. This mode suits servers that can predict the maximum number of concurrent clients: every request is then picked up by a worker thread in time and performance is very good.
Disadvantages of TThreadPoolServer:
Processing capacity is limited by the thread pool. When the number of concurrent requests exceeds the number of threads in the pool, new requests have to queue and wait.
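
A construction sketch for this mode (hypothetical service and handler as before); minWorkerThreads and maxWorkerThreads bound the pool built by createDefaultExecutorService above:

TServerTransport serverTransport = new TServerSocket(9090);
TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport)
    .processor(new Calculator.Processor<>(new CalculatorHandler()))
    .protocolFactory(new TBinaryProtocol.Factory());
args.minWorkerThreads(5);    // threads kept in the pool
args.maxWorkerThreads(200);  // upper bound when all workers are busy
TServer server = new TThreadPoolServer(args);
server.serve();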

4 TNonblockingServer

TNonblockingServer is still single-threaded, but it uses NIO: with the Channel/Selector mechanism, it handles connections through an I/O event model.

All sockets are registered with a selector, and a single thread polls them through that selector. Each time select() returns, every socket in a ready state is handled: sockets with incoming data are read, sockets with pending output are written, and when the listening socket becomes ready a new connection socket is accepted and registered with the selector.

(Working-principle diagram omitted.)

public void run() {
  try {
    while (!stopped_) {
      select();                 // block until I/O events are ready, then handle them
      processInterestChanges(); // apply interest-set changes requested by other threads
    }
  } catch (Throwable t) {
    // unexpected error: fall through and let the serve loop terminate
  }
}
private void select() {
  try {
    // wait for io events.
    selector.select();

    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }

      // if the key is marked Accept, then it has to be the server
      // transport.
      if (key.isAcceptable()) {
        handleAccept();
      } else if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

Advantages of TNonblockingServer:
Compared with TSimpleServer, the efficiency gain comes from I/O multiplexing: TNonblockingServer uses non-blocking I/O and monitors and handles accept/read/write events on many sockets at the same time.
Disadvantages of TNonblockingServer:
Business processing is still done sequentially on a single thread. When handlers are complex or slow, for example an interface method that queries a database and takes a long time, the whole server is blocked and efficiency drops, because the queued calls are still executed one after another.
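
A construction sketch. The NIO servers require a TNonblockingServerSocket and frame every message, so clients have to wrap their socket in a TFramedTransport (service/handler names hypothetical as before):

TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(9090);
TNonblockingServer.Args args = new TNonblockingServer.Args(serverTransport)
    .processor(new Calculator.Processor<>(new CalculatorHandler()))
    .protocolFactory(new TBinaryProtocol.Factory());
TServer server = new TNonblockingServer(args);
server.serve();

// Client side: the framed transport is mandatory when talking to the NIO servers.
TTransport transport = new TFramedTransport(new TSocket("localhost", 9090));
transport.open();
Calculator.Client client = new Calculator.Client(new TBinaryProtocol(transport));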

5 THsHaServer

Given the drawback of TNonblockingServer, THsHaServer extends it and introduces a thread pool: once a complete request has been read off the wire, the handler invocation is dispatched to that pool. THsHaServer is a half-sync/half-async model: the async half handles the I/O events (accept/read/write), while the sync half is the handler's synchronous processing of each RPC.

/**
 * Helper to create an invoker pool
 */
protected static ExecutorService createInvokerPool(Args options) {
  int minWorkerThreads = options.minWorkerThreads;
  int maxWorkerThreads = options.maxWorkerThreads;
  int stopTimeoutVal = options.stopTimeoutVal;
  TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;

  LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
  ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
    maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);

  return invoker;
}

Advantages of THsHaServer:
Compared with TNonblockingServer, once the data has been read the business processing is handed to a thread pool and the main thread immediately returns to the next pass of the select loop, which greatly improves efficiency.
Disadvantages of THsHaServer:
The single main thread still performs the select over all sockets as well as all reads and writes, so when there are many concurrent requests with large payloads, new connection requests on the listening socket may not be accepted in time.
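
A construction sketch, assuming a Thrift version whose THsHaServer.Args exposes minWorkerThreads/maxWorkerThreads (the same fields read by createInvokerPool above); these bound the invoker pool that runs the handlers:

TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(9090);
THsHaServer.Args args = new THsHaServer.Args(serverTransport);
args.processor(new Calculator.Processor<>(new CalculatorHandler()));
args.protocolFactory(new TBinaryProtocol.Factory());
args.minWorkerThreads(5);   // invoker pool: core threads
args.maxWorkerThreads(50);  // invoker pool: maximum threads
TServer server = new THsHaServer(args);
server.serve();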

6 TThreadedSelectorServer

TThreadedSelectorServer is an extension of the non-blocking servers above: it separates the accept selector thread from the read/write selector threads and also introduces a pool of worker threads. It, too, is a half-sync/half-async server model.

TThreadedSelectorServer is the most advanced mode Thrift currently provides. Internally it consists of the following parts:
(1) one AcceptThread, dedicated to handling new connections arriving on the listening socket;
(2) several SelectorThread objects that perform the network I/O on the connection sockets; all reads and writes are done by these threads;
(3) one SelectorThreadLoadBalancer, which decides, whenever the AcceptThread receives a new connection, which SelectorThread that connection is assigned to;
(4) one ExecutorService worker pool: when a SelectorThread detects an incoming call on a connection socket, it reads the request and hands it to a thread in the pool, which executes the call; this is where each RPC's handler callback runs, synchronously.


Because a dedicated AcceptThread handles new connection requests, TThreadedSelectorServer responds promptly even to a large number of concurrent connection attempts; and because network I/O is spread across several SelectorThreads, reads and writes are fast and the server copes well with I/O-heavy workloads.
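
A construction sketch showing how the parts listed above map onto the Args knobs (service/handler names hypothetical as before):

TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(9090);
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(serverTransport)
    .processor(new Calculator.Processor<>(new CalculatorHandler()))
    .protocolFactory(new TBinaryProtocol.Factory())
    .selectorThreads(4)            // SelectorThread objects doing the network I/O
    .workerThreads(32)             // ExecutorService pool that runs the handler callbacks
    .acceptQueueSizePerThread(16); // backlog between the AcceptThread and each SelectorThread
TServer server = new TThreadedSelectorServer(args);
server.serve();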

Key code: starting the SelectorThreads and the AcceptThread (which is handed the load balancer over those selector threads):

@Override
protected boolean startThreads() {
  try {
    for (int i = 0; i < args.selectorThreads; ++i) {
      selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
    }
    acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
      createSelectorThreadLoadBalancer(selectorThreads)); // create the load balancer over the selector threads
    for (SelectorThread thread : selectorThreads) {
      thread.start();
    }
    acceptThread.start();
    return true;
  } catch (IOException e) {
    LOGGER.error("Failed to start threads!", e);
    return false;
  }
}

Key code in the AcceptThread: when a new connection is accepted, it is handed off to a SelectorThread chosen by the load balancer:

private void select() {
  try {
    // wait for connect events.
    acceptSelector.select();

    // process the io events we received
    Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // skip if not valid
      if (!key.isValid()) {
        continue;
      }

      if (key.isAcceptable()) {
        handleAccept();
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}
private void handleAccept() {
  final TNonblockingTransport client = doAccept();
  if (client != null) {
    // Pass this connection to a selector thread
    final SelectorThread targetThread = threadChooser.nextThread();

    if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
      doAddAccept(targetThread, client);
    } else {
      // FAIR_ACCEPT
      try {
        invoker.submit(new Runnable() {
          public void run() {
            doAddAccept(targetThread, client);
          }
        });
      } catch (RejectedExecutionException rx) {
        LOGGER.warn("ExecutorService rejected accept registration!", rx);
        // close immediately
        client.close();
      }
    }
  }
}

