前提:编程
IOCP的总体编程模型跟上面的纯重叠io 很是相似. 纯重叠io使用OVERLAPPED + APC函数完成.并发
这种模型的缺点是必须让调用apc函数进入alterable状态. 而IOCP解决了这个问题.IOCP让咱们本身建立一些线程,app
而后调用GetQueuedCompletionStatus 来告诉咱们某个io操做完成, 就像是在另外一个线程中执行了APC函数同样;socket
使用IOCP 的时候,通常状况下须要本身建立额外的线程,用于等待结果完成(GetQueuedCompletionStatus)函数
使用到的函数:测试
CreateIoCompletionPort : 建立/ 关联一个完成端口 . 操作系统
第3个参数是一个自定义数据, 第4个是最多N个线程可被调用;线程
注意与其关联的HANDLE 必需要有OVERLAPPED属性的指针
//建立一个完成端口 HANDLE hComp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0) //关联到完成端口. 第3个参数是一个自定义数据 //在GetQueuedCompletionStatus将携带这些数据返回. 这个自定义数据将一直与此套接字绑定在了一块儿 CreateIoCompletionPort((HANDLE)client_socket, hComp, (DWORD)pSockData, 0);
GetQueuedCompletionStatus :一旦相似WSARecv / WSASend 完成后 . 用此函数获取结果,就想APC函数同样,一旦完成io操做就调用. 此函数通常状况都在某一个线程中使用.注意一旦在某个线程中调用了此函数,这意味着,code
该线程就像被指派给了IOCP同样,供IOCP使用. 总之这个行为就想APC函数在另外一个线程被调用了;
关于解除关联: 一旦一个套接字关闭了 , closehandle /closesocket. 就将从IOCP的设备句柄列表中解除关联了
关于线程:
CreateIoCompletionPort 最后一个参数用于指定IOCP最多执行N个线程(若是是0 则使用默认CPU的核数). 但通常状况下,我会预留一些额外的线程.好比
个人CPU是4核即IOCP最多可以使用 4个线程 , 不过通常状况下会建立 8 个线程,给IOCP预留 额外4个线程 . 缘由是若是IOCP
有5个任务已经完成, 最多只有4个线程被唤醒. 若是其中某个线程调用了WaitForSingleObject 之类的函数 ,此时IOCP将唤醒额外的线程来处理第5个任务;
先补充一下. 对于WSARecv / WSASend 的OVERLAPPED操做,简称为投递操做.意思是让操做系统去干活,至于何时干完.
GetQueuedCompletionStatus 会通知你(即返回) . 所以所以, 须要注意, 这些参数像WSABUF 和 OVERLAPPED 必定要 new / malloc在堆中;
代码中都有注释: 另代码中有不少返回都没判断.这个例子仅仅解释如何编写IOCP
#include "stdafx.h" #include <process.h> #include "../utils.h" //包含了一些宏和一些打印错误信息的函数. #define BUFFSIZE 8192 #define Read 0 #define Write 1 //自定义数据 . 注意 结构的地址 与 第一个成员的地址相同 struct IOData { WSAOVERLAPPED overlapped; //每一个io操做都须要独立的一个overlapped WSABUF wsabuf; //读写各一份 int rw_mode; //判断读写操做 char * buf; //真正存放数据的地方, 须要初始化 }; //自定义数据.保存客户套接字和地址 struct SocketData { SOCKET hClientSocket; //客户端套接字 SOCKADDR_IN clientAddr; IOData * pRead; //2个指针,只是为了在线程中方便使用添加的 IOData * pWrite; }; //用于交换2个buf int swapBuf(WSABUF * a, WSABUF * b) { BOOL ret = FALSE; if (a && b){ char * buf = a->buf; a->buf = b->buf; b->buf = buf; ret = TRUE; } return ret; } //释放内存,解除关联 void freeMem(SocketData * pSockData) { closesocket(pSockData->hClientSocket); free(pSockData->pRead->buf); free(pSockData->pWrite->buf); free(pSockData->pWrite); free(pSockData->pRead); free(pSockData); } unsigned int WINAPI completeRoutine(void * param) { //完成端口 HANDLE hCom = (HANDLE)param; SocketData * pSockData = NULL; IOData * pIOData = NULL; DWORD flags = 0, bytes = 0; BOOL ret = 0; SOCKET hClientSocket = NULL; printf("tid:%ld start!\n", GetCurrentThreadId()); while (1) { flags = 0; //直到有任务完成即返回 ret = GetQueuedCompletionStatus(hCom, &bytes, (PULONG_PTR)&pSockData, (LPOVERLAPPED * )&pIOData, INFINITE); printf("GetQueuedCompletionStatus : %d , diy key : %p , pIOData:%p,mode:%d\n", ret, pSockData, pIOData,pIOData->rw_mode); //若是成功了 if (ret) { hClientSocket = pSockData->hClientSocket; //若是是WSARecv的 if (Read == pIOData->rw_mode) { printf("READ - > bytesRecved:%ld, high:%ld\n", bytes, pIOData->overlapped.InternalHigh); //对端关闭 if (0 == bytes) { printf("peer closed\n"); freeMem(pSockData); //释放内存 continue; } //测试数据 pSockData->pRead->buf[bytes] = 0; printf("Read buf:%s\n", pSockData->pRead->buf); //交换指针, 把recv的buf 给 write的buf; //把write的buf交换给recv . 若是并发量不大的时候能够这么作 swapBuf(&pIOData->wsabuf, &pSockData->pWrite->wsabuf); //回传操做.清空write OVERLAPPED memset(&pSockData->pWrite->overlapped, 0, sizeof(WSAOVERLAPPED)); pSockData->pWrite->wsabuf.len = bytes; WSASend(hClientSocket, &pSockData->pWrite->wsabuf, 1, NULL, 0, &pSockData->pWrite->overlapped, NULL); //再次投递一个recv操做,等待下次客户端发送 memset(&pSockData->pRead->overlapped, 0, sizeof(WSAOVERLAPPED)); pSockData->pRead->wsabuf.len = BUFFSIZE; WSARecv(hClientSocket, &pSockData->pRead->wsabuf, 1, NULL, &flags, &pSockData->pRead->overlapped, NULL); } else { // send 完成. printf("Send finsished - > bytes:%ld, high:%ld\n", bytes, pIOData->overlapped.InternalHigh); memset(&pIOData->overlapped, 0, sizeof(WSAOVERLAPPED)); } } else{ //一旦出错, 解除绑定即删除内存 print_error(GetLastError()); freeMem(pSockData); } } return 0; } int _tmain(int argc, _TCHAR* argv[]) { WSADATA wsadata; if (WSAStartup(MAKEWORD(2, 2), &wsadata) != 0){ print_error(WSAGetLastError()); return 0; } SYSTEM_INFO sysinfo; GetSystemInfo(&sysinfo); //指定线程数量. 通常 processors * 2 const DWORD nThreads = sysinfo.dwNumberOfProcessors * 2; //建立一个完成端口 , 前3个参数保证了建立一个独立的完成端口, 最后一个参数指定了完成 //端口可以使用的线程数. 0 使用当前cpu核数 HANDLE hCom = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); //准备一些线程供完成端口调用, 把完成端口同时传入 HANDLE * arr_threads = new HANDLE[nThreads]; for (int i = 0; i < sysinfo.dwNumberOfProcessors; ++i) arr_threads[i] = (HANDLE)_beginthreadex(NULL, 0, completeRoutine, (void*)hCom, 0, NULL); //建立一个支持OVERLAPPED的socket.这样的属性将被 accept 返回的socket所继承 SOCKET hListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); SOCKADDR_IN serv_addr, client_addr; memset(&serv_addr, 0, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(PORT); serv_addr.sin_addr.s_addr = INADDR_ANY; bind(hListenSocket, (SOCKADDR*)&serv_addr, sizeof(serv_addr)); listen(hListenSocket, BACKLOG); SOCKET client_socket; int client_addr_size = 0; DWORD flags = 0; while (1){ client_addr_size = sizeof(client_addr); flags = 0; client_socket = accept(hListenSocket, (SOCKADDR*)&client_addr, &client_addr_size); puts("accepted"); //准备一份数据, 用于保存clientsocket, addr, 以及读写指针; SocketData * pSockData = (SocketData *)malloc(sizeof(SocketData)); pSockData->pRead = NULL; pSockData->pWrite = NULL; pSockData->hClientSocket = client_socket; memcpy(&pSockData->clientAddr, &client_addr, client_addr_size); //准备数据 IOData * pRead = (IOData *)malloc(sizeof(IOData)); //对于OVERLAPPED,须要额外注意,清0 memset(&pRead->overlapped, 0, sizeof(WSAOVERLAPPED)); pRead->buf = (char *)malloc(BUFFSIZE); pRead->rw_mode = Read; pRead->wsabuf.buf = pRead->buf; pRead->wsabuf.len = BUFFSIZE; pSockData->pRead = pRead; IOData *pWrite = (IOData *)malloc(sizeof(IOData)); pWrite->buf = (char *)malloc(BUFFSIZE); memset(&pWrite->overlapped, 0, sizeof(WSAOVERLAPPED)); pWrite->rw_mode = Write; pWrite->wsabuf.buf = pWrite->buf; pWrite->wsabuf.len = BUFFSIZE; pSockData->pWrite = pWrite; //与iocp关联在一块儿. 注意第3个参数, 把自定义数据一块儿传递过去 CreateIoCompletionPort((HANDLE)client_socket, hCom, (DWORD)pSockData, 0); WSARecv(client_socket, &pRead->wsabuf, 1, NULL, &flags, &pRead->overlapped, NULL); } return 0; }