Thrift 简易入门与实战

简介

thrift是一个软件框架, 用来进行可扩展且跨语言的服务的开发. 它结合了功能强大的软件堆栈和代码生成引擎, 以构建在 C++, Java, Go,Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 这些编程语言间无缝结合的、高效的服务.
官网地址: thrift.apache.orgjava

安装

Thrift 的安装比较简单, 在 Mac 下能够直接使用 brew 快速安装.git

brew install thrift

Window 或 Linux 能够经过官网 下载, 这里就再也不多说了.github

当下载安装完毕后, 咱们就会获得一个名为 thrift (Window 下是 thrift.exe) 的工具, 经过它就能够生成各个语言的 thrift 代码.apache

基础

数据类型

Thrift 脚本可定义的数据类型包括如下几种类型:编程

基本类型

  • bool: 布尔值, true 或 false, 对应 Java 的 boolean服务器

  • byte: 8 位有符号整数, 对应 Java 的 byte网络

  • i16: 16 位有符号整数, 对应 Java 的 short数据结构

  • i32: 32 位有符号整数, 对应 Java 的 int多线程

  • i64: 64 位有符号整数, 对应 Java 的 long并发

  • double: 64 位浮点数, 对应 Java 的 double

  • string: 未知编码文本或二进制字符串, 对应 Java 的 String

struct 类型

定义公共的对象, 相似于 C 语言中的结构体定义, 在 Java 中是一个 JavaBean

union 类型

和 C/C++ 中的 union 相似.

容器类型:

  • list: 对应 Java 的 ArrayList

  • set: 对应 Java 的 HashSet

  • map: 对应 Java 的 HashMap

exception 类型

对应 Java 的 Exception

service 类型

对应服务的类.

service 类型能够被继承, 例如:

service PeopleDirectory {
   oneway void log(1: string message),
   void reloadDatabase()
}
service EmployeeDirectory extends PeopleDirectory {
   Employee findEmployee(1: i32employee_id) throws (1: MyError error),
   bool createEmployee(1: Employee new_employee)
}

注意到, 在定义 PeopleDirectory 服务的 log 方法时, 咱们使用到了 oneway 关键字, 这个关键字的做用是告诉 thrift, 咱们不关心函数的返回值, 不须要等待函数执行完毕就能够直接返回.
oneway 关键字一般用于修饰无返回值(void)的函数, 可是它和直接的无返回值的函数仍是有区别的, 例如上面的 log 函数和 reloadDatabase 函数, 当客户端经过 thrift 进行远程调用服务端的 log 函数时, 不须要等待服务端的 log 函数执行结束就能够直接返回; 可是当客户端调用 reloadDatabase 方法时, 虽然这个方法也是无返回值的, 但客户端必需要阻塞等待, 直到服务端通知客户端此调用已结束后, 客户端的远程调用才能够返回.

枚举类型

和 Java 中的 enum 类型同样, 例如:

enum Fruit {
    Apple,
    Banana,
}

例子

下面是一个在 IDL 文件中使用各类类型的例子:

enum ResponseStatus {
  OK = 0,
  ERROR = 1,
}

struct ListResponse {
  1: required ResponseStatus status,
  2: optional list<i32> ids,
  3: optional list<double> scores,
  10: optional string strategy,
  11: optional string globalId,
  12: optional map<string, string> extraInfo,
}

service Hello {
    string helloString(1:string para)
    i32 helloInt(1:i32 para)
    bool helloBoolean(1:bool para)
    void helloVoid()
    string helloNull()
}

关于 IDL 文件

所谓 IDL, 即 接口描述语言, 在使用 thrift 前, 须要提供一个 .thrift 后缀的文件, 其内容是使用 IDL 描述的服务接口信息.
例如以下的一个接口描述:

namespace java com.xys.thrift

service HelloWorldService {
    string sayHello(string name);
}

这里咱们定义了一个名为 HelloWorldService 的接口, 它有一个方法, 即 sayHello. 当经过 thrift --gen java test.thrift 来生成 thrift 接口服务时, 会产生一个 HelloWorldService.java 的文件, 在此文件中会定义一个 HelloWorldService.Iface 接口, 咱们在服务器端实现此接口便可.

服务器端编码基本步骤

  • 实现服务处理接口 impl

  • 建立 Processor

  • 建立 Transport

  • 建立 Protocol

  • 建立 Server

  • 启动 Server

例如:

public class HelloServer {
    public static final int SERVER_PORT = 8080;

    public static void main(String[] args) throws Exception {
        HelloServer server = new HelloServer();
        server.startServer();
    }

    public void startServer() throws Exception {
        // 建立 TProcessor
        TProcessor tprocessor = 
                new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldImpl());

        // 建立 TServerTransport, TServerSocket 继承于 TServerTransport
        TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
        
        // 建立 TProtocol
        TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
        
        TServer.Args tArgs = new TServer.Args(serverTransport);
        tArgs.processor(tprocessor);
        tArgs.protocolFactory(protocolFactory);

        // 建立 TServer
        TServer server = new TSimpleServer(tArgs);
        // 启动 Server
        server.serve();
    }
}

客户端编码基本步骤

  • 建立 Transport

  • 建立 Protocol

  • 基于 Potocol 建立 Client

  • 打开 Transport

  • 调用服务相应的方法.

public class HelloClient {
    public static final String SERVER_IP = "localhost";
    public static final int SERVER_PORT = 8080;
    public static final int TIMEOUT = 30000;

    public static void main(String[] args) throws Exception {
        HelloClient client = new HelloClient();
        client.startClient("XYS");
    }

    public void startClient(String userName) throws Exception {
        // 建立 TTransport
        TTransport transport = new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
        // 建立 TProtocol
        TProtocol protocol = new TBinaryProtocol(transport);

        // 建立客户端.
        HelloWorldService.Client client = new HelloWorldService.Client(protocol);
        
        // 打开 TTransport
        transport.open();
        
        // 调用服务方法
        String result = client.sayHello(userName);
        System.out.println("Result: " + result);

        transport.close();
    }
}

Thrift 的网络栈

clipboard.png

如上图所示, thrift 的网络栈包含了 transport 层, protocol 层, processor 层和 Server/Client 层.

Transport 层

Transport 层提供了从网络中读取数据或将数据写入网络的抽象.
Transport 层和 Protocol 层相互独立, 咱们能够根据本身须要选择不一样的 Transport 层, 而对上层的逻辑不形成任何影响.

Thrift 的 Java 实现中, 咱们使用接口 TTransport 来描述传输层对象, 这个接口提供的经常使用方法有:

open
close
read
write
flush

而在服务器端, 咱们一般会使用 TServerTransport 来监听客户端的请求, 并生成相对应的 Transport 对象, 这个接口提供的经常使用方法有:

open
listen
accept
close

为了使用上的方便, Thrift 提供了以下几个经常使用 Transport:

  • TSocket: 这个 transport 使用阻塞 socket 来收发数据.

  • TFramedTransport: 以帧的形式发送数据, 每帧前面是一个长度. 当服务方使用 non-blocking IO 时(即服务器端使用的是 TNonblockingServerSocket), 那么就必须使用 TFramedTransport.

  • TMemoryTransport: 使用内存 I/O. Java 实现中在内部使用了 ByteArrayOutputStream

  • TZlibTransport: 使用 Zlib 压缩传输的数据. 在 Java 中未实现.

Protocol 层(数据传输协议层)

这一层的做用是内存中的数据结构转换为可经过 Transport 传输的数据流或者反操做, 即咱们所谓的 序列化反序列化.

经常使用的协议有:

  • TBinaryProtocol, 二进制格式

  • TCompactProtocol, 压缩格式

  • TJSONProtocol, JSON 格式

  • TSimpleJSONProtocol, 提供 JSON 只写协议, 生成的文件很容易经过脚本语言解析.

  • TDebugProtocoal, 使用人类可读的 Text 格式, 帮助调试

注意, 客户端和服务器的协议要同样.

Processor 层

Processor 层对象由 Thrift 根据用户的 IDL 文件所生成, 咱们一般不能随意指定.
这一层主要有两个功能:

  • 从 Protocol 层读取数据, 而后转交给对应的 handler 处理

  • 将 handler 处理的结构发送 Prootcol 层.

Server 层

Thrift 提供的 Server 层实现有:

  • TNonblockingServer: 这个是一个基于多线程, 非阻塞 IO 的 Server 层实现, 它专门用于处理大量的并发请求的

  • THsHaServer: 办同步/半异步服务器模型, 基于 TNonblockingServer 实现.

  • TThreadPoolServer: 基于多线程, 阻塞 IO 的 Server 层实现, 它所消耗的系统资源比 TNonblockingServer 高, 不过能够提供更高的吞吐量.

  • TSimpleServer: 这个实现主要是用于测试目的. 它只有一个线程, 而且是阻塞 IO, 所以在同一时间只能处理一个链接.

使用例子

下面的例子在个人 Github 上有源码, 直接 clone 便可.

依赖

<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.10.0</version>
</dependency>

thrift 版本: 0.10.0
注意, jar 包的版本须要和 thrift 版本一致, 否则可能会有一些编译错误

thrift 文件

test.thrift

namespace java com.xys.thrift

service HelloWorldService {
    string sayHello(string name);
}

编译

cd src/main/resources/
thrift --gen java test.thrift
mv gen-java/com/xys/thrift/HelloWorldService.java ../java/com/xys/thrift

当执行 thrift --gen java test.thrift 命令后, 会在当前目录下生成一个 gen-java 目录, 其中会以包路径格式存放着生成的服务器端 thrift 代码, 咱们将其拷贝到工程对应的目录下便可.

服务实现

public class HelloWorldImpl implements HelloWorldService.Iface {
    public HelloWorldImpl() {
    }

    @Override
    public String sayHello(String name) throws TException {
        return "Hello, " + name;
    }
}

服务端/客户端实现

下面咱们分别根据服务器端的几种不一样类型, 来分别实现它们, 并对比这些模型的异同点.

TSimpleServer 服务器模型

TSimpleServer 是一个简单的服务器端模型, 它只有一个线程, 而且使用的是阻塞 IO 模型, 所以通常用于测试环境中.

服务器端实现
public class SimpleHelloServer {
    public static final int SERVER_PORT = 8080;

    public static void main(String[] args) throws Exception {
        SimpleHelloServer server = new SimpleHelloServer();
        server.startServer();
    }

    public void startServer() throws Exception {
        TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(
                new HelloWorldImpl());

        TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
        TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
        tArgs.processor(tprocessor);
        tArgs.protocolFactory(new TBinaryProtocol.Factory());

        TServer server = new TSimpleServer(tArgs);

        server.serve();
    }
}

咱们在服务器端的代码中, 没有显示地指定 Transport 的类型, 这个是由于 TSimpleServer.Args 在构造时, 会指定一个默认的 TransportFactory, 当有新的客户端链接时, 就会生成一个 TSocket 的 Transport 实例. 因为这一点, 咱们在客户端实现时, 也就须要指定客户端的 Transport 为 TSocket 才行.

客户端实现
public class SimpleHelloClient {
    public static final String SERVER_IP = "localhost";
    public static final int SERVER_PORT = 8080;
    public static final int TIMEOUT = 30000;

    public void startClient(String userName) throws Exception {
        TTransport transport = null;

        transport = new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
        // 协议要和服务端一致
        TProtocol protocol = new TBinaryProtocol(transport);
        HelloWorldService.Client client = new HelloWorldService.Client(
                protocol);
        transport.open();
        String result = client.sayHello(userName);
        System.out.println("Result: " + result);

        transport.close();
    }

    public static void main(String[] args) throws Exception {
        SimpleHelloClient client = new SimpleHelloClient();
        client.startClient("XYS");
    }
}

TThreadPoolServer 服务器模型

TThreadPoolServer 是一个基于线程池和传统的阻塞 IO 模型实现, 每一个线程对应着一个链接.

服务器端实现
public class ThreadPoolHelloServer {
    public static final int SERVER_PORT = 8080;

    public static void main(String[] args) throws Exception {
        ThreadPoolHelloServer server = new ThreadPoolHelloServer();
        server.startServer();
    }

    public void startServer() throws Exception {
        TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(
                new HelloWorldImpl());

        TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
        TThreadPoolServer.Args tArgs = new TThreadPoolServer.Args(serverTransport);
        tArgs.processor(tprocessor);
        tArgs.protocolFactory(new TBinaryProtocol.Factory());

        TServer server = new TThreadPoolServer(tArgs);
        server.serve();
    }
}

TThreadPoolServer 的服务器端实现和 TSimpleServer 的没有很大区别, 只不过是在对应的地方把 TSimpleServer 改成 TThreadPoolServer 便可.

一样地, 咱们在 TThreadPoolServer 服务器端的代码中, 和 TSimpleServer 同样, 没有显示地指定 Transport 的类型, 这里的缘由和 TSimpleServer 的同样, 就再也不赘述了.

客户端实现

代码实现和 SimpleHelloClient 同样.

TNonblockingServer 服务器模型

TNonblockingServer 是基于线程池的, 而且使用了 Java 提供的 NIO 机制实现非阻塞 IO, 这个模型能够并发处理大量的客户端链接.
注意, 当使用 TNonblockingServer 模型是, 服务器端和客户端的 Transport 层须要指定为 TFramedTransportTFastFramedTransport.

服务器端实现
public class NonblockingHelloServer {
    public static final int SERVER_PORT = 8080;

    public static void main(String[] args) throws Exception {
        NonblockingHelloServer server = new NonblockingHelloServer();
        server.startServer();
    }

    public void startServer() throws Exception {
        TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(
                new HelloWorldImpl());

        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(SERVER_PORT);
        TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverTransport);
        tArgs.processor(tprocessor);
        tArgs.protocolFactory(new TBinaryProtocol.Factory());
        // 下面这个设置 TransportFactory 的语句能够去掉
        tArgs.transportFactory(new TFramedTransport.Factory());

        TServer server = new TNonblockingServer(tArgs);
        server.serve();
    }
}

前面咱们提到, 在 TThreadPoolServerTSimpleServer 的服务器端代码实现中, 咱们并无显示地为服务器端设置 Transport, 由于 TSimpleServer.ArgsTThreadPoolServer.Args 设置了默认的 TransportFactory, 其最终生成的 Transport 是一个 TSocket 实例.

那么在 TNonblockingServer 中又会是怎样的状况呢?
经过查看代码咱们能够发现, TNonblockingServer.Args 构造时, 会调用父类 AbstractNonblockingServerArgs 的构造器, 其源码以下:

public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
    super(transport);
    this.transportFactory(new TFramedTransport.Factory());
}

能够看到, TNonblockingServer.Args 也会设置一个默认的 TransportFactory, 它的类型是 TFramedTransport#Factory, 所以最终 TNonblockingServer 所使用的 Transport 实际上是 TFramedTransport 类型的, 这也就是为何客户端也必须使用 TFramedTransport(或TFastFramedTransport) 类型的 Transport 的缘由了.

分析到这里, 回过头来看代码实现, 咱们就发现其实代码中 tArgs.transportFactory(new TFramedTransport.Factory()) 这一句是多余的, 不过为了强调一下, 我仍是保留了.

客户端实现
public class NonblockingHelloClient {
    public static final String SERVER_IP = "localhost";
    public static final int SERVER_PORT = 8080;
    public static final int TIMEOUT = 30000;

    public void startClient(String userName) throws Exception {
        TTransport transport = null;

        // 客户端使用 TFastFramedTransport 也是能够的.
        transport = new TFramedTransport(new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT));
        // 协议要和服务端一致
        TProtocol protocol = new TBinaryProtocol(transport);
        HelloWorldService.Client client = new HelloWorldService.Client(
                protocol);
        transport.open();
        String result = client.sayHello(userName);
        System.out.println("Result: " + result);

        transport.close();
    }

    public static void main(String[] args) throws Exception {
        NonblockingHelloClient client = new NonblockingHelloClient();
        client.startClient("XYS");
    }
}
异步客户端实现

在 TNonblockingServer 服务器模型下, 除了使痛不式的客户端调用方式, 咱们还能够在客户端中使用异步调用的方式, 具体代码以下:

public class NonblockingAsyncHelloClient {
    public static final String SERVER_IP = "localhost";
    public static final int SERVER_PORT = 8080;
    public static final int TIMEOUT = 30000;

    public void startClient(String userName) throws Exception {
        TAsyncClientManager clientManager = new TAsyncClientManager();
        TNonblockingTransport transport = new TNonblockingSocket(SERVER_IP,
                SERVER_PORT, TIMEOUT);

        // 协议要和服务端一致
        TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
        HelloWorldService.AsyncClient client = new HelloWorldService.AsyncClient(
                protocolFactory, clientManager, transport);

        client.sayHello(userName, new AsyncHandler());

        Thread.sleep(500);
    }

    class AsyncHandler implements AsyncMethodCallback<String> {
        @Override
        public void onComplete(String response) {
            System.out.println("Got result: " + response);
        }

        @Override
        public void onError(Exception exception) {
            System.out.println("Got error: " + exception.getMessage());
        }
    }

    public static void main(String[] args) throws Exception {
        NonblockingAsyncHelloClient client = new NonblockingAsyncHelloClient();
        client.startClient("XYS");
    }
}

能够看到, 使用异步的客户端调用方式实现起来就比较复杂了. 和 NonblockingHelloClient 对比, 咱们能够看到有几点不一样:

  • 异步客户端中须要定义一个 TAsyncClientManager 实例, 而同步客户端模式下不须要.

  • 异步客户端 Transport 层使用的是 TNonblockingSocket, 而同步客户端使用的是 TFramedTransport

  • 异步客户端的 Procotol 层对象须要使用 TProtocolFactory 来生成, 而同步客户端须要用户手动生成.

  • 异步客户端的 Client 是 HelloWorldService.AsyncClient, 而同步客户的 Client 是 HelloWorldService.Client

  • 最后也是最关键的不一样点, 异步客户端须要提供一个异步处理 Handler, 用于处理服务器的回复.

咱们再来看一下 AsyncHandler 这个类. 这个类是用于异步回调用的, 当咱们正常收到了服务器的回应后, Thrift 就会自动回调咱们的 onComplete 方法, 所以咱们在这里就能够设置咱们的后续处理逻辑.
当 Thrift 远程调用服务器端出现异常时(例如服务器未启动), 那么就会回调到 onError 方法, 咱们在这个方法中就能够作相应的错误处理.

THsHaServer 服务器模型

Half-Sync/Half-Async, 半同步/半异步服务器模型, 底层的实现其实仍是依赖于 TNonblockingServer, 所以它所须要的 Transport 也是 TFramedTransport.

服务器端实现
public class HsHaHelloServer {
    public static final int SERVER_PORT = 8080;

    public static void main(String[] args) throws Exception {
        HsHaHelloServer server = new HsHaHelloServer();
        server.startServer();
    }

    public void startServer() throws Exception {
        TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(
                new HelloWorldImpl());

        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(SERVER_PORT);
        THsHaServer.Args tArgs = new THsHaServer.Args(serverTransport);
        tArgs.processor(tprocessor);
        tArgs.protocolFactory(new TBinaryProtocol.Factory());

        TServer server = new THsHaServer(tArgs);
        server.serve();
    }
}
客户端实现

和 NonblockingHelloClient 一致.

参考

相关文章
相关标签/搜索