一些概念:linux
同步和异步ios
同步和异步是针对应用程序和内核的交互而言的,同步指的是用户进程触发I/O操做并等待或者轮询的去查看I/O操做是否就绪,而异步是指用户进程触发I/O操做之后便开始作本身的事情,而当I/O操做已经完成的时候会获得I/O完成的通知。编程
阻塞和非阻塞后端
阻塞和非阻塞是针对于进程在访问数据的时候,根据I/O操做的就绪状态来采起的不一样方式,说白了是一种读取或者写入操做函数的实现方式,阻塞方式下读取或者写入函数将一直等待,而非阻塞方式下,读取或者写入函数会当即返回一个状态值。缓存
服务器端几种模型:服务器
一、阻塞式模型(blocking IO)网络
咱们第一次接触到的网络编程都是从 listen()、accpet()、send()、recv() 等接口开始的。使用这些接口能够很方便的构建C/S的模型。这里大部分的 socket 接口都是阻塞型的。所谓阻塞型接口是指系统调用(通常是 IO 接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用得到结果或者超时出错时才返回。多线程
以下面一个简单的Server端实现:异步
#include <Winsock2.h> #include <cstdio> #include <iostream> #include <string> using namespace std; #pragma comment(lib,"ws2_32.lib") int init_win_socket() { WSADATA wsaData; if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0) { return -1; } return 0; } #define Server_Port 10286 void handle_client(int newfd) { while(1) { char buff[1024]; memset(buff,0,1024); int result = recv(newfd,buff,1024,0); if(result <= 0) { break; } else { printf("Receive Data %s, Size: %d \n",buff,result); int ret = send(newfd,buff,result,0); if(ret>0) { printf("Send Data %s, Size: %d \n",buff,ret); } else { break; } } } closesocket(newfd); return; } int run() { int listener; struct sockaddr_in addr_server; listener = socket(AF_INET, SOCK_STREAM, 0); //addr_server.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); addr_server.sin_addr.S_un.S_addr = ADDR_ANY; addr_server.sin_family = AF_INET; addr_server.sin_port = htons(Server_Port); if(bind(listener,(const sockaddr *)&addr_server,sizeof(addr_server)) < 0) { perror("bind error"); return -1; } if (listen(listener, 10)<0) { perror("listen error"); return -1; } printf("Server is listening ... \n"); bool runing = true; while(runing) { sockaddr_in addr_client; int clientlen = sizeof(addr_client); int client_sock; if ((client_sock = accept(listener, (struct sockaddr *) &addr_client, &clientlen)) < 0) { printf("Failed to accept client connection \n"); } fprintf(stdout, "Client connected: %s \n", inet_ntoa(addr_client.sin_addr)); /*Handle this connect */ handle_client(client_sock); } closesocket(listener); return 0; } int main(int c, char **v) { #ifdef WIN32 init_win_socket(); #endif run(); getchar(); return 0; }
示意图以下:socket
这里的socket的接口是阻塞的(blocking),在线程被阻塞期间,线程将没法执行任何运算或响应任何的网络请求,这给多客户机、多业务逻辑的网络编程带来了挑战。
二、多线程的服务器模型(Multi-Thread)
应对多客户机的网络应用,最简单的解决方式是在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每一个链接都拥有独立的线程(或进程),这样任何一个链接的阻塞都不会影响其余的链接。
多线程Server端的实现:
#include <Winsock2.h> #include <cstdio> #include <iostream> #include <string> using namespace std; #pragma comment(lib,"ws2_32.lib") int init_win_socket() { WSADATA wsaData; if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0) { return -1; } return 0; } #define Server_Port 10286 DWORD WINAPI handle_client(LPVOID lppara) { int *newfd = (int *)lppara; while(1) { char buff[1024]; memset(buff,0,1024); int result = recv(*newfd,buff,1024,0); if(result <= 0) { break; } else { printf("Receive Data %s, Size: %d \n",buff,result); int ret = send(*newfd,buff,result,0); if(ret>0) { printf("Send Data %s, Size: %d \n",buff,ret); } else { break; } } Sleep(10); } closesocket(*newfd); return 0; } int run() { int listener; struct sockaddr_in addr_server; int sock_clients[1024]; //max number for accept client connection; listener = socket(AF_INET, SOCK_STREAM, 0); //addr_server.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); addr_server.sin_addr.S_un.S_addr = ADDR_ANY; addr_server.sin_family = AF_INET; addr_server.sin_port = htons(Server_Port); if(bind(listener,(const sockaddr *)&addr_server,sizeof(addr_server)) < 0) { perror("bind error"); return -1; } if (listen(listener, 10)<0) { perror("listen error"); return -1; } printf("Server is listening ... \n"); int fd_count = 0; bool runing = true; while(runing) { sockaddr_in addr_client; int clientlen = sizeof(addr_client); int client_sock; if ((client_sock = accept(listener, (struct sockaddr *) &addr_client, &clientlen)) < 0) { printf("Failed to accept client connection \n"); } fprintf(stdout, "Client connected: socket fd %d , %s \n", client_sock,inet_ntoa(addr_client.sin_addr)); /*Handle this connect */ if(fd_count<1024) { sock_clients[fd_count] = client_sock; if(CreateThread(NULL,0,handle_client,&sock_clients[fd_count],0,NULL)==NULL) return -1; ++ fd_count; } Sleep(10); } closesocket(listener); return 0; } int main(int c, char **v) { #ifdef WIN32 init_win_socket(); #endif run(); getchar(); return 0; }
上述多线程的服务器模型能够解决一些链接量不大的多客户端链接请求,可是若是要同时响应成千上万路的链接请求,则不管多线程仍是多进程都会严重占据系统资源,下降系统对外界响应效率。
在多线程的基础上,能够考虑使用“线程池”或“链接池”,“线程池”旨在减小建立和销毁线程的频率,其维持必定合理数量的线程,并让空闲的线程从新承担新的执行任务。“链接池”维持链接的缓存池,尽可能重用已有的链接、减小建立和关闭链接的频率。这两种技术均可以很好的下降系统开销,都被普遍应用不少大型系统。
三、非阻塞式模型(Non-blocking IO)
非阻塞的接口相比于阻塞型接口的显著差别在于,在被调用以后当即返回。
非阻塞型IO的示意图以下:
从应用程序的角度来讲,blocking read 调用会延续很长时间。在内核执行读操做和其余工做时,应用程序会被阻塞。
非阻塞的IO可能并不会当即知足,须要应用程序调用许屡次来等待操做完成。这可能效率不高,由于在不少状况下,当内核执行这个命令时,应用程序必需要进行忙碌等待,直到数据可用为止。
另外一个问题,在循环调用非阻塞IO的时候,将大幅度占用CPU,因此通常使用select等来检测”是否能够操做“。
四、多路复用IO
支持I/O复用的系统调用有select、poll、epoll、kqueue等,
这里以Select函数为例,select函数用于探测多个文件句柄的状态变化,如下为一个使用了使用了Select函数的Server实现:
#include <Winsock2.h> #include <cstdio> #include <cstdlib> #include <cassert> #include <iostream> #include <string> using namespace std; #pragma comment(lib,"ws2_32.lib") int init_win_socket() { WSADATA wsaData; if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0) { return -1; } return 0; } #define Server_Port 10286 #define MAX_LINE 16384 #define FD_SETSIZE 1024 struct fd_state { char buffer[MAX_LINE]; size_t buffer_used; int writing; size_t n_written; size_t write_upto; }; struct fd_state * alloc_fd_state(void) { struct fd_state *state = (struct fd_state *)malloc(sizeof(struct fd_state)); if (!state) return NULL; state->buffer_used = state->n_written = state->writing = state->write_upto = 0; memset(state->buffer,0,MAX_LINE); return state; } void free_fd_state(struct fd_state *state) { free(state); } int set_socket_nonblocking(int fd) { unsigned long mode = 1; int result = ioctlsocket(fd, FIONBIO, &mode); if (result != 0) { return -1; printf("ioctlsocket failed with error: %ld\n", result); } return 0; } int do_read(int fd, struct fd_state *state) { char buf[1024]; int i; int result; while (1) { memset(buf,0,1024); result = recv(fd, buf, sizeof(buf), 0); if (result <= 0) break; for (i=0; i < result; ++i) { if (state->buffer_used < sizeof(state->buffer)) state->buffer[state->buffer_used++] = buf[i]; } } state->writing = 1; state->write_upto = state->buffer_used; printf("Receive data: %s size: %d\n",state->buffer+state->n_written,state->write_upto-state->n_written); if (result == 0) { return 1; } else if (result < 0) { #ifdef WIN32 if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK) return 0; #else if (errno == EAGAIN) return 0; #endif return -1; } return 0; } int do_write(int fd, struct fd_state *state) { while (state->n_written < state->write_upto) { int result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0); if (result < 0) { #ifdef WIN32 if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK) return 0; #else if (errno == EAGAIN) return 0; #endif return -1; } assert(result != 0); printf("Send data: %s \n",state->buffer+ state->n_written); state->n_written += result; } if (state->n_written == state->buffer_used) state->n_written = state->write_upto = state->buffer_used = 0; state->writing = 0; return 0; } void run() { int listener; struct fd_state *state[FD_SETSIZE]; struct sockaddr_in sin; int i, maxfd; fd_set readset, writeset, exset; sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(Server_Port); for (i = 0; i < FD_SETSIZE; ++i) state[i] = NULL; listener = socket(AF_INET, SOCK_STREAM, 0); set_socket_nonblocking(listener); int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,(const char *)&one, sizeof(one)); if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } printf("Server is listening ... \n"); FD_ZERO(&readset); FD_ZERO(&writeset); FD_ZERO(&exset); while (1) { maxfd = listener; FD_ZERO(&readset); FD_ZERO(&writeset); FD_ZERO(&exset); FD_SET(listener, &readset); for (i=0; i < FD_SETSIZE; ++i) { if (state[i]) { if (i > maxfd) maxfd = i; FD_SET(i, &readset); if (state[i]->writing) { FD_SET(i, &writeset); } } } if (select(maxfd+1, &readset, &writeset, &exset, NULL) < 0) { perror("select"); return; } //check if listener can accept if (FD_ISSET(listener, &readset)) { struct sockaddr_in ss; int slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { perror("accept"); } else if(fd > FD_SETSIZE) { closesocket(fd); } else { printf("Accept socket %d, address %s \n",fd,inet_ntoa(ss.sin_addr)); set_socket_nonblocking(fd); state[fd] = alloc_fd_state(); assert(state[fd]); } } //process read and write socket for (i=0; i < maxfd+1; ++i) { int r = 0; if (i == listener) continue; if (FD_ISSET(i, &readset)) { r = do_read(i, state[i]); } if (r == 0 && FD_ISSET(i, &writeset)) { r = do_write(i, state[i]); } if (r) { free_fd_state(state[i]); state[i] = NULL; closesocket(i); } } } } int main(int c, char **v) { #ifdef WIN32 init_win_socket(); #endif run(); return 0; }
示意图以下:
这里Select监听的socket都是Non-blocking的,因此在do_read() do_write()中对返回为EAGAIN/WSAEWOULDBLOCK都作了处理。
从代码中能够看出使用Select返回后,仍然须要轮训再检测每一个socket的状态(读、写),这样的轮训检测在大量链接下也是效率不高的。由于当须要探测的句柄值较大时,select () 接口自己须要消耗大量时间去轮询各个句柄。
不少操做系统提供了更为高效的接口,如 linux 提供 了 epoll,BSD 提供了 kqueue,Solaris 提供了 /dev/poll …。若是须要实现更高效的服务器程序,相似 epoll 这样的接口更被推荐。遗憾的是不一样的操做系统特供的 epoll 接口有很大差别,因此使用相似于 epoll 的接口实现具备较好跨平台能力的服务器会比较困难。
五、使用事件驱动库libevent的服务器模型
Libevent 是一种高性能事件循环/事件驱动库。
为了实际处理每一个请求,libevent 库提供一种事件机制,它做为底层网络后端的包装器。事件系统让为链接添加处理函数变得很是简便,同时下降了底层IO复杂性。这是 libevent 系统的核心。
建立 libevent 服务器的基本方法是,注册当发生某一操做(好比接受来自客户端的链接)时应该执行的函数,而后调用主事件循环 event_dispatch()。执行过程的控制如今由 libevent 系统处理。注册事件和将调用的函数以后,事件系统开始自治;在应用程序运行时,能够在事件队列中添加(注册)或 删除(取消注册)事件。事件注册很是方便,能够经过它添加新事件以处理新打开的链接,从而构建灵活的网络处理系统。
使用Libevent实现的一个回显服务器以下:
#include <event2/event.h> #include <assert.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 void do_read(evutil_socket_t fd, short events, void *arg); void do_write(evutil_socket_t fd, short events, void *arg); struct fd_state { char buffer[MAX_LINE]; size_t buffer_used; size_t n_written; size_t write_upto; struct event *read_event; struct event *write_event; }; struct fd_state * alloc_fd_state(struct event_base *base, evutil_socket_t fd) { struct fd_state *state = (struct fd_state *)malloc(sizeof(struct fd_state)); if (!state) { return NULL; } state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state); if (!state->read_event) { free(state); return NULL; } state->write_event = event_new(base, fd, EV_WRITE, do_write, state); if (!state->write_event) { event_free(state->read_event); free(state); return NULL; } memset(state->buffer,0,MAX_LINE); state->buffer_used = state->n_written = state->write_upto = 0; return state; } void free_fd_state(struct fd_state *state) { event_free(state->read_event); event_free(state->write_event); free(state); } void do_read(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = (struct fd_state *) arg; char buf[1024]; int i; int result; assert(state->write_event); while(1) { memset(buf,0,1024); result = recv(fd, buf, sizeof(buf), 0); if (result <= 0) { break; } else { for (i=0; i < result; ++i) { if (state->buffer_used < sizeof(state->buffer)) state->buffer[state->buffer_used++] = buf[i]; } } } printf("receive data: %s size: %d\n",state->buffer+state->n_written,state->write_upto-state->n_written); assert(state->write_event); event_add(state->write_event, NULL); state->write_upto = state->buffer_used; if (result == 0) { printf("connect closed \n"); free_fd_state(state); } else if (result < 0) { #ifdef WIN32 if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK) return; #else if (errno == EAGAIN) return; #endif perror("recv"); free_fd_state(state); } } void do_write(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = (struct fd_state *)arg; while (state->n_written < state->write_upto) { int result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0); if (result < 0) { #ifdef WIN32 if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK) return; #else if (errno == EAGAIN) return; #endif free_fd_state(state); return; } assert(result != 0); printf("send data: %s \n",state->buffer+ state->n_written); state->n_written += result; } //buffer is full if (state->n_written == state->buffer_used) { state->n_written = state->write_upto = state->buffer_used = 0; memset(state->buffer,0,MAX_LINE); } } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = (struct event_base *)arg; struct sockaddr_in ss; int slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd > 0) { printf("accept socket %d, address %s \n",fd,inet_ntoa(ss.sin_addr)); struct fd_state *state; evutil_make_socket_nonblocking(fd); state = alloc_fd_state(base, fd); assert(state); assert(state->read_event); event_add(state->read_event, NULL); } } void run() { int listener; struct sockaddr_in addr_server; struct event_base *base; struct event *listener_event; base = event_base_new(); if (!base) { perror("event_base_new error"); return; } addr_server.sin_addr.S_un.S_addr = ADDR_ANY; addr_server.sin_family = AF_INET; addr_server.sin_addr.s_addr = 0; addr_server.sin_port = htons(10286); listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one)); if (bind(listener, (struct sockaddr*)&addr_server, sizeof(addr_server)) < 0) { perror("bind error"); return; } if (listen(listener, 10)<0) { perror("listen error"); return; } printf("server is listening ... \n"); listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); event_add(listener_event, NULL); event_base_dispatch(base); } int init_win_socket() { WSADATA wsaData; if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0) { return -1; } return 0; } int main(int c, char **v) { #ifdef WIN32 init_win_socket(); #endif run(); getchar(); return 0; }
六、信号驱动IO模型(Signal-driven IO)
使用信号,让内核在描述符就绪时发送SIGIO信号通知应用程序,称这种模型为信号驱动式I/O(signal-driven I/O)。
图示以下:
首先开启套接字的信号驱动式I/O功能,并经过sigaction系统调用安装一个信号处理函数。该系统调用将当即返回,咱们的进程继续工做,也就是说进程没有被阻塞。当数据报准备好读取时,内核就为该进程产生一个SIGIO信号。随后就能够在信号处理函数中调用recvfrom读取数据报,并通知主循环数据已经准备好待处理,也能够当即通知主循环,让它读取数据报。
不管如何处理SIGIO信号,这种模型的优点在于等待数据报到达期间进程不被阻塞。主循环能够继续执行 ,只要等到来自信号处理函数的通知:既能够是数据已准备好被处理,也能够是数据报已准备好被读取。
七、异步IO模型(asynchronous IO)
异步I/O(asynchronous I/O)由POSIX规范定义。演变成当前POSIX规范的各类早起标准所定义的实时函数中存在的差别已经取得一致。通常地说,这些函数的工做机制是:告知内核启动某个操做,并让内核在整个操做(包括将数据从内核复制到咱们本身的缓冲区)完成后通知咱们。这种模型与前一节介绍的信号驱动模型的主要区别在于:信号驱动式I/O是由内核通知咱们什么时候能够启动一个I/O操做,而异步I/O模型是由内核通知咱们I/O操做什么时候完成。
示意图以下:
咱们调用aio_read函数(POSIX异步I/O函数以aio_或lio_开头),给内核传递描述符、缓冲区指针、缓冲区大小(与read相同的三个参数)和文件偏移(与lseek相似),并告诉内核当整个操做完成时如何通知咱们。该系统调用当即返回,而且在等待I/O完成期间,咱们的进程不被阻塞。本例子中咱们假设要求内核在操做完成时产生某个信号,该信号直到数据已复制到应用进程缓冲区才产生,这一点不一样于信号驱动I/O模型。
参考:
《UNIX网络编程》
使用 libevent 和 libev 提升网络应用性能:http://www.ibm.com/developerworks/cn/aix/library/au-libev/
使用异步 I/O 大大提升应用程序的性能:https://www.ibm.com/developerworks/cn/linux/l-async/