Thrift
提供的网络服务模型:单线程、多线程、事件驱动,从另外一个角度划分为:阻塞服务模型、非阻塞服务模型。java
阻塞服务模型:TSimpleServer
、TThreadPoolServer
。算法
非阻塞服务模型:TNonblockingServer
、THsHaServer
和TThreadedSelectorServer
。数据库
TServer
类的层次关系:apache
TServer
定义了静态内部类Args
,Args
继承自抽象类AbstractServerArgs
。AbstractServerArgs
采用了建造者模式,向TServer
提供各类工厂:编程
工厂属性 | 工厂类型 | 做用 |
---|---|---|
ProcessorFactory | TProcessorFactory | 处理层工厂类,用于具体的TProcessor对象的建立 |
InputTransportFactory | TTransportFactory | 传输层输入工厂类,用于具体的TTransport对象的建立 |
OutputTransportFactory | TTransportFactory | 传输层输出工厂类,用于具体的TTransport对象的建立 |
InputProtocolFactory | TProtocolFactory | 协议层输入工厂类,用于具体的TProtocol对象的建立 |
OutputProtocolFactory | TProtocolFactory | 协议层输出工厂类,用于具体的TProtocol对象的建立 |
下面是TServer
的部分核心代码:后端
public abstract class TServer {
public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
public Args(TServerTransport transport) {
super(transport);
}
}
public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
final TServerTransport serverTransport;
TProcessorFactory processorFactory;
TTransportFactory inputTransportFactory = new TTransportFactory();
TTransportFactory outputTransportFactory = new TTransportFactory();
TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
public AbstractServerArgs(TServerTransport transport) {
serverTransport = transport;
}
}
protected TProcessorFactory processorFactory_;
protected TServerTransport serverTransport_;
protected TTransportFactory inputTransportFactory_;
protected TTransportFactory outputTransportFactory_;
protected TProtocolFactory inputProtocolFactory_;
protected TProtocolFactory outputProtocolFactory_;
private boolean isServing;
protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
processorFactory_ = args.processorFactory;
serverTransport_ = args.serverTransport;
inputTransportFactory_ = args.inputTransportFactory;
outputTransportFactory_ = args.outputTransportFactory;
inputProtocolFactory_ = args.inputProtocolFactory;
outputProtocolFactory_ = args.outputProtocolFactory;
}
public abstract void serve();
public void stop() {}
public boolean isServing() {
return isServing;
}
protected void setServing(boolean serving) {
isServing = serving;
}
}
复制代码
TServer
的三个方法:serve()
、stop()
和isServing()
。serve()
用于启动服务,stop()
用于关闭服务,isServing()
用于检测服务的起停状态。缓存
TServer
的不一样实现类的启动方式不同,所以serve()
定义为抽象方法。不是全部的服务都须要优雅的退出, 所以stop()
方法没有被定义为抽象。服务器
TSimpleServer
的工做模式采用最简单的阻塞IO,实现方法简洁明了,便于理解,可是一次只能接收和处理一个socket
链接,效率比较低。它主要用于演示Thrift
的工做过程,在实际开发过程当中不多用到它。网络
服务端:多线程
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor processor =
new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
tArgs.processor(processor);
tArgs.protocolFactory(protocolFactory);
// 简单的单线程服务模型 通常用于测试
TServer tServer = new TSimpleServer(tArgs);
System.out.println("Running Simple Server");
tServer.serve();
复制代码
客户端:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("Leo");
System.out.println("Result =: " + result);
transport.close();
复制代码
查看上述流程的源代码,即TSimpleServer.java
中的serve()
方法以下:
serve()
方法的操做:
TServerSocket
的listen()
方法启动链接监听。TTransport
对象。TServerEventHandler
对象处理具体的业务请求。TThreadPoolServer
模式采用阻塞socket
方式工做,主线程负责阻塞式监听是否有新socket
到来,具体的业务处理交由一个线程池来处理。
服务端:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor<HelloWorldService.Iface> processor =
new HelloWorldService.Processor<>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
ttpsArgs.processor(processor);
ttpsArgs.protocolFactory(protocolFactory);
// 线程池服务模型 使用标准的阻塞式IO 预先建立一组线程处理请求
TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
System.out.println("Running ThreadPool Server");
ttpsServer.serve();
复制代码
客户端:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadPoolClient");
System.out.println("Result =: " + result);
transport.close();
复制代码
ThreadPoolServer
解决了TSimpleServer
不支持并发和多链接的问题,引入了线程池。实现的模型是One Thread Per Connection
。查看上述流程的源代码,先查看线程池的代码片断:
TThreadPoolServer.java
中的serve()
方法以下:
serve()
方法的操做:
TServerSocket
的listen()
方法启动链接监听。WorkerProcess
对象(WorkerProcess
实现了Runnabel
接口),并提交到线程池。WorkerProcess
的run()
方法负责业务处理,为客户端建立了处理器对象、输入传输通道对象、输出传输通道对象、输入协议对象和输出协议对象。TServerEventHandler
对象处理具体的业务请求。WorkerProcess
的run()
方法:
拆分了监听线程(Accept Thread
)和处理客户端链接的工做线程(Worker Thread
),数据读取和业务处理都交给线程池处理。所以在并发量较大时新链接也可以被及时接受。
线程池模式比较适合服务器端能预知最多有多少个客户端并发的状况,这时每一个请求都能被业务线程池及时处理,性能也很是高。
线程池模式的处理能力受限于线程池的工做能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待。
TNonblockingServer
模式也是单线程工做,可是采用NIO
的模式,借助Channel/Selector
机制, 采用IO
事件模型来处理。
全部的socket
都被注册到selector
中,在一个线程中经过seletor
循环监控全部的socket
。
每次selector
循环结束时,处理全部的处于就绪状态的socket
,对于有数据到来的socket
进行数据读取操做,对于有数据发送的socket则进行数据发送操做,对于监听socket
则产生一个新业务socket
并将其注册到selector
上。
注意:TNonblockingServer要求底层的传输通道必须使用TFramedTransport。
服务端:
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
tnbArgs.processor(tprocessor);
tnbArgs.transportFactory(new TFramedTransport.Factory());
tnbArgs.protocolFactory(new TCompactProtocol.Factory());
// 使用非阻塞式IO服务端和客户端须要指定TFramedTransport数据传输的方式
TServer server = new TNonblockingServer(tnbArgs);
System.out.println("Running Non-blocking Server");
server.serve();
复制代码
客户端:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 协议要和服务端一致
TProtocol protocol = new TCompactProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("NonBlockingClient");
System.out.println("Result =: " + result);
transport.close();
复制代码
TNonblockingServer
继承于AbstractNonblockingServer
,这里咱们更关心基于NIO
的selector
部分的关键代码。
相比于TSimpleServer
效率提高主要体如今IO
多路复用上,TNonblockingServer
采用非阻塞IO
,对accept/read/write
等IO
事件进行监控和处理,同时监控多个socket
的状态变化。
TNonblockingServer
模式在业务处理上仍是采用单线程顺序来完成。在业务处理比较复杂、耗时的时候,例如某些接口函数须要读取数据库执行时间较长,会致使整个服务被阻塞住,此时该模式效率也不高,由于多个调用请求任务依然是顺序一个接一个执行。
鉴于TNonblockingServer
的缺点,THsHaServer
继承于TNonblockingServer
,引入了线程池提升了任务处理的并发能力。THsHaServer
是半同步半异步(Half-Sync/Half-Async
)的处理模式,Half-Aysnc
用于IO
事件处理(Accept/Read/Write
),Half-Sync
用于业务handler
对rpc
的同步处理上。
注意:THsHaServer和TNonblockingServer同样,要求底层的传输通道必须使用TFramedTransport。
服务端:
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 半同步半异步
THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
thhsArgs.processor(tprocessor);
thhsArgs.transportFactory(new TFramedTransport.Factory());
thhsArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new THsHaServer(thhsArgs);
System.out.println("Running HsHa Server");
server.serve();
复制代码
客户端:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 协议要和服务端一致
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("HsHaClient");
System.out.println("Result =: " + result);
transport.close();
复制代码
THsHaServer
继承于TNonblockingServer
,新增了线程池并发处理工做任务的功能,查看线程池的相关代码:
任务线程池的建立过程:
下文的TThreadedSelectorServer囊括了THsHaServer的大部分特性,源码分析可参考TThreadedSelectorServer。
THsHaServer
与TNonblockingServer
模式相比,THsHaServer
在完成数据读取以后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操做,效率大大提高。
主线程仍然须要完成全部socket
的监听接收、数据读取和数据写入操做。当并发请求数较大时,且发送数据量较多时,监听socket
上新链接请求不能被及时接受。
TThreadedSelectorServer
是对THsHaServer
的一种扩充,它将selector
中的读写IO
事件(read/write
)从主线程中分离出来。同时引入worker
工做线程池,它也是种Half-Sync/Half-Async
的服务模型。
TThreadedSelectorServer
模式是目前Thrift
提供的最高级的线程服务模型,它内部有若是几个部分构成:
AcceptThread
线程对象,专门用于处理监听socket
上的新链接。SelectorThread
对象专门用于处理业务socket
的网络I/O
读写操做,全部网络数据的读写均是有这些线程来完成。SelectorThreadLoadBalancer
对象,主要用于AcceptThread
线程接收到一个新socket
链接请求时,决定将这个新链接请求分配给哪一个SelectorThread
线程。ExecutorService
类型的工做线程池,在SelectorThread
线程中,监听到有业务socket
中有调用请求过来,则将请求数据读取以后,交给ExecutorService
线程池中的线程完成这次调用的具体执行。主要用于处理每一个rpc
请求的handler
回调处理(这部分是同步的)。服务端:
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 多线程半同步半异步
TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
ttssArgs.processor(processor);
ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
// 使用非阻塞式IO时 服务端和客户端都须要指定数据传输方式为TFramedTransport
ttssArgs.transportFactory(new TFramedTransport.Factory());
// 多线程半同步半异步的服务模型
TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
System.out.println("Running ThreadedSelector Server");
server.serve();
复制代码
客户端:
for (int i = 0; i < 10; i++) {
new Thread("Thread " + i) {
@Override
public void run() {
// 设置传输通道 对于非阻塞服务 须要使用TFramedTransport(用于将数据分块发送)
for (int j = 0; j < 10; j++) {
TTransport transport = null;
try {
transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadedSelector Client");
System.out.println("Result =: " + result);
transport.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭传输通道
transport.close();
}
}
}
}.start();
}
复制代码
以上工做流程的三个组件AcceptThread
、SelectorThread
和ExecutorService
在源码中的定义以下:
TThreadedSelectorServer
模式中有一个专门的线程AcceptThread
用于处理新链接请求,所以可以及时响应大量并发链接请求;另外它将网络I/O操做分散到多个SelectorThread
线程中来完成,所以可以快速对网络I/O
进行读写操做,可以很好地应对网络I/O
较多的状况。
TThreadedSelectorServer
默认参数定义以下:
建立、初始化并启动AcceptThread
和SelectorThreads
,同时启动selector
线程的负载均衡器(selectorThreads
)。
AcceptThread
继承于Thread
,能够看出包含三个重要的属性:非阻塞式传输通道(TNonblockingServerTransport
)、NIO
选择器(acceptSelector
)和选择器线程负载均衡器(threadChooser
)。
查看AcceptThread
的run()
方法,能够看出accept
线程一旦启动,就会不停地调用select()
方法:
查看select()
方法,acceptSelector
选择器等待IO
事件的到来,拿到SelectionKey
即检查是否是accept
事件。若是是,经过handleAccept()
方法接收一个新来的链接;不然,若是是IO
读写事件,AcceptThread
不做任何处理,交由SelectorThread
完成。
在handleAccept()
方法中,先经过doAccept()
去拿链接通道,而后Selector
线程负载均衡器选择一个Selector
线程,完成接下来的IO
读写事件。
接下来继续查看doAddAccept()
方法的实现,毫无悬念,它进一步调用了SelectorThread
的addAcceptedConnection()
方法,把非阻塞传输通道对象传递给选择器线程作进一步的IO
读写操做。
SelectorThreadLoadBalancer
如何建立?
SelectorThreadLoadBalancer
是一个基于轮询算法的Selector
线程选择器,经过线程迭代器为新进来的链接顺序分配SelectorThread
。
SelectorThread
和AcceptThread
同样,是TThreadedSelectorServer
的一个成员内部类,每一个SelectorThread
线程对象内部都有一个阻塞式的队列,用于存放该线程被接收的链接通道。
阻塞队列的大小可由构造函数指定:
上面看到,在AcceptThread
的doAddAccept()
方法中调用了SelectorThread
的addAcceptedConnection()
方法。
这个方法作了两件事:
SelectorThread
线程接收的链接通道放入阻塞队列中。wakeup()
方法唤醒SelectorThread
中的NIO
选择器selector
。既然SelectorThread
也是继承于Thread
,查看其run()
方法的实现:
SelectorThread
方法的select()
监听IO
事件,仅仅用于处理数据读取和数据写入。若是链接有数据可读,读取并以frame
的方式缓存;若是须要向链接中写入数据,缓存并发送客户端的数据。且在数据读写处理完成后,须要向NIO
的selector
清空和注销自身的SelectionKey
。
rpc
调用过程也就结束了,handleWrite()
方法以下:Thrift
会利用已读数据执行目标方法,handleRead()
方法以下:handleRead
方法在执行read()
方法,将数据读取完成后,会调用requestInvoke()
方法调用目标方法完成具体业务处理。requestInvoke()
方法将请求数据封装为一个Runnable
对象,提交到工做任务线程池(ExecutorService
)进行处理。
select()
方法完成后,线程继续运行processAcceptedConnections()
方法处理下一个链接的IO
事件。
这里比较核心的几个操做:
SelectorThread
的阻塞队列acceptedQueue
中获取一个链接的传输通道。若是获取成功,调用registerAccepted()
方法;不然,进入下一次循环。registerAccepted()
方法将传输通道底层的链接注册到NIO
的选择器selector
上面,获取到一个SelectionKey
。FrameBuffer
对象,并绑定到获取的SelectionKey
上面,用于数据传输时的中间读写缓存。本文对Thrift
的各类线程服务模型进行了介绍,包括2种阻塞式服务模型:TSimpleServer
、TThreadPoolServer
,3种非阻塞式服务模型:TNonblockingServer
、THsHaServer
和TThreadedSelectorServer
。对各类服务模型的具体用法、工做流程、原理和源码实现进行了必定程度的分析。
鉴于篇幅较长,请各位看官请慢慢批阅!
欢迎关注技术公众号: 零壹技术栈
本账号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。