做者:快课网——Jay13html
原文连接:http://www.cricode.com/3510.htmlnode
本文介绍几种服务器网络编程模型。废话很少说,直接正题。linux
同步阻塞迭代模型是最简单的一种IO模型。ios
其核心代码以下:git
1
2
3
4
5
6
7
8
|
bind(srvfd);
listen(srvfd);
for(;;){
clifd = accept(srvfd,...); //开始接受客户端来的链接
read(clifd,buf,...); //从客户端读取数据
dosomthingonbuf(buf);
write(clifd,buf) //发送数据到客户端
}
|
上面的程序存在以下一些弊端:程序员
1)若是没有客户端的链接请求,进程会阻塞在accept系统调用处,程序不能执行其余任何操做。(系统调用使得程序从用户态陷入内核态,具体请参考:程序员的自我修养)github
2)在与客户端创建好一条链路后,经过read系统调用从客户端接受数据,而客户端合适发送数据过来是不可控的。若是客户端迟迟不发生数据过来,则程序一样会阻塞在read调用,此时,若是另外的客户端来尝试链接时,都会失败。编程
3)一样的道理,write系统调用也会使得程序出现阻塞(例如:客户端接受数据异常缓慢,致使写缓冲区满,数据迟迟发送不出)。数组
同步阻塞迭代模型有诸多缺点。多进程并发模型在同步阻塞迭代模型的基础上进行了一些改进,以免是程序阻塞在read系统调用上。服务器
多进程模型核心代码以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
bind(srvfd);
listen(srvfd);
for(;;){
clifd = accept(srvfd,...); //开始接受客户端来的链接
ret = fork();
switch( ret )
{
case -1 :
do_err_handler();
break;
case 0 : // 子进程
client_handler(clifd);
break ;
default : // 父进程
close(clifd);
continue ;
}
}
//======================================================
void client_handler(clifd){
read(clifd,buf,...); //从客户端读取数据
dosomthingonbuf(buf);
write(clifd,buf) //发送数据到客户端
}
|
上述程序在accept系统调用时,若是没有客户端来创建链接,择会阻塞在accept处。一旦某个客户端链接创建起来,则当即开启一个新的进程来处理与这个客户的数据交互。避免程序阻塞在read调用,而影响其余客户端的链接。
在多进程并发模型中,每个客户端链接开启fork一个进程,虽然linux中引入了写实拷贝机制,大大下降了fork一个子进程的消耗,但若客户端链接较大,则系统依然将不堪负重。经过多线程(或线程池)并发模型,能够在必定程度上改善这一问题。
在服务端的线程模型实现方式通常有三种:
(1)按需生成(来一个链接生成一个线程)
(2)线程池(预先生成不少线程)
(3)Leader follower(LF)
为简单起见,以第一种为例,其核心代码以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
void *thread_callback( void *args ) //线程回调函数
{
int clifd = *(int *)args ;
client_handler(clifd);
}
//===============================================================
void client_handler(clifd){
read(clifd,buf,...); //从客户端读取数据
dosomthingonbuf(buf);
write(clifd,buf) //发送数据到客户端
}
//===============================================================
bind(srvfd);
listen(srvfd);
for(;;){
clifd = accept();
pthread_create(...,thread_callback,&clifd);
}
|
服务端分为主线程和工做线程,主线程负责accept()链接,而工做线程负责处理业务逻辑和流的读取等。所以,即便在工做线程阻塞的状况下,也只是阻塞在线程范围内,对继续接受新的客户端链接不会有影响。
第二种实现方式,经过线程池的引入能够避免频繁的建立、销毁线程,能在很大程序上提高性能。但无论如何实现,多线程模型先天具备以下缺点:
1)稳定性相对较差。一个线程的崩溃会致使整个程序崩溃。
2)临界资源的访问控制,在加大程序复杂性的同时,锁机制的引入会是严重下降程序的性能。性能上可能会出现“辛辛苦苦好几年,一晚上回到解放前”的状况。
多进程模型和多线程(线程池)模型每一个进程/线程只能处理一路IO,在服务器并发数较高的状况下,过多的进程/线程会使得服务器性能降低。而经过多路IO复用,能使得一个进程同时处理多路IO,提高服务器吞吐量。
在Linux支持epoll模型以前,都使用select/poll模型来实现IO多路复用。
以select为例,其核心代码以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
bind(listenfd);
listen(listenfd);
FD_ZERO(&allset);
FD_SET(listenfd, &allset);
for(;;){
select(...);
if (FD_ISSET(listenfd, &rset)) { /*有新的客户端链接到来*/
clifd = accept();
cliarray[] = clifd; /*保存新的链接套接字*/
FD_SET(clifd, &allset); /*将新的描述符加入监听数组中*/
}
for(;;){ /*这个for循环用来检查全部已经链接的客户端是否由数据可读写*/
fd = cliarray[i];
if (FD_ISSET(fd , &rset))
dosomething();
}
}
|
select IO多路复用一样存在一些缺点,罗列以下:
相比select模型,poll使用链表保存文件描述符,所以没有了监视文件数量的限制,但其余三个缺点依然存在。
拿select模型为例,假设咱们的服务器须要支持100万的并发链接,则在__FD_SETSIZE 为1024的状况下,则咱们至少须要开辟1k个进程才能实现100万的并发链接。除了进程间上下文切换的时间消耗外,从内核/用户空间大量的无脑内存拷贝、数组轮询等,是系统难以承受的。所以,基于select模型的服务器程序,要达到10万级别的并发访问,是一个很难完成的任务。
epoll IO多路复用:一个看起来很美好的解决方案。 因为文章:高并发网络编程之epoll详解中对epoll相关实现已经有详细解决,这里就直接摘录过来。
因为epoll的实现机制与select/poll机制彻底不一样,上面所说的 select的缺点在epoll上不复存在。
设想一下以下场景:有100万个客户端同时与一个服务器进程保持着TCP链接。而每一时刻,一般只有几百上千个TCP链接是活跃的(事实上大部分场景都是这种状况)。如何实现这样的高并发?
在select/poll时代,服务器进程每次都把这100万个链接告诉操做系统(从用户态复制句柄数据结构到内核态),让操做系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,所以,select/poll通常只能处理几千的并发链接。
epoll的设计和实现与select彻底不一样。epoll经过在Linux内核中申请一个简易的文件系统(文件系统通常用什么数据结构实现?B+树)。把原先的select/poll调用分红了3个部分:
1)调用epoll_create()创建一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)
2)调用epoll_ctl向epoll对象中添加这100万个链接的套接字
3)调用epoll_wait收集发生的事件的链接
如此一来,要实现上面说是的场景,只须要在进程启动时创建一个epoll对象,而后在须要的时候向这个epoll对象中添加或者删除链接。同时,epoll_wait的效率也很是高,由于调用epoll_wait时,并无一股脑的向操做系统复制这100万个链接的句柄数据,内核也不须要去遍历所有的链接。
下面来看看Linux内核具体的epoll机制实现思路。
当某一进程调用epoll_create方法时,Linux内核会建立一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关。eventpoll结构体以下所示:
1
2
3
4
5
6
7
8
|
struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着全部添加到epoll中的须要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要经过epoll_wait返回给用户的知足条件的事件*/
struct list_head rdlist;
....
};
|
每个epoll对象都有一个独立的eventpoll结构体,用于存放经过epoll_ctl方法向epoll对象中添加进来的事件。这些事件都会挂载在红黑树中,如此,重复添加的事件就能够经过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)。
而全部添加到epoll中的事件都会与设备(网卡)驱动程序创建回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。
在epoll中,对于每个事件,都会创建一个epitem结构体,以下所示:
1
2
3
4
5
6
7
|
struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}
|
当调用epoll_wait检查是否有事件发生时,只须要检查eventpoll对象中的rdlist双链表中是否有epitem元素便可。若是rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。
epoll数据结构示意图
从上面的讲解可知:经过红黑树和双链表数据结构,并结合回调机制,造就了epoll的高效。
OK,讲解完了Epoll的机理,咱们便能很容易掌握epoll的用法了。一句话描述就是:三步曲。
第一步:epoll_create()系统调用。此调用返回一个句柄,以后全部的使用都依靠这个句柄来标识。
第二步:epoll_ctl()系统调用。经过此调用向epoll对象中添加、删除、修改感兴趣的事件,返回0标识成功,返回-1表示失败。
第三部:epoll_wait()系统调用。经过此调用收集收集在epoll监控中已经发生的事件。
最后,附上一个epoll编程实例。(此代码做者为sparkliang)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
|
//
// a simple echo server using epoll in linux
//
// 2009-11-05
// 2013-03-22:修改了几个问题,1是/n格式问题,2是去掉了原代码不当心加上的ET模式;
// 原本只是简单的示意程序,决定仍是加上 recv/send时的buffer偏移
// by sparkling
//
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <iostream>
using namespace std;
#define MAX_EVENTS 500
struct myevent_s
{
int fd;
void (*call_back)(int fd, int events, void *arg);
int events;
void *arg;
int status; // 1: in epoll wait list, 0 not in
char buff[128]; // recv data buffer
int len, s_offset;
long last_active; // last active time
};
// set event
void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg)
{
ev->fd = fd;
ev->call_back = call_back;
ev->events = 0;
ev->arg = arg;
ev->status = 0;
bzero(ev->buff, sizeof(ev->buff));
ev->s_offset = 0;
ev->len = 0;
ev->last_active = time(NULL);
}
// add/mod an event to epoll
void EventAdd(int epollFd, int events, myevent_s *ev)
{
struct epoll_event epv = {0, {0}};
int op;
epv.data.ptr = ev;
epv.events = ev->events = events;
if(ev->status == 1){
op = EPOLL_CTL_MOD;
}
else{
op = EPOLL_CTL_ADD;
ev->status = 1;
}
if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0)
printf("Event Add failed[fd=%d], evnets[%d]\n", ev->fd, events);
else
printf("Event Add OK[fd=%d], op=%d, evnets[%0X]\n", ev->fd, op, events);
}
// delete an event from epoll
void EventDel(int epollFd, myevent_s *ev)
{
struct epoll_event epv = {0, {0}};
if(ev->status != 1) return;
epv.data.ptr = ev;
ev->status = 0;
epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv);
}
int g_epollFd;
myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd
void RecvData(int fd, int events, void *arg);
void SendData(int fd, int events, void *arg);
// accept new connections from clients
void AcceptConn(int fd, int events, void *arg)
{
struct sockaddr_in sin;
socklen_t len = sizeof(struct sockaddr_in);
int nfd, i;
// accept
if((nfd = accept(fd, (struct sockaddr*)&sin, &len)) == -1)
{
if(errno != EAGAIN && errno != EINTR)
{
}
printf("%s: accept, %d", __func__, errno);
return;
}
do
{
for(i = 0; i < MAX_EVENTS; i++)
{
if(g_Events[i].status == 0)
{
break;
}
}
if(i == MAX_EVENTS)
{
printf("%s:max connection limit[%d].", __func__, MAX_EVENTS);
break;
}
// set nonblocking
int iret = 0;
if((iret = fcntl(nfd, F_SETFL, O_NONBLOCK)) < 0)
{
printf("%s: fcntl nonblocking failed:%d", __func__, iret);
break;
}
// add a read event for receive data
EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]);
EventAdd(g_epollFd, EPOLLIN, &g_Events[i]);
}while(0);
printf("new conn[%s:%d][time:%d], pos[%d]\n", inet_ntoa(sin.sin_addr),
ntohs(sin.sin_port), g_Events[i].last_active, i);
}
// receive data
void RecvData(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s*)arg;
int len;
// receive data
len = recv(fd, ev->buff+ev->len, sizeof(ev->buff)-1-ev->len, 0);
EventDel(g_epollFd, ev);
if(len > 0)
{
ev->len += len;
ev->buff[len] = '\0';
printf("C[%d]:%s\n", fd, ev->buff);
// change to send event
EventSet(ev, fd, SendData, ev);
EventAdd(g_epollFd, EPOLLOUT, ev);
}
else if(len == 0)
{
close(ev->fd);
printf("[fd=%d] pos[%d], closed gracefully.\n", fd, ev-g_Events);
}
else
{
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
}
}
// send data
void SendData(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s*)arg;
int len;
// send data
len = send(fd, ev->buff + ev->s_offset, ev->len - ev->s_offset, 0);
if(len > 0)
{
printf("send[fd=%d], [%d<->%d]%s\n", fd, len, ev->len, ev->buff);
ev->s_offset += len;
if(ev->s_offset == ev->len)
{
// change to receive event
EventDel(g_epollFd, ev);
EventSet(ev, fd, RecvData, ev);
EventAdd(g_epollFd, EPOLLIN, ev);
}
}
else
{
close(ev->fd);
EventDel(g_epollFd, ev);
printf("send[fd=%d] error[%d]\n", fd, errno);
}
}
void InitListenSocket(int epollFd, short port)
{
int listenFd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(listenFd, F_SETFL, O_NONBLOCK); // set non-blocking
printf("server listen fd=%d\n", listenFd);
EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]);
// add listen socket
EventAdd(epollFd, EPOLLIN, &g_Events[MAX_EVENTS]);
// bind & listen
sockaddr_in sin;
bzero(&sin, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
sin.sin_port = htons(port);
bind(listenFd, (const sockaddr*)&sin, sizeof(sin));
listen(listenFd, 5);
}
int main(int argc, char **argv)
{
unsigned short port = 12345; // default port
if(argc == 2){
port = atoi(argv[1]);
}
// create epoll
g_epollFd = epoll_create(MAX_EVENTS);
if(g_epollFd <= 0) printf("create epoll failed.%d\n", g_epollFd);
// create & bind listen socket, and add to epoll, set non-blocking
InitListenSocket(g_epollFd, port);
// event loop
struct epoll_event events[MAX_EVENTS];
printf("server running:port[%d]\n", port);
int checkPos = 0;
while(1){
// a simple timeout check here, every time 100, better to use a mini-heap, and add timer event
long now = time(NULL);
for(int i = 0; i < 100; i++, checkPos++) // doesn't check listen fd
{
if(checkPos == MAX_EVENTS) checkPos = 0; // recycle
if(g_Events[checkPos].status != 1) continue;
long duration = now - g_Events[checkPos].last_active;
if(duration >= 60) // 60s timeout
{
close(g_Events[checkPos].fd);
printf("[fd=%d] timeout[%d--%d].\n", g_Events[checkPos].fd, g_Events[checkPos].last_active, now);
EventDel(g_epollFd, &g_Events[checkPos]);
}
}
// wait for events to happen
int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000);
if(fds < 0){
printf("epoll_wait error, exit\n");
break;
}
for(int i = 0; i < fds; i++){
myevent_s *ev = (struct myevent_s*)events[i].data.ptr;
if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event
{
ev->call_back(ev->fd, events[i].events, ev->arg);
}
if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event
{
ev->call_back(ev->fd, events[i].events, ev->arg);
}
}
}
// free resource
return 0;
}
|