Apache Thrift系列详解(二) - 网络服务模型

前言

Thrift提供的网络服务模型单线程多线程事件驱动,从另外一个角度划分为:阻塞服务模型非阻塞服务模型java

  • 阻塞服务模型:TSimpleServerTThreadPoolServer算法

  • 非阻塞服务模型:TNonblockingServerTHsHaServerTThreadedSelectorServer数据库

TServer类的层次关系:apache


正文

TServer

TServer定义了静态内部类ArgsArgs继承自抽象类AbstractServerArgsAbstractServerArgs采用了建造者模式,向TServer提供各类工厂:编程

工厂属性 工厂类型 做用
ProcessorFactory TProcessorFactory 处理层工厂类,用于具体的TProcessor对象的建立
InputTransportFactory TTransportFactory 传输层输入工厂类,用于具体的TTransport对象的建立
OutputTransportFactory TTransportFactory 传输层输出工厂类,用于具体的TTransport对象的建立
InputProtocolFactory TProtocolFactory 协议层输入工厂类,用于具体的TProtocol对象的建立
OutputProtocolFactory TProtocolFactory 协议层输出工厂类,用于具体的TProtocol对象的建立

下面是TServer的部分核心代码:后端

public abstract class TServer {
    public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
        public Args(TServerTransport transport) {
            super(transport);
        }
    }

    public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
        final TServerTransport serverTransport;
        TProcessorFactory processorFactory;
        TTransportFactory inputTransportFactory = new TTransportFactory();
        TTransportFactory outputTransportFactory = new TTransportFactory();
        TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
        TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();

        public AbstractServerArgs(TServerTransport transport) {
            serverTransport = transport;
        }
    }

    protected TProcessorFactory processorFactory_;
    protected TServerTransport serverTransport_;
    protected TTransportFactory inputTransportFactory_;
    protected TTransportFactory outputTransportFactory_;
    protected TProtocolFactory inputProtocolFactory_;
    protected TProtocolFactory outputProtocolFactory_;
    private boolean isServing;

    protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
        processorFactory_ = args.processorFactory;
        serverTransport_ = args.serverTransport;
        inputTransportFactory_ = args.inputTransportFactory;
        outputTransportFactory_ = args.outputTransportFactory;
        inputProtocolFactory_ = args.inputProtocolFactory;
        outputProtocolFactory_ = args.outputProtocolFactory;
    }

    public abstract void serve();
    public void stop() {}

    public boolean isServing() {
        return isServing;
    }

    protected void setServing(boolean serving) {
        isServing = serving;
    }
}
复制代码

TServer的三个方法:serve()stop()isServing()serve()用于启动服务,stop()用于关闭服务,isServing()用于检测服务的起停状态。缓存

TServer不一样实现类的启动方式不同,所以serve()定义为抽象方法。不是全部的服务都须要优雅的退出, 所以stop()方法没有被定义为抽象。服务器


TSimpleServer

TSimpleServer工做模式采用最简单的阻塞IO,实现方法简洁明了,便于理解,可是一次只能接收和处理一个socket链接,效率比较低。它主要用于演示Thrift的工做过程,在实际开发过程当中不多用到它。网络

(一) 工做流程

(二) 使用入门

服务端:多线程

ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
    TServerSocket serverTransport = new TServerSocket(serverSocket);
    HelloWorldService.Processor processor =
            new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();

    TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
    tArgs.processor(processor);
    tArgs.protocolFactory(protocolFactory);
    // 简单的单线程服务模型 通常用于测试
    TServer tServer = new TSimpleServer(tArgs);
    System.out.println("Running Simple Server");
    tServer.serve();
复制代码

客户端:

TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
    TProtocol protocol = new TBinaryProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();

    String result = client.say("Leo");
    System.out.println("Result =: " + result);
    transport.close();
复制代码

(三) 源码分析

查看上述流程的源代码,即TSimpleServer.java中的serve()方法以下:

serve()方法的操做:

  1. 设置TServerSocketlisten()方法启动链接监听
  2. 阻塞的方式接受客户端地链接请求,每进入一个链接即为其建立一个通道TTransport对象。
  3. 为客户端建立处理器对象输入传输通道对象输出传输通道对象输入协议对象输出协议对象
  4. 经过TServerEventHandler对象处理具体的业务请求。

ThreadPoolServer

TThreadPoolServer模式采用阻塞socket方式工做,主线程负责阻塞式监听是否有新socket到来,具体的业务处理交由一个线程池来处理。

(一) 工做流程

(二) 使用入门

服务端:

ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
    TServerSocket serverTransport = new TServerSocket(serverSocket);
    HelloWorldService.Processor<HelloWorldService.Iface> processor =
            new HelloWorldService.Processor<>(new HelloWorldServiceImpl());

    TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
    TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
    ttpsArgs.processor(processor);
    ttpsArgs.protocolFactory(protocolFactory);

    // 线程池服务模型 使用标准的阻塞式IO 预先建立一组线程处理请求
    TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
    System.out.println("Running ThreadPool Server");
    ttpsServer.serve();
复制代码

客户端:

TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
    TProtocol protocol = new TBinaryProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);

    transport.open();
    String result = client.say("ThreadPoolClient");
    System.out.println("Result =: " + result);
    transport.close();
复制代码

(三) 源码分析

ThreadPoolServer解决了TSimpleServer不支持并发多链接的问题,引入了线程池。实现的模型是One Thread Per Connection。查看上述流程的源代码,先查看线程池的代码片断:

TThreadPoolServer.java中的serve()方法以下:

serve()方法的操做:

  1. 设置TServerSocketlisten()方法启动链接监听
  2. 阻塞的方式接受客户端链接请求,每进入一个链接,将通道对象封装成一个WorkerProcess对象(WorkerProcess实现了Runnabel接口),并提交到线程池
  3. WorkerProcessrun()方法负责业务处理,为客户端建立了处理器对象输入传输通道对象输出传输通道对象输入协议对象输出协议对象
  4. 经过TServerEventHandler对象处理具体的业务请求。

WorkerProcessrun()方法:

(四) 优缺点

TThreadPoolServer模式的优势

拆分了监听线程(Accept Thread)和处理客户端链接工做线程(Worker Thread),数据读取业务处理都交给线程池处理。所以在并发量较大时新链接也可以被及时接受。

线程池模式比较适合服务器端能预知最多有多少个客户端并发的状况,这时每一个请求都能被业务线程池及时处理,性能也很是高。

TThreadPoolServer模式的缺点

线程池模式的处理能力受限于线程池的工做能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待


TNonblockingServer

TNonblockingServer模式也是单线程工做,可是采用NIO的模式,借助Channel/Selector机制, 采用IO事件模型来处理。

全部的socket都被注册到selector中,在一个线程中经过seletor循环监控全部的socket

每次selector循环结束时,处理全部的处于就绪状态socket,对于有数据到来的socket进行数据读取操做,对于有数据发送的socket则进行数据发送操做,对于监听socket则产生一个新业务socket并将其注册selector上。

注意:TNonblockingServer要求底层的传输通道必须使用TFramedTransport。

(一) 工做流程

(二) 使用入门

服务端:

TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);

    TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
    tnbArgs.processor(tprocessor);
    tnbArgs.transportFactory(new TFramedTransport.Factory());
    tnbArgs.protocolFactory(new TCompactProtocol.Factory());

    // 使用非阻塞式IO服务端和客户端须要指定TFramedTransport数据传输的方式
    TServer server = new TNonblockingServer(tnbArgs);
    System.out.println("Running Non-blocking Server");
    server.serve();
复制代码

客户端:

TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
    // 协议要和服务端一致
    TProtocol protocol = new TCompactProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();

    String result = client.say("NonBlockingClient");
    System.out.println("Result =: " + result);
    transport.close();
复制代码

(三) 源码分析

TNonblockingServer继承于AbstractNonblockingServer,这里咱们更关心基于NIOselector部分的关键代码。

(四) 优缺点

TNonblockingServer模式优势

相比于TSimpleServer效率提高主要体如今IO多路复用上TNonblockingServer采用非阻塞IO,对accept/read/writeIO事件进行监控处理,同时监控多个socket的状态变化。

TNonblockingServer模式缺点

TNonblockingServer模式在业务处理上仍是采用单线程顺序来完成。在业务处理比较复杂耗时的时候,例如某些接口函数须要读取数据库执行时间较长,会致使整个服务阻塞住,此时该模式效率也不高,由于多个调用请求任务依然是顺序一个接一个执行。

THsHaServer

鉴于TNonblockingServer的缺点,THsHaServer继承于TNonblockingServer,引入了线程池提升了任务处理的并发能力THsHaServer半同步半异步(Half-Sync/Half-Async)的处理模式,Half-Aysnc用于IO事件处理(Accept/Read/Write),Half-Sync用于业务handlerrpc同步处理上。

注意:THsHaServer和TNonblockingServer同样,要求底层的传输通道必须使用TFramedTransport。

(一) 工做流程

(二) 使用入门

服务端:

TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
    TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    // 半同步半异步
    THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
    thhsArgs.processor(tprocessor);
    thhsArgs.transportFactory(new TFramedTransport.Factory());
    thhsArgs.protocolFactory(new TBinaryProtocol.Factory());

    TServer server = new THsHaServer(thhsArgs);
    System.out.println("Running HsHa Server");
    server.serve();
复制代码

客户端:

TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
    // 协议要和服务端一致
    TProtocol protocol = new TBinaryProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();

    String result = client.say("HsHaClient");
    System.out.println("Result =: " + result);
    transport.close();
复制代码

(三) 源码分析

THsHaServer继承于TNonblockingServer,新增了线程池并发处理工做任务的功能,查看线程池的相关代码:

任务线程池的建立过程:

下文的TThreadedSelectorServer囊括了THsHaServer的大部分特性,源码分析可参考TThreadedSelectorServer。

(四) 优缺点

THsHaServer的优势

THsHaServerTNonblockingServer模式相比,THsHaServer在完成数据读取以后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操做,效率大大提高。

THsHaServer的缺点

主线程仍然须要完成全部socket监听接收数据读取数据写入操做。当并发请求数较大时,且发送数据量较多时,监听socket新链接请求不能被及时接受。


TThreadedSelectorServer

TThreadedSelectorServer是对THsHaServer的一种扩充,它将selector中的读写IO事件(read/write)从主线程中分离出来。同时引入worker工做线程池,它也是种Half-Sync/Half-Async的服务模型。

TThreadedSelectorServer模式是目前Thrift提供的最高级的线程服务模型,它内部有若是几个部分构成:

  1. 一个AcceptThread线程对象,专门用于处理监听socket上的新链接。
  2. 若干个SelectorThread对象专门用于处理业务socket网络I/O读写操做,全部网络数据的读写均是有这些线程来完成。
  3. 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket链接请求时,决定将这个新链接请求分配给哪一个SelectorThread线程
  4. 一个ExecutorService类型的工做线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求数据读取以后,交给ExecutorService线程池中的线程完成这次调用的具体执行。主要用于处理每一个rpc请求的handler回调处理(这部分是同步的)。

(一) 工做流程

(二) 使用入门

服务端:

TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
    TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    // 多线程半同步半异步
    TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
    ttssArgs.processor(processor);
    ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
    // 使用非阻塞式IO时 服务端和客户端都须要指定数据传输方式为TFramedTransport
    ttssArgs.transportFactory(new TFramedTransport.Factory());

    // 多线程半同步半异步的服务模型
    TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
    System.out.println("Running ThreadedSelector Server");
    server.serve();
复制代码

客户端:

for (int i = 0; i < 10; i++) {
    new Thread("Thread " + i) {
        @Override
        public void run() {
            // 设置传输通道 对于非阻塞服务 须要使用TFramedTransport(用于将数据分块发送)
            for (int j = 0; j < 10; j++) {
                TTransport transport = null;
                try {
                    transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
                    TProtocol protocol = new TBinaryProtocol(transport);
                    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
                    transport.open();
                    String result = client.say("ThreadedSelector Client");
                    System.out.println("Result =: " + result);
                    transport.close();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 关闭传输通道
                    transport.close();
                }
            }
        }
    }.start();
}
复制代码

(三) 核心代码

以上工做流程的三个组件AcceptThreadSelectorThreadExecutorService在源码中的定义以下:

TThreadedSelectorServer模式中有一个专门的线程AcceptThread用于处理新链接请求,所以可以及时响应大量并发链接请求;另外它将网络I/O操做分散到多个SelectorThread线程中来完成,所以可以快速对网络I/O进行读写操做,可以很好地应对网络I/O较多的状况。

TThreadedSelectorServer默认参数定义以下:

  • 负责网络IO读写的selector默认线程数(selectorThreads):2
  • 负责业务处理的默认工做线程数(workerThreads):5
  • 工做线程池单个线程的任务队列大小(acceptQueueSizePerThread):4

建立、初始化并启动AcceptThreadSelectorThreads,同时启动selector线程的负载均衡器(selectorThreads)。

AcceptThread源码

AcceptThread继承于Thread,能够看出包含三个重要的属性:非阻塞式传输通道(TNonblockingServerTransport)、NIO选择器(acceptSelector)和选择器线程负载均衡器(threadChooser)。

查看AcceptThreadrun()方法,能够看出accept线程一旦启动,就会不停地调用select()方法:

查看select()方法,acceptSelector选择器等待IO事件的到来,拿到SelectionKey即检查是否是accept事件。若是是,经过handleAccept()方法接收一个新来的链接;不然,若是是IO读写事件AcceptThread不做任何处理,交由SelectorThread完成。

handleAccept()方法中,先经过doAccept()去拿链接通道,而后Selector线程负载均衡器选择一个Selector线程,完成接下来的IO读写事件

接下来继续查看doAddAccept()方法的实现,毫无悬念,它进一步调用了SelectorThreadaddAcceptedConnection()方法,把非阻塞传输通道对象传递给选择器线程作进一步的IO读写操做

SelectorThreadLoadBalancer源码

SelectorThreadLoadBalancer如何建立?

SelectorThreadLoadBalancer是一个基于轮询算法Selector线程选择器,经过线程迭代器为新进来的链接顺序分配SelectorThread

SelectorThread源码

SelectorThreadAcceptThread同样,是TThreadedSelectorServer的一个成员内部类,每一个SelectorThread线程对象内部都有一个阻塞式的队列,用于存放该线程被接收链接通道

阻塞队列的大小可由构造函数指定:

上面看到,在AcceptThreaddoAddAccept()方法中调用了SelectorThreadaddAcceptedConnection()方法。

这个方法作了两件事:

  1. 将被此SelectorThread线程接收的链接通道放入阻塞队列中。
  2. 经过wakeup()方法唤醒SelectorThread中的NIO选择器selector

既然SelectorThread也是继承于Thread,查看其run()方法的实现:

SelectorThread方法的select()监听IO事件,仅仅用于处理数据读取数据写入。若是链接有数据可读,读取并以frame的方式缓存;若是须要向链接中写入数据,缓存并发送客户端的数据。且在数据读写处理完成后,须要向NIOselector清空注销自身的SelectionKey

  • 数据写操做完成之后,整个rpc调用过程也就结束了,handleWrite()方法以下:

  • 数据读操做完成之后,Thrift会利用已读数据执行目标方法handleRead()方法以下:

handleRead方法在执行read()方法,将数据读取完成后,会调用requestInvoke()方法调用目标方法完成具体业务处理。requestInvoke()方法将请求数据封装为一个Runnable对象,提交到工做任务线程池(ExecutorService)进行处理。

select()方法完成后,线程继续运行processAcceptedConnections()方法处理下一个链接IO事件。

这里比较核心的几个操做:

  1. 尝试从SelectorThread阻塞队列acceptedQueue中获取一个链接的传输通道。若是获取成功,调用registerAccepted()方法;不然,进入下一次循环。
  2. registerAccepted()方法将传输通道底层的链接注册到NIO选择器selector上面,获取到一个SelectionKey
  3. 建立一个FrameBuffer对象,并绑定到获取的SelectionKey上面,用于数据传输时的中间读写缓存

总结

本文对Thrift的各类线程服务模型进行了介绍,包括2种阻塞式服务模型TSimpleServerTThreadPoolServer,3种非阻塞式服务模型TNonblockingServerTHsHaServerTThreadedSelectorServer。对各类服务模型的具体用法工做流程原理和源码实现进行了必定程度的分析。

鉴于篇幅较长,请各位看官请慢慢批阅!

相关连接

  1. Apache Thrift系列详解(一) - 概述与入门

  2. Apache Thrift系列详解(二) - 网络服务模型

  3. Apache Thrift系列详解(三) - 序列化机制


欢迎关注技术公众号: 零壹技术栈

零壹技术栈

本账号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。

相关文章
相关标签/搜索