sofa-pbrpc是基于Google Protocol Buffers 实现的RPC网络通讯库,在百度公司各部门获得普遍使用,天天支撑上亿次内部调用。sofa-pbrpc基于百度大搜索高并发高负载的业务场景不断打磨,成为一套简单易用的轻量级高性能RPC框架。2014年sofa-pbrpc正式对外开源受到广大开发人员的关注,目前sofa-pbrpc已经在各大互联网公司产品中使用。c++
开源地址:https://github.com/baidu/sofa-pbrpcgit
主要用户接口分为四个接口类和三个option。 github
Server端配置:RpcServerOptionsweb
参数名 | 参数说明 |
---|---|
work_thread_num | 工做线程数 |
max_pending_buffer_size | pengding buffer 大小 (MB) |
max_throughput_in | 最大入带宽限制 (MB/s) |
max_throughput_out | 最大出带宽限制 (MB/s) |
keep_alive_time | 空闲链接维持时间 (s) |
Client端配置:RpcClientOptionsjson
参数名 | 参数说明 |
---|---|
work_thread_num | 工做线程数 |
callback_thread_num | 回调线程数 |
max_pending_buffer_size | pengding buffer 大小 (MB) |
max_throughput_in | 最大入带宽限制 (MB/s) |
max_throughput_out | 最大出带宽限制 (MB/s) |
使用sofa-pbrpc只须要三步:网络
样例代码参见sample/echo。并发
定义协议只须要编写一个proto文件便可。 范例:echo_service.proto负载均衡
package sofa.pbrpc.test; option cc_generic_services = true; message EchoRequest { required string message = 1; } message EchoResponse { required string message = 1; } service EchoServer { rpc Echo(EchoRequest) returns(EchoResponse); }
使用protoc编译'echo_service.proto',生成接口文件'echo_service.pb.h'和'echo_service.pb.cc'。框架
注意:异步
#include <sofa/pbrpc/pbrpc.h> // sofa-pbrpc头文件 #include "echo_service.pb.h" // service接口定义头文件
class EchoServerImpl : public sofa::pbrpc::test::EchoServer { public: EchoServerImpl() {} virtual ~EchoServerImpl() {} private: virtual void Echo(google::protobuf::RpcController* controller, const sofa::pbrpc::test::EchoRequest* request, sofa::pbrpc::test::EchoResponse* response, google::protobuf::Closure* done) { sofa::pbrpc::RpcController* cntl = static_cast<sofa::pbrpc::RpcController*>(controller); SLOG(NOTICE, "Echo(): request message from %s: %s", cntl->RemoteAddress().c_str(), request->message().c_str()); response->set_message("echo message: " + request->message()); done->Run(); } };
注意:
int main() { SOFA_PBRPC_SET_LOG_LEVEL(NOTICE); sofa::pbrpc::RpcServerOptions options; options.work_thread_num = 8; sofa::pbrpc::RpcServer rpc_server(options); if (!rpc_server.Start("0.0.0.0:12321")) { SLOG(ERROR, "start server failed"); return EXIT_FAILURE; } sofa::pbrpc::test::EchoServer* echo_service = new EchoServerImpl(); if (!rpc_server.RegisterService(echo_service)) { SLOG(ERROR, "register service failed"); return EXIT_FAILURE; } rpc_server.Run(); rpc_server.Stop(); return EXIT_SUCCESS; }
Client支持同步和异步两种调用方式:
#include <sofa/pbrpc/pbrpc.h> // sofa-pbrpc头文件 #include "echo_service.pb.h" // service接口定义头文件
int main() { SOFA_PBRPC_SET_LOG_LEVEL(NOTICE); sofa::pbrpc::RpcClientOptions client_options; client_options.work_thread_num = 8; sofa::pbrpc::RpcClient rpc_client(client_options); sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321"); sofa::pbrpc::test::EchoServer_Stub stub(&rpc_channel); sofa::pbrpc::test::EchoRequest request; request.set_message("Hello world!"); sofa::pbrpc::test::EchoResponse response; sofa::pbrpc::RpcController controller; controller.SetTimeout(3000); stub.Echo(&controller, &request, &response, NULL); if (controller.Failed()) { SLOG(ERROR, "request failed: %s", controller.ErrorText().c_str()); } return EXIT_SUCCESS; }
void EchoCallback(sofa::pbrpc::RpcController* cntl, sofa::pbrpc::test::EchoRequest* request, sofa::pbrpc::test::EchoResponse* response, bool* callbacked) { SLOG(NOTICE, "RemoteAddress=%s", cntl->RemoteAddress().c_str()); SLOG(NOTICE, "IsRequestSent=%s", cntl->IsRequestSent() ? "true" : "false"); if (cntl->IsRequestSent()) { SLOG(NOTICE, "LocalAddress=%s", cntl->LocalAddress().c_str()); SLOG(NOTICE, "SentBytes=%ld", cntl->SentBytes()); } if (cntl->Failed()) { SLOG(ERROR, "request failed: %s", cntl->ErrorText().c_str()); } else { SLOG(NOTICE, "request succeed: %s", response->message().c_str()); } delete cntl; delete request; delete response; *callbacked = true; } int main() { SOFA_PBRPC_SET_LOG_LEVEL(NOTICE); sofa::pbrpc::RpcClientOptions client_options; sofa::pbrpc::RpcClient rpc_client(client_options); sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321"); sofa::pbrpc::test::EchoServer_Stub stub(&rpc_channel); sofa::pbrpc::test::EchoRequest* request = new sofa::pbrpc::test::EchoRequest(); request->set_message("Hello from qinzuoyan01"); sofa::pbrpc::test::EchoResponse* response = new sofa::pbrpc::test::EchoResponse(); sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController(); cntl->SetTimeout(3000); bool callbacked = false; google::protobuf::Closure* done = sofa::pbrpc::NewClosure( &EchoCallback, cntl, request, response, &callbacked); stub.Echo(cntl, request, response, done); while (!callbacked) { usleep(100000); } return EXIT_SUCCESS; }
注意:
在sofa-pbrpc中网络数据自上而下流划分为RpcClientStream/RpcServerStream、RpcMessageStream、RpcByteStream三层。字节流层主要负责网络通讯相关的操做,操做对象为序列化后的二机制字节流;消息流层处理的对象是由header、meta和data组装的消息,负责消息级别的控制与统计;协议层负责异步发送请求和接收响应数据。采用这样协议栈方式的层次划分更加有利于数据协议的扩展。
一条rpc消息由RpcMessageHeader、RpcMeta和Data组成。
struct RpcMessageHeader { union { char magic_str[4]; uint32 magic_str_value; }; // 4 bytes int32 meta_size; // 4 bytes int64 data_size; // 8 bytes int64 message_size; // 8 bytes: message_size = meta_size + data_size, for check RpcMessageHeader() : magic_str_value(SOFA_RPC_MAGIC) , meta_size(0), data_size(0), message_size(0) {} bool CheckMagicString() const { return magic_str_value == SOFA_RPC_MAGIC; } };
message RpcMeta { enum Type { REQUEST = 0; RESPONSE = 1; }; required Type type = 1; required uint64 sequence_id = 2; optional string method = 100; optional int64 server_timeout = 101; optional bool failed = 200; optional int32 error_code = 201; optional string reason = 202; optional CompressType compress_type = 300; optional CompressType expected_response_compress_type = 301; }
一次RPC调用通过如下流程:
asio异步模型,底层使用epoll。
sofa-pbrpc将内存划分为固定大小的buffer做为缓冲区,对buffer采用引用计数进行管理,减小没必要要的内存拷贝。
采用装饰者模式的透明压缩,易于扩展。
使用lock+swap操做缩小临界区。
按时间片分配流量配额,保证流控精准高效。
除了使用原生client访问server外,sofa-pbrpc也支持使用http协议访问server上的服务。同时,用户能够经过使用server端的WebService工具类,快速实现server的对于http请求的处理逻辑。
sofa-pbrpc支持用户使用http客户端向server发送json格式的数据请求,并返回json格式的响应。
sofa-pbrpc提供经常使用工具类给开发者,包括:
类别 | 头文件 | 说明 |
---|---|---|
智能指针 | sofa/pbrpc/smart_ptr/smart_ptr.hpp | 包括scoped_ptr,shared_ptr,weak_ptr等 |
原子操做 | sofa/pbrpc/atomic.h | 支持fetch,inc,dec,cas等 |
锁操做 | sofa/pbrpc/locks.h | 提供了互斥锁,自旋锁,读写锁的封装 |
定时管理 | sofa/pbrpc/timeout_manager.h | 高效的提供了定时器功能 |
百度网页搜索部开源团队 opensearch@baidu.com