《Linux高性能服务器编程》学习总结(十三)——多进程编程

  在多进程编程中,咱们用fork系统调用建立子进程,值得注意的是,fork函数复制当前进程并在内核进程表中建立一个新的表项,其堆、栈指针,标志寄存器的值都和父进程相同,可是其ppid被设置成父进程pid,信号位图被清除。而子进程代码和父进程彻底相同,其数据也会复制自父进程,可是其复制过程是写时复制,即父子任意进程对数据执行写操做时才会复制,首先是缺页中断,而后操做系统给子进程分配空间并复制数据。此外,建立子进程后父进程中打开的文件描述符在子进程中也是默认打开的,其文件描述符引用计数加一,父进程的用户根目录,当前工做目录等变量的引用计数均会加一。编程

  僵尸进程是多进程编程中须要了解和避免的状况,僵尸进程的产生是因为子进程先于父进程退出,而此时父进程没有正常接收子进程退出状态,致使子进程脱离父进程存在于操做系统中,还占据了原有的进程描述符和资源,形成了资源的浪费。避免僵尸进程的产生共有三种方法:第一种是处理SIGCHLD信号,当子进程退出时会向父进程发送SIGCHLD信号,咱们能够指明信号处理函数来进行处理;第二种是拖管法,由父进程建立一个中间进程,再由这个中间进程建立子进程后直接退出,这样子进程变成了孤儿进程,会由init进程托管,由init进程负责回收其资源,可是这样就破坏了父子进程之间的血缘关系;第三种就是阻塞法,调用wait函数等待子进程退出,缺点就是会使父进程进入阻塞状态。对于wait函数,其进入阻塞状态显然是咱们所不容许的,因此waitpid函数就解决了这个问题。服务器

  在多进程编程中,一个最为重要的须要解决的问题就是进程间通讯IPC,咱们经常使用的IPC方法有:管道、信号量、共享内存和消息队列。其系统调用较为简单,咱们只来简单说明一下这四种方式的区别和异同。首先管道是咱们在前文就提到过的,并且讲过了再网络编程中常常使用socketpair函数建立一个双向管道,管道分为无名管道和有名管道,无名管道只能用于有亲缘关系的进程之间进行通讯,并且只能用低级文件编程库中的读写函数,而有名管道就是建立一个管道文件,经过文件进行信息交流,因此没有亲缘关系的限制。信号量是用来实现多进程之间的互斥与同步的,其本质是一个整形的数,咱们用pv操做来对其进行操做,若是是二进制的信号量,则能够用信号量保证对于临界资源来说同一时刻只有一段代码能对其进行访问。共享内存是在内存空间建立或获取一段空间来让各个进程进行共享,经过这段空间进行信息交流,而这种方式就比较灵活,能够自定其数据结构,完成各式各样的功能。消息队列就是一个能存放消息的队列,各个进程将消息发送到消息队列中,并指明要发送的对象,其余程序能够直接监听这个队列,收取发给本身的消息。几种进程间通讯的方式都很广泛,咱们来看一个用共享内存实现的聊天室服务器程序:网络

  

 1 /*************************************************************************  2  > File Name: 13-4.cpp  3  > Author: Torrance_ZHANG  4  > Mail: 597156711@qq.com  5  > Created Time: Wed 14 Feb 2018 04:18:04 PM PST  6  ************************************************************************/
 7 
 8 #include"head.h"
 9 using namespace std;  10 
 11 #define USER_LIMIT 5
 12 #define BUFFER_SIZE 1024
 13 #define FD_LIMIT 65535
 14 #define MAX_EVENT_NUMBER 1024
 15 #define PROCESS_LIMIT 65536
 16 
 17 struct client_data {  18  sockaddr_in address;  19     int connfd;  20  pid_t pid;  21     int pipefd[2];  22 };  23 
 24 static const char* shm_name = "/my_shm";  25 int sig_pipefd[2];  26 int epollfd;  27 int listenfd;  28 int shmfd;  29 char* share_mem = 0;  30 client_data* users = 0;  31 int* sub_process = 0;  32 int user_count = 0;  33 bool stop_child = false;  34 
 35 int setnonblocking(int fd) {  36     int old_option = fcntl(fd, F_GETFL);  37     int new_option = old_option | O_NONBLOCK;  38  fcntl(fd, F_SETFL, new_option);  39     return old_option;  40 }  41 
 42 void addfd(int epollfd, int fd) {  43     epoll_event event;  44     event.data.fd = fd;  45     event.events = EPOLLIN | EPOLLET;  46     epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);  47  setnonblocking(fd);  48 }  49 
 50 void sig_handler(int sig) {  51     int save_errno = errno;  52     int msg = sig;  53     send(sig_pipefd[1], (char*)&msg, 1, 0);  54     errno = save_errno;  55 }  56 
 57 void addsig(int sig, void(*handler)(int), bool restart = true) {  58     struct sigaction sa;  59     memset(&sa, 0, sizeof(sa));  60     sa.sa_handler = handler;  61     if(restart) sa.sa_flags |= SA_RESTART;  62     sigfillset(&sa.sa_mask);  63     assert(sigaction(sig, &sa, NULL) != -1);  64 }  65 
 66 void del_resource() {  67     close(sig_pipefd[0]);  68     close(sig_pipefd[1]);  69  close(listenfd);  70  close(epollfd);  71  shm_unlink(shm_name);  72     delete [] users;  73     delete [] sub_process;  74 }  75 
 76 void child_term_handler(int sig) {  77     stop_child = true;  78 }  79 
 80 int run_child(int idx, client_data* users, char* share_mem) {  81  epoll_event events[MAX_EVENT_NUMBER];  82     int child_epollfd = epoll_create(5);  83     assert(child_epollfd != -1);  84     int connfd = users[idx].connfd;  85  addfd(child_epollfd, connfd);  86     int pipefd = users[idx].pipefd[1];  87  addfd(child_epollfd, pipefd);  88     int ret;  89     addsig(SIGTERM, child_term_handler, false);  90 
 91     while(!stop_child) {  92         int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);  93         if((number < 0) && (errno != EINTR)) {  94             printf("epoll failure\n");  95             break;  96  }  97         for(int i = 0; i < number; i ++) {  98             int sockfd = events[i].data.fd;  99             if((sockfd == connfd) && (events[i].events & EPOLLIN)) { 100                 memset(share_mem + idx * BUFFER_SIZE, 0, BUFFER_SIZE); 101                 ret = recv(connfd, share_mem + idx * BUFFER_SIZE, BUFFER_SIZE - 1, 0); 102                 if(ret < 0) { 103                     if(errno != EAGAIN) { 104                         stop_child = true; 105  } 106  } 107                 else if(ret == 0) { 108                     stop_child = true; 109  } 110                 else { 111                     send(pipefd, (char*)&idx, sizeof(idx), 0); 112  } 113  } 114             else if((sockfd == pipefd) && (events[i].events & EPOLLIN)) { 115                 int client = 0; 116                 ret = recv(sockfd, (char*)&client, sizeof(client), 0); 117                 if(ret < 0) { 118                     if(errno != EAGAIN) { 119                         stop_child = true; 120  } 121  } 122                 else if(ret == 0) { 123                     stop_child = true; 124  } 125                 else { 126                     send(connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0); 127  } 128  } 129             else continue; 130  } 131  } 132  close(connfd); 133  close(pipefd); 134  close(child_epollfd); 135     return 0; 136 } 137 
138 int main(int argc, char** argv) { 139     if(argc <= 2) { 140         printf("usage: %s ip_address port_number\n", basename(argv[0])); 141         return 1; 142  } 143     const char* ip = argv[1]; 144     int port = atoi(argv[2]); 145 
146     int ret = 0; 147     struct sockaddr_in address; 148     bzero(&address, sizeof(address)); 149     address.sin_family = AF_INET; 150     inet_pton(AF_INET, ip, &address.sin_addr); 151     address.sin_port = htons(port); 152 
153     listenfd = socket(AF_INET, SOCK_STREAM, 0); 154     assert(listenfd >= 0); 155 
156     ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); 157     assert(ret != -1); 158 
159     ret = listen(listenfd, 5); 160     assert(ret != -1); 161 
162     user_count = 0; 163     users = new client_data[USER_LIMIT + 1]; 164     sub_process = new int[PROCESS_LIMIT]; 165     for(int i = 0; i < PROCESS_LIMIT; i ++) { 166         sub_process[i] = -1; 167  } 168  epoll_event events[MAX_EVENT_NUMBER]; 169     epollfd = epoll_create(5); 170     assert(epollfd != -1); 171  addfd(epollfd, listenfd); 172     
173     ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sig_pipefd); 174     assert(ret != -1); 175     setnonblocking(sig_pipefd[1]); 176     addfd(epollfd, sig_pipefd[0]); 177 
178  addsig(SIGCHLD, sig_handler); 179  addsig(SIGTERM, sig_handler); 180  addsig(SIGINT, sig_handler); 181  addsig(SIGPIPE, SIG_IGN); 182     bool stop_server = false; 183     bool terminate = false; 184 
185     shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666); 186     assert(shmfd != -1); 187     ret = ftruncate(shmfd, USER_LIMIT * BUFFER_SIZE); 188     assert(ret != -1); 189     
190     share_mem = (char*)mmap(NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0); 191     assert(share_mem != MAP_FAILED); 192  close(shmfd); 193 
194     while(!stop_server) { 195         int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); 196         if((number < 0) && (errno != EINTR)) { 197             printf("epoll failure\n"); 198             break; 199  } 200         for(int i = 0; i < number; i ++) { 201             int sockfd = events[i].data.fd; 202             if(sockfd == listenfd) { 203                 struct sockaddr_in client_address; 204                 socklen_t client_addrlength = sizeof(client_address); 205                 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); 206                 if(connfd < 0) { 207                     printf("errno is: %d\n", errno); 208                     continue; 209  } 210                 if(user_count >= USER_LIMIT) { 211                     const char* info = "too many users\n"; 212                     printf("%s", info); 213                     send(connfd, info, strlen(info), 0); 214  close(connfd); 215                     continue; 216  } 217                 users[user_count].address = client_address; 218                 users[user_count].connfd = connfd; 219                 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd); 220                 assert(ret != -1); 221                 pid_t pid = fork(); 222                 if(pid < 0) { 223  close(connfd); 224                     continue; 225  } 226                 else if(pid == 0) { 227  close(epollfd); 228  close(listenfd); 229                     close(users[user_count].pipefd[0]); 230                     close(sig_pipefd[0]); 231                     close(sig_pipefd[1]); 232  run_child(user_count, users, share_mem); 233                     munmap((void*)share_mem, USER_LIMIT* BUFFER_SIZE); 234                     exit(0); 235  } 236                 else { 237  close(connfd); 238                     close(users[user_count].pipefd[1]); 239                     addfd(epollfd, users[user_count].pipefd[0]); 240                     users[user_count].pid = pid; 241                     sub_process[pid] = user_count; 242                     user_count ++; 243  } 244  } 245             else if((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) { 246                 int sig; 247                 char signals[1024]; 248                 ret = recv(sig_pipefd[0], signals, sizeof(signals), 0); 249                 if(ret == -1) continue; 250                 else if(ret == 0) continue; 251                 else { 252                     for(int i = 0; i < ret; i ++) { 253                         switch(signals[i]) { 254                             case SIGCHLD: { 255  pid_t pid; 256                                 int stat; 257                                 while((pid = waitpid(-1, &stat, WNOHANG)) > 0) { 258                                     int del_user = sub_process[pid]; 259                                     sub_process[pid] = -1; 260                                     if((del_user < 0) || (del_user > USER_LIMIT)) { 261                                         continue; 262  } 263                                     epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0); 264                                     close(users[del_user].pipefd[0]); 265                                     users[del_user] = users[-- user_count]; 266                                     sub_process[users[del_user].pid] = del_user; 267  } 268                                 if(terminate && user_count == 0) { 269                                     stop_server = true; 270  } 271                                 break; 272  } 273                             case SIGTERM: 274                             case SIGINT: { 275                                 printf("kill all the child now\n"); 276                                 if(user_count == 0) { 277                                     stop_server = true; 278                                     break; 279  } 280                                 for(int i = 0; i < user_count; i ++) { 281                                     int pid = users[i].pid; 282  kill(pid, SIGTERM); 283  } 284                                 terminate = true; 285                                 break; 286  } 287                             default : break; 288  } 289  } 290  } 291  } 292             else if(events[i].events & EPOLLIN) { 293                 int child = 0; 294                 ret = recv(sockfd, (char*)&child, sizeof(child), 0); 295                 printf("read data from child accross pipe\n"); 296                 if(ret == -1) continue; 297                 else if(ret == 0) continue; 298                 else { 299                     for(int j = 0; j < user_count; j ++) { 300                         if(users[j].pipefd[0] != sockfd) { 301                             printf("send data to child accross pipe\n"); 302                             send(users[j].pipefd[0], (char*)&child, sizeof(child), 0); 303  } 304  } 305  } 306  } 307  } 308  } 309  del_resource(); 310     return 0; 311 }

  服务器接收到客户端数据时,经过管道告知专门为其余服务器服务的子进程向对应客户端发送数据。数据结构

  咱们知道,当父进程建立子进程以后,父进程中打开的文件描述符仍然保持打开,因此文件描述符能够很方便地从父进程传递到子进程。须要特别注意的是进程描述符的传递并非单纯地传送一个值,而是要在接收进程中建立一个新的文件描述符,而且该文件描述符和发送进程中被传递的文件描述符指向内核中的同一个文件表项。那么问题就来了,咱们如何从子进程将文件描述符传送给父进程呢?推广来讲,如何在不相关的两个进程之间传送文件描述符呢?这时咱们就须要利用UNIX域socket在进程间传递特殊的辅助数据,咱们来看一个例子:socket

 1 /*************************************************************************  2  > File Name: 13-5.cpp  3  > Author: Torrance_ZHANG  4  > Mail: 597156711@qq.com  5  > Created Time: Tue 27 Feb 2018 03:35:13 AM PST  6  ************************************************************************/
 7 
 8 #include"head.h"
 9 using namespace std; 10 
11 static const int CONTROL_LEN = CMSG_LEN(sizeof(int)); 12 
13 void send_fd(int fd, int fd_to_send) { 14     struct iovec iov[1]; 15     struct msghdr msg; 16     char buf[0]; 17 
18     iov[0].iov_base = buf; 19     iov[0].iov_len = 1; 20     msg.msg_name = NULL; 21     msg.msg_namelen = 0; 22     msg.msg_iov = iov; 23     msg.msg_iovlen = 1; 24 
25  cmsghdr cm; 26     cm.cmsg_len = CONTROL_LEN; 27     cm.cmsg_level = SOL_SOCKET; 28     cm.cmsg_type = SCM_RIGHTS; 29     *(int *)CMSG_DATA(&cm) = fd_to_send; 30     msg.msg_control = &cm; 31     msg.msg_controllen = CONTROL_LEN; 32 
33     sendmsg(fd, &msg, 0); 34 } 35 
36 int recv_fd(int fd) { 37     struct iovec iov[1]; 38     struct msghdr msg; 39     char buf[0]; 40 
41     iov[0].iov_base = buf; 42     iov[0].iov_len = 1; 43     msg.msg_name = NULL; 44     msg.msg_namelen = 0; 45     msg.msg_iov = iov; 46     msg.msg_iovlen = 1; 47     
48  cmsghdr cm; 49     msg.msg_control = &cm; 50     msg.msg_controllen = CONTROL_LEN; 51     recvmsg(fd, &msg, 0); 52 
53     int fd_to_read = *(int*)CMSG_DATA(&cm); 54     return fd_to_read; 55 } 56 
57 int main() { 58     int pipefd[2]; 59     int fd_to_pass = 0; 60     int ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, pipefd); 61     assert(ret != -1); 62 
63     pid_t pid = fork(); 64     assert(pid >= 0); 65 
66     if(pid == 0) { 67         close(pipefd[0]); 68         fd_to_pass = open("test.txt", O_RDWR, 0666); 69         send_fd(pipefd[1], (fd_to_pass > 0) ? fd_to_pass : 0); 70  close(fd_to_pass); 71         exit(0); 72  } 73 
74     close(pipefd[1]); 75     fd_to_pass = recv_fd(pipefd[0]); 76     char buf[1024]; 77     memset(buf, 0, sizeof(buf)); 78     read(fd_to_pass, buf, 1024); 79     printf("I got fd %d and data %s\n", fd_to_pass, buf); 80  close(fd_to_pass); 81 }

相关文章
相关标签/搜索