在 acl_cpp 的非阻塞框架的设计中,充分利用了操做系统平台的高并发机制,同时简化了异步编程的过程。可是,并非全部的操做都是非阻塞的,现实的程序应用中存在着大量的阻塞式行为,acl_cpp 的非阻塞框架中设计了一种经过 ipc 模式使阻塞式函数与 acl_cpp 的非阻塞过程相结合的机制。便是说,在 acl_cpp 的主线程是非阻塞的,而把阻塞过程放在单独的一个线程中运行,当阻塞线程运算完毕,会以 ipc 方式通知主线程运行结果,这样就达到了阻塞与非阻塞相结合的模式。下图展现了 ipc 类的继承关系:ios
以上图中有两个类与 ipc 相关:ipc_server 及 ipc_client,其中 ipc_server 其实用于与监听异步流相关,而ipc_client 与客户端链接流相关。在 ipc_server 类中有一个 open 函数用来打开本地监听地址,另有三个虚函数便于子类进行相关操做:编程
1)on_accept:当监听流得到一个客户端链接时回调此函数,将得到的客户端流传递给子类对象;服务器
2)on_open:当用户调用 ipc_server::open 函数,且监听某一服务地址成功时经过该函数将实际的监听地址传递给子类对象(由于在调用 open(addr) 时,addr 通常仅指定 IP 地址,同时将 端口赋 0 以便于操做系统自动分配本地端口号,因此经过 on_open 即可以将实际的 端口传递给子类对象);并发
3)on_close:当监听流关闭时调用此函数通知子类对象。app
相对于 ipc_server 类,则 ipc_client 的接口就显得比较多,主要的函数以下:框架
1)open:共有四个 open 重载函数用链接监听流服务地址,其中两个是同步流方式,两个是异步流方式,通常同步创建的流用在阻塞式线程中,异步创建的流用在非阻塞线程中;异步
2)send_message:通常用来向非阻塞主线程发送消息;socket
3)on_message:异步接收到阻塞线程发来的消息的回调函数;异步编程
4)append_message:添加异步流想要接收的消息号。函数
ipc_client 类比较特殊,其充当着双重身份:1)做为客户端链接流链接监听流的服务地址,通常用在阻塞式线程中;2)做为监听流接收到来自于客户端流的链接请求而建立的与之对应的服务端异步流,通常用在非阻塞线程中。
下面以一个具体的实例来讲明若是使用 ipc_server 及 ipc_client 两个类:
#include "lib_acl.h" #include <iostream> #include "aio_handle.hpp" #include "ipc_server.hpp" #include "ipc_client.hpp" using namespace acl; #define MSG_REQ 1 #define MSG_RES 2 #define MSG_STOP 3 // 消息客户端链接类定义 class test_client1 : public ipc_client { public: test_client1() { } ~test_client1() { } // 链接消息服务端地址成功后的回调函数 virtual void on_open() { // 添加消息回调对象,接收消息服务器的 // 此类消息 this->append_message(MSG_RES); // 向消息服务器发送请求消息 this->send_message(MSG_REQ, NULL, 0); // 异步等待来自于消息服务器的消息 wait(); } // 流关闭时的回调函数 virtual void on_close() { delete this; } // 接收到消息服务器的消息时的回调函数,其中的消息号由 // append_message 进行注册 virtual void on_message(int nMsg, void*, int) { std::cout << "test_client1 on message:" << nMsg << std::endl; // 向消息服务器发送消息,通知消息服务器中止 this->send_message(MSG_STOP, NULL, 0); // 删除在 on_open 中注册的消息号 this->delete_message(MSG_RES); // 本异步消息过程中止运行 this->get_handle().stop(); // 关闭本异步流对象 this->close(); } protected: private: }; // 消息客户端处理过程 static bool client_main(aio_handle* handle, const char* addr) { // 建立消息链接 ipc_client* ipc = new test_client1(); // 链接消息服务器 if (ipc->open(handle, addr, 0) == false) { std::cout << "open " << addr << " error!" << std::endl; delete ipc; return (false); } return (true); } // 子线程的入口函数 static void* thread_callback(void *ctx) { const char* addr = (const char*) ctx; aio_handle handle; if (client_main(&handle, addr) == false) { handle.check(); return (NULL); } // 子线程的异步消息循环 while (true) { if (handle.check() == false) break; } // 最后清理一些可能未关闭的流链接 handle.check(); return (NULL); }
// 消息服务器接收到的客户端链接流类定义 class test_client2 : public ipc_client { public: test_client2() { } ~test_client2() { } virtual void on_close() { delete this; } // 接收到消息客户端发来消息的回调函数 virtual void on_message(int nMsg, void*, int) { std::cout << "test_client2 on message:" << nMsg << std::endl; // 若是收到消息客户端要求退出的消息,则主线程了退出 if (nMsg == MSG_STOP) { this->close(); // 通知主线程的非阻塞引擎关闭 this->get_handle().stop(); } else // 回应客户端消息 this->send_message(MSG_RES, NULL, 0); } protected: private: }; // 主线程的消息服务器 ipc 服务监听类定义 class test_server : public ipc_server { public: test_server() { } ~test_server() { } // 消息服务器接收到消息客户端链接时的回调函数 void on_accept(aio_socket_stream* client) { // 建立 ipc 链接对象 ipc_client* ipc = new test_client2(); // 打开异步IPC过程 ipc->open(client); // 添加消息回调对象 ipc->append_message(MSG_REQ); ipc->append_message(MSG_STOP); ipc->wait(); } protected: private: }; static void usage(const char* procname) { printf("usage: %s -h[help] -t[use thread]\n", procname); } int main(int argc, char* argv[]) { int ch; bool use_thread = false; while ((ch = getopt(argc, argv, "ht")) > 0) { switch (ch) { case 'h': usage(argv[0]); return (0); case 't': use_thread = true; break; default: break; } } acl_init(); aio_handle handle; ipc_server* server = new test_server(); // 使消息服务器监听 127.0.0.1 的地址 if (server->open(&handle, "127.0.0.1:0") == false) { delete server; std::cout << "open server error!" << std::endl; getchar(); return (1); } char addr[256]; #ifdef WIN32 _snprintf(addr, sizeof(addr), "%s", server->get_addr()); #else snprintf(addr, sizeof(addr), "%s", server->get_addr()); #endif if (use_thread) { // 使消息客户端在子线程中单独运行 acl_pthread_t tid; acl_pthread_create(&tid, NULL, thread_callback, addr); } // 由于消息客户端也是非阻塞过程,因此也能够与消息服务器 // 在同一线程中运行 else client_main(&handle, addr); // 主线程的消息循环过程 while (true) { if (handle.check() == false) { std::cout << "stop now!" << std::endl; break; } } delete server; handle.check(); // 清理一些可能未关闭的异步流对象 std::cout << "server stopped!" << std::endl; getchar(); return (0); }
以上例子相对简单,其展现的消息服务器与消息客户端均是非阻塞过程,其实将上面的异步消息客户端稍微一改即可以改为同步消息客户端了,修改部分以下:
class test_client3 : public ipc_client { public: test_client3() { } ~test_client3() { } virtual void on_open() { // 添加消息回调对象 this->append_message(MSG_RES); // 向消息服务器发送请求消息 this->send_message(MSG_REQ, NULL, 0); // 同步等待消息 wait(); } virtual void on_close() { delete this; } virtual void on_message(int nMsg, void*, int) { std::cout << "test_client3 on message:" << nMsg << std::endl; this->send_message(MSG_STOP, NULL, 0); this->delete_message(MSG_RES); this->close(); } protected: private: }; // 子线程处理过程 static bool client_main(const char* addr) { // 建立消息客户端对象 ipc_client* ipc = new test_client3(); // 同步方式链接消息服务器 if (ipc->open(addr, 0) == false) { std::cout << "open " << addr << " error!" << std::endl; delete ipc; // 当消息客户端未成功建立时须要在此处删除对象 return (false); } return (true); } static void* thread_callback(void *ctx) { const char* addr = (const char*) ctx; if (client_main(addr) == false) return (NULL); return (NULL); }
对比 test_client1 与 test_client_3 两个消息客户端,能够发现两者区别并不太大,关键在于调用 open 时是采用了异步仍是同步链接消息服务器,其决定了消息客户端是异步的仍是同步的。
示例代码:samples/aio_ipc
acl_cpp 下载:http://sourceforge.net/projects/acl/
原文地址:http://zsxxsz.iteye.com/blog/1495832
更多文章:http://zsxxsz.iteye.com/
QQ 群:242722074