构建异步处理网络服务器

  此次提到了网络服务器的异步处理,关于异步跟同步的概念,相信不少小伙伴都很是清楚,固然也有不少萌萌哒的小伙伴常常据说异步,但理解的不够。(这里会附带解释下后面涉及到的回调机制)后端

  A)实际生活中的同步和异步数组

       同步通信:最近ADSL拨号一直不是很稳定,因而打电话给电信的师傅请求支援,电话打通了,结果电信师傅正在忙着处理另外一个问题,因而我就拿着电话等着师傅处理完他手头的事情,而后再指导我。(很呆吧,但这种阻塞的通信方式就是同步)服务器

       异步通信:一样的问题,机智的我选择了打给客服,电话打通了,客服说:您的问题咱们已经记录,您只要留下可靠的联系方式,很快就会有技术人员跟您联系,而后我就能够挂了电话去干别的事情了,客服MM也能够继续去接别的电话了。(~~这种就是异步通信机制)网络

  B)计算机程序中的同步和异步多线程

       编写程序时,不一样的动态库或者模块之间老是存在着必定的接口,从调用方式上,能够把他们分为三类:同步调用、回调和异步调用。并发

       同步调用:一种阻塞式调用,调用方要等待对方执行完毕才返回, 它是一种单向调用;异步

       异步调用:一种相似消息或事件的机制,不过它的调用方向恰好相反,接口的服务在收到某种讯息或发生某种事件时,会主动通知客户方(即调用客户方的接口)。socket

       回调:一种双向调用模式,也就是说,被调用方在接口被调用时也会调用对方的接口(注册好的接口,能够理解为我刚才留给客服的电话号码!好吧,很绕慢慢来!)。函数

       因而可知,回调和异步调用的关系很是紧密,一般咱们使用回调来实现异步消息的注册,经过异步调用来实现消息的通知。工具

       理解了异步同步的概念,接下来开始咱们的重点。

       许多服务器的构建面对的最大问题之一是必须可以处理大量并发链接。处理多个链接有许多不一样的传统方法,可是在处理大量链接时它们每每会产生问题,由于它们使用的内存或 CPU 太多,或者达到了某个操做系统限制。

       先来看看经常使用作法:

       循环:早期系统使用简单的循环选择解决方案,即循环遍历打开的网络链接的列表,判断是否有要读取的数据。这种方法既缓慢(尤为是随着链接数量增长愈来愈慢),又低效(由于在处理当前链接时其余链接可能正在发送请求并等待响应)。在系统循环遍历每一个链接时,其余链接不得不等待。若是有 100 个链接,其中只有一个有数据,那么仍然必须处理其余 99 个链接,才能轮到真正须要处理的链接。

  Poll和epoll:这是对循环方法的改进,它用一个结构保存要监视的每一个链接的数组,当在网络套接字上发现数据时,经过回调机制调用处理函数。poll 的问题是这个结构会很是大,在列表中添加新的网络链接时,修改结构会增长负载并影响性能。

  选择方式:即select() 函数调用使用一个静态结构,由于有1024 个链接的数量限制,所以不适用于很是大的部署。

  Thread多线程方式:利用多线程支持监听和处理链接,为每一个链接启动一个thread。这种方式,在处理大量链接时,会大幅增长 内存和 CPU的开销,由于每一个线程都须要本身的执行空间。另外,若是每一个线程都忙于处理网络链接,线程之间的上下文切换会很频繁。最后,许多内核并不适于处理如此大量的活跃线程。

  讲故事讲究欲擒故纵、欲扬先抑,说了这几种经常使用方式固然只是铺垫,下面重点介绍基于Linux网络库libevent的方式。

  libevent 库实际上没有更换 select()、poll() 或其余机制的基础。而是使用对于每一个平台最高效的高性能解决方案在实现外加上一个包装器。为了实际处理每一个请求,libevent 库提供一种事件机制,它做为底层网络后端的包装器。事件系统让为链接添加处理函数变得很是简便,同时下降了底层 I/O 复杂性。

  构建libevent 服务器的基本方法是,注册当发生某一操做(好比接受来自客户端的链接)时应该执行的函数,而后调用主事件循环event_dispatch()。执行过程的控制如今由 libevent 系统处理。注册事件和将调用的函数以后,事件系统开始自治;在应用程序运行时,能够在事件队列中添加(注册)或删除(取消注册)事件。事件注册很是方便,能够经过它添加新事件以处理新打开的链接,从而构建灵活的网络处理服务系统。

  下面用实际代码来讲明libevent库如何构建异步处理服务器:

  打开监听套接字,注册一个回调函数(每当监听到客户端的链接请求,在调用 accept() 函数以打开新链接时调用它),由此构建网络服务器:

/*init libevent*/
ev_init(); 
/* Setup listening socket */
event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL);
event_add(&ev_accept, NULL);
/* Start the event loop. */
event_dispatch();

  event_set() 函数建立新的事件结构,这里的EV_PERSIST是为了建立一个永久的监听事件,由于event机制在处理完一个就绪事件后为了提升遍历效率会主动删除这个事件,若是须要继续监听,则须要从新在事件队列中add此事件,而设置EV_PERSIST属性,event机制将不会删除已处理事件,保持持续监听client动做。

  event_add() 在事件队列机制中添加事件。

  event_dispatch() 启动事件队列系统,开始监听(并接受)请求。

  On_accept函数:当接受链接时,事件系统调用此函数。此函数接受到客户端的链接;添加客户端套接字信息和一个 bufferevent 结构;在事件结构中为客户端套接字上的读/写/错误事件添加回调函数;做为参数传递客户端结构(和嵌入的 eventbuffer 和客户端套接字)。每当对应的客户端套接字包含读、写或错误操做时,调用对应的回调函数。

  完整代码以下:(一个简易应答服务器:把客户端发过来的信息,回发给客户端)

  1 include <event.h>
  2 #include <sys/types.h>
  3 #include <sys/socket.h>
  4 #include <netinet/in.h>
  5 #include <arpa/inet.h>
  6 #include <string.h>
  7 #include <stdlib.h>
  8 #include <stdio.h>
  9 #include <fcntl.h>
 10 #include <unistd.h>
 11 
 12 #define SERVER_PORT 8899
 13 int debug = 0;
 14 
 15 struct client {
 16   int fd;
 17   struct bufferevent *buf_ev;
 18 };
 19 
 20 int setnonblock(int fd)
 21 {
 22   int flags;
 23 
 24   flags = fcntl(fd, F_GETFL);
 25   flags |= O_NONBLOCK;
 26   fcntl(fd, F_SETFL, flags);
 27 }
 28 
 29 void buf_read_callback(struct bufferevent *incoming,
 30                        void *arg)
 31 {
 32   struct evbuffer *evreturn;
 33   char *req;
 34 
 35   req = evbuffer_readline(incoming->input);
 36   if (req == NULL)
 37     return;
 38 
 39   evreturn = evbuffer_new();
 40   evbuffer_add_printf(evreturn,"Client msg %s\n",req);
 41   bufferevent_write_buffer(incoming,evreturn);
 42   evbuffer_free(evreturn);
 43   free(req);
 44 }
 45 
 46 void buf_write_callback(struct bufferevent *bev,
 47                         void *arg)
 48 {
 49 }
 50 
 51 void buf_error_callback(struct bufferevent *bev,
 52                         short what,
 53                         void *arg)
 54 {
 55   struct client *client = (struct client *)arg;
 56   bufferevent_free(client->buf_ev);
 57   close(client->fd);
 58   free(client);
 59 }
 60 
 61 void on_accept(int fd,
 62                      short ev,
 63                      void *arg)
 64 {
 65   int client_fd;
 66   struct sockaddr_in client_addr;
 67   socklen_t client_len = sizeof(client_addr);
 68   struct client *client;
 69 
 70   client_fd = accept(fd,
 71                      (struct sockaddr *)&client_addr,
 72                      &client_len);
 73   if (client_fd < 0)
 74     {
 75       fprintf(stderr,"Client: accept() failed\n");
 76       return;
 77     }
 78 
 79   setnonblock(client_fd);
 80 
 81   client = calloc(1, sizeof(*client));
 82   if (client == NULL)
 83     fprintf(stderr, "malloc failed\n");
 84   client->fd = client_fd;
 85 
 86   client->buf_ev = bufferevent_new(client_fd,
 87                                    buf_read_callback,
 88                                    buf_write_callback,
 89                                    buf_error_callback,
 90                                    client);
 91 
 92   bufferevent_enable(client->buf_ev, EV_READ);
 93 }
 94 
 95 int main(int argc,
 96          char **argv)
 97 {
 98   int socketlisten;
 99   struct sockaddr_in addresslisten;
100   struct event accept_event;
101   int reuse = 1;
102 
103   event_init();
104 
105   socketlisten = socket(AF_INET, SOCK_STREAM, 0);
106 
107   if (socketlisten < 0)
108     {
109       fprintf(stderr,"Failed to create listen socket");
110       return 1;
111     }
112 
113   memset(&addresslisten, 0, sizeof(addresslisten));
114 
115   addresslisten.sin_family = AF_INET;
116   addresslisten.sin_addr.s_addr = INADDR_ANY;
117   addresslisten.sin_port = htons(SERVER_PORT);
118 
119   if (bind(socketlisten,
120            (struct sockaddr *)&addresslisten,
121            sizeof(addresslisten)) < 0)
122     {
123       fprintf(stderr,"Failed to bind");
124       return 1;
125     }
126 
127   if (listen(socketlisten, 5) < 0)
128     {
129       fprintf(stderr,"Failed to listen to socket");
130       return 1;
131     }
132 
133   setsockopt(socketlisten,
134              SOL_SOCKET,
135              SO_REUSEADDR,
136              &reuse,
137              sizeof(reuse));
138 
139   setnonblock(socketlisten);
140 
141   event_set(&accept_event,
142             socketlisten,
143             EV_READ|EV_PERSIST,
144             on_accept,
145             NULL);
146 
147   event_add(&accept_event,
148             NULL);
149 
150   event_dispatch();
151 
152   close(socketlisten);
153 
154   return 0;
155 }

  服务器启动后,就会监听8899端口,等待处理client请求:

  客户端可使用telnet,或者其余组包工具来模拟:

  好了:),此次的分享先介绍到这里,有什么意见或建议,你们积极留言,Thanks!

相关文章
相关标签/搜索