C++多进程并发框架

三年来一直从事服务器程序开发,一直都是忙忙碌碌,不久前结束了职业生涯的第一份工做,有了一个礼拜的休息时间,终于能够写写总结了。因而把之前的开源代码作了整理和优化,这就是FFLIB。虽然这边总结看起来像日记,有不少废话,可是此文仍然是有很大针对性的。针对服务器开发中常见的问题,如多线程并发、消息转发、异步、性能优化、单元测试,提出本身的看法。 html

面对的问题

从事开发工程中,遇到过很多问题,不少时候因为时间紧迫,没有使用优雅的方案。在跟业内的一些朋友交流过程当中,我也意识到有些问题是你们都存在的。简单列举以下: c++

  • 多线程与并发
  • 异步消息/接口调用
  • 消息的序列化与Reflection
  • 性能优化
  • 单元测试

多线程与并发

如今是多核时代,并发才能实现更高的吞吐量、更快的响应,但也是把双刃剑。总结以下几个用法: web

    • 多线程+显示锁;接口是被多线程调用的,当被调用时,显示加锁,再操做实体数据。悲剧的是,工程师为了优化会设计多个锁,以减小锁的粒度,甚至有些地方使用了原子操做。这些都为领域逻辑增长了额外的设计负担。最坏的状况是会出现死锁。

  • 多线程+任务队列;接口被多线程调用,但请求会被暂存到任务队列,而任务队列会被单线程不断执行,典型生产者消费者模式。它的并发在于不一样的接口可使用不一样的任务队列。这也是我最经常使用的并发方式。

这是两种最多见的多线程并发,它们有个天生的缺陷——Scalability。一个机器的性能老是有瓶颈的。两个场景的逻辑虽然由多个线程实现了并发,可是运算量十分有多是一台机器没法承载的。若是是多进程并发,那么能够分布式把其部署到其余机器(也可部署在一台机器)。因此多进程并发比多线程并发更加Scalability。另外采用多进程后,每一个进程单线程设计,这样的程序更加Simplicity。多进程的其余优势如解耦、模块化、方便调试、方便重用等就不赘言了。 json

异步消息/接口调用

提到分布式,就要说一下分布式的通信技术。经常使用的方式以下: 性能优化

  • 类RPC;包括WebService、RPC、ICE等,特色是远程同步调用。远程的接口和本地的接口很是类似。可是游戏服务器程序通常很是在乎延迟和吞吐量,因此这些阻塞线程的同步远程调用方式并不经常使用。可是咱们必须意识到他的优势,就是很是利于调用和测试。
  • 全异步消息;当调用远程接口的时候,异步发送请求消息,接口响应后返回一个结果消息,调用方的回调函数处理结果消息继续逻辑操做。因此有些逻辑就会被切割成ServiceStart和ServiceCallback两段。有时异步会讲领域逻辑变得支离破碎。另外消息处理函数中通常会写一坨的switch/case 处理不一样的消息。最大的问题在于单元测试,这种状况传统单元测试根本一筹莫展。

消息的序列化与Reflection

实现消息的序列化和反序列化的方式有不少,常见的有Struct、json、Protobuff等都有很成功的应用。我我的倾向于使用轻量级的二进制序列化,优势是比较透明和高效,一切在掌握之中。在FFLIB 中实现了bin_encoder_t 和 bin_decoder_t 轻量级的消息序列化,几十行代码而已。 服务器

性能优化

已经写过关于性能方面的总结,参见 网络

http://www.cnblogs.com/zhiranok/archive/2012/06/06/cpp_perf.html 多线程

有的网友提到profiler、cpuprofiler、callgrind等工具。这些工具我都使用过,说实话,对于我来讲,我太认同它有很高的价值。第一他们只能用于开发测试阶段,能够初步获得一些性能上参考数据。第二它们如何实现跟踪人们无从得知。运行其会使程序变慢,不能反映真实数据。第三重要的是,开发测试阶段性能和上线后的能同样吗?Impossible ! 架构

关于性能,原则就是数听说话,详见博文,不在赘述。 并发

单元测试

关于单元测试,前边已经谈论了一些。游戏服务器程序通常都比较庞大,可是难以想象的是,鄙人历来没见有项目(c++ 后台架构的)有完整单元测试的。因为存在着异步和多线程,传统的单元测试框架没法胜任,而开发支持异步的测试框架又是不现实的。咱们必须看到的是,传统的单元测试框架已经取得了很是大的成功。据我了解,使用web 架构的游戏后台已经对于单元测试的使用已经很是成熟,取得了极其好的效果。因此个人思路是利用现有的单元测试框架,将异步消息、多线程的架构作出调整。

已经屡次谈论单元测试了。其实在开发FFLIB的思路很大程度来源于此,不然可能只是一个c++ 网络库而已。我决定尝试去解决这个问题的时候,把FFLIB 定位于框架。

先来看一段很是简单的单元测试的代码 :

Assert(2 == Add(1, 1));

请容许我对这行代码作些解释,对Add函数输入参数,验证返回值是不是预期的结果。这不就是单元测试的本质吗?在想一下咱们异步发送消息的过程,若是每一个输入消息约定一个结果消息包,每次发送请求时都绑定一个回调函数接收和验证结果消息包。这样的话就偏偏知足了传统单元测试的步骤了。最后还需解决一个问题,Assert是不能处理异步的返回值的。幸运的是,future机制能够化异步为同步。不了解future 模式的能够参考这里:

http://blog.chinaunix.net/uid-23093301-id-190969.html

http://msdn.microsoft.com/zh-cn/library/dd764564.aspx#Y300

来看一下在FFLIB框架下远程调用echo 服务的示例:

struct lambda_t
{
  static void callback(echo_t::out_t& msg_)
  {
    echo_t::in_t in;
    in.value = "XXX_echo_test_XXX";
    singleton_t<msg_bus_t>::instance()
       .get_service_group("echo")
       ->get_service(1)->async_call(in, &lambda_t::callback);
  }
};
echo_t::in_t in;
in.value = "XXX_echo_test_XXX";
singleton_t<msg_bus_t>::instance().get_service_group("echo")->get_service(1)->async_call(in, &lambda_t::callback);

当须要调用远程接口时,async_call(in, &lambda_t::callback); 异步调用必须绑定一个回调函数,回调函数接收结果消息,能够触发后续操做。这样的话,若是对echo 的远程接口作单元测试,能够这样作:

rpc_future_t< echo_t::out_t> rpc_future;
echo_t::in_t in;
in.value = "XXX_echo_test_XXX";
const echo_t::out_t& out = rpc_future.call(
    singleton_t<msg_bus_t>::instance()
        .get_service_group("echo")->get_service(1), in);
Assert(in.value == out.value);
这样全部的远程接口均可以被单元测试覆盖。

FFLIB 介绍

 FFLIB 结构图

如图所示,Client 不会直接和Service 相链接,而是经过Broker 中间层完成了消息传递。关于Broker 模式能够参见:http://blog.chinaunix.net/uid-23093301-id-90459.html

进程间通讯采用TPC,而不是多线程使用的共享内存方式。Service 通常是单线程架构的,经过启动多进程实现相对于多线程的并发。因为Broker模式天生石分布式的,因此有很好的Scalability。

消息时序图

如何注册服务和接口

来看一下Echo 服务的实现:

 
 
struct echo_service_t
{
public:
    void echo(echo_t::in_t& in_msg_, rpc_callcack_t<echo_t::out_t>& cb_)
    {
        logtrace((FF, "echo_service_t::echo done value<%s>", in_msg_.value.c_str()));
        echo_t::out_t out;
        out.value = in_msg_.value;
        cb_(out);
    }
};

int main(int argc, char* argv[])
{
    int g_index = 1;
    if (argc > 1)
    {
        g_index = atoi(argv[1]);
    }
    char buff[128];
    snprintf(buff, sizeof(buff), "tcp://%s:%s", "127.0.0.1", "10241");

    msg_bus_t msg_bus;
    assert(0 == singleton_t<msg_bus_t>::instance().open("tcp://127.0.0.1:10241") && "can't connnect to broker");

    echo_service_t f;

    singleton_t<msg_bus_t>::instance().create_service_group("echo");
    singleton_t<msg_bus_t>::instance().create_service("echo", g_index)
            .bind_service(&f)
            .reg(&echo_service_t::echo);

    signal_helper_t::wait();

    singleton_t<msg_bus_t>::instance().close();
    //usleep(1000);
    cout <<"\noh end\n";
    return 0;
}
 
    • create_service_group 建立一个服务group,一个服务组可能有多个并行的实例

 

  • create_service 以特定的id 建立一个服务实例

 

 

  • reg 为该服务注册接口

 

 

  • 接口的定义规范为void echo(echo_t::in_t& in_msg_, rpc_callcack_t<echo_t::out_t>& cb_),第一个参数为输入的消息struct,第二个参数为回调函数的模板特例,模板参数为返回消息的struct 类型。接口无需知道发送消息等细节,只需将结果callback 便可。

 

 

  • 注册到Broker 后,全部Client均可获取该服务

 

 

 

消息定义的规范

咱们约定每一个接口(远程或本地都应知足)都包含一个输入消息和一个结果消息。来看一下echo 服务的消息定义:

 
 
struct echo_t
{
    struct in_t: public msg_i
    {
        in_t():
            msg_i("echo_t::in_t")
        {}
        virtual string encode()
        {
            return (init_encoder() << value).get_buff();
        }
        virtual void decode(const string& src_buff_)
        {
            init_decoder(src_buff_) >> value;
        }

        string value;
    };
    struct out_t: public msg_i
    {
        out_t():
            msg_i("echo_t::out_t")
        {}
        virtual string encode()
        {
            return (init_encoder() << value).get_buff();
        }
        virtual void decode(const string& src_buff_)
        {
            init_decoder(src_buff_) >> value;
        }

        string value;
    };
};
  •  每一个接口必须包含in_t消息和out_t消息,而且他们定义在接口名(如echo _t)的内部
  • 全部消息都继承于msg_i, 其封装了二进制的序列化、反序列化等。构造时赋予类型名做为消息的名称。
  • 每一个消息必须实现encode 和 decode 函数

这里须要指出的是,FFLIB 中不须要为每一个消息定义对应的CMD。当接口如echo向Broker 注册时,reg接口经过C++ 模板的类型推断会自动将该msg name 注册给Broker, Broker为每一个msg name 分配惟一的msg_id。Msg_bus 中自动维护了msg_name 和msg_id 的映射。Msg_i 的定义以下:

struct msg_i : public codec_i
{
    msg_i(const char* msg_name_):
        cmd(0),
        uuid(0),
        service_group_id(0),
        service_id(0),
        msg_id(0),
        msg_name(msg_name_)
    {}

    void set(uint16_t group_id, uint16_t id_, uint32_t uuid_, uint16_t msg_id_)
    {
        service_group_id = group_id;
        service_id       = id_;
        uuid             = uuid_;
        msg_id           = msg_id_;
    }

    uint16_t cmd;
    uint16_t get_group_id()   const{ return service_group_id; }
    uint16_t get_service_id() const{ return service_id;       }
    uint32_t get_uuid()       const{ return uuid;             }

    uint16_t get_msg_id()     const{ return msg_id;           }
    const string& get_name()  const
    {
        if (msg_name.empty() == false)
        {
            return msg_name;
        }
        return singleton_t<msg_name_store_t>::instance().id_to_name(this->get_msg_id());
    }

    void     set_uuid(uint32_t id_)   { uuid = id_;  }
    void     set_msg_id(uint16_t id_) { msg_id = id_;}
    void     set_sgid(uint16_t sgid_) { service_group_id = sgid_;}
    void     set_sid(uint16_t sid_)   { service_id = sid_; }
    uint32_t uuid;
    uint16_t service_group_id;
    uint16_t service_id;
    uint16_t msg_id;
    string   msg_name;

    virtual string encode(uint16_t cmd_)
    {
        this->cmd = cmd_;
        return encode();
    }
    virtual string encode() = 0;
    bin_encoder_t& init_encoder()
    {
        return encoder.init(cmd)  << uuid << service_group_id << service_id<< msg_id;
    }
    bin_encoder_t& init_encoder(uint16_t cmd_)
    {
        return encoder.init(cmd_) << uuid << service_group_id << service_id << msg_id;
    }
    bin_decoder_t& init_decoder(const string& buff_)
    {
        return decoder.init(buff_) >> uuid >> service_group_id >> service_id >> msg_id;
    }
    bin_decoder_t decoder;
    bin_encoder_t encoder;
};

关于性能

因为远程接口的调用必须经过Broker, Broker会为每一个接口自动生成性能统计数据,并每10分钟输出到perf.txt 文件中。文件格式为CSV,参见:

http://www.cnblogs.com/zhiranok/archive/2012/06/06/cpp_perf.html

总结

FFLIB框架拥有以下的特色:

  • 使用多进程并发。Broker 把Client 和Service 的位置透明化
  • Service 的接口要注册到Broker, 全部链接Broker的Client 均可以调用(publisher/ subscriber)
  • 远程调用必须绑定回调函数
  • 利用future 模式实现同步,从而支持单元测试
  • 消息定义规范简单直接高效
  • 全部service的接口性能监控数据自动生成,免费的午饭
  • Service 单线程话,更simplicity

源代码:

Svn co http://ffown.googlecode.com/svn/trunk/

运行示例:

  • Cd example/broker && make && ./app_broker -l http://127.0.0.1:10241
  • Cd example/echo_server && make && ./app_echo_server
  • Cd example/echo_client && make && ./app_echo_client
相关文章
相关标签/搜索