前10种限于同一台主机的两个进程之间的IPC
shell
实现机制:
管道是由内核管理的一个缓冲区
缓存
管道的建立
服务器
int pipe(int fd[2]);
管道的关闭由系统负责
数据结构
当管道的一端被关闭后,下列规则起做用:
1.当读一个写端已被关闭的管道时,在全部数据都被读取后,read返回0,以指示达到了文件结束处. 当读一个没有数据的管道时,read阻塞
2.若是写一个读端已被关闭的管道,则产生信号SIGPIPE。
若是忽略该信号或者捕捉该信号并从其处理程序返回,则write出错返回,errno设置为EPIPE。
函数
注意
1. 在写管道时,常数PIPE_BUF规定了内核中管道缓存器的大小。
2. 若是对管道进行write调用,并且要求写的字节数小于等于PIPE_BUF,则此操做不会与其余进程
对同一管道(或FIFO)的write操做穿插进行
3. 但,如有多个进程同时写一个管道(或FIFO),并且某个或某些进程要求写的字节数超过PIPE_BUF字节数,
则数据可能与其余写操做的数据相穿插。
布局
过滤器程序
性能
#include <stdio.h> FILE* popen(const char* cmdstring, const char* type) int pclose(FILE* fp);
函数popen:
先执行fork,而后调用 exec 以执行 cmdstring,而且返回一个标准I/O文件指针。
ui
注意
1. popen之中调用了fork函数,会出现与system同样的问题:
调用popen的进程当心使用waitpid,以及设置SIGCHLD的信号处理函数
spa
过滤程序从标准输入读取数据,对其进行适当处理后写到标准输出。几个过滤进程一般在shell管道命令中线性地链接。
当同一个程序产生某个过滤程序的输入,同时又读取该过滤程序的输出时,则该过滤程序就成为协同进程(coprocess)。
指针
// kill child first, when write to child, parent receives the signal static void sigpipe_handler(int sig) { fprintf(stderr, "SIGPIPE received from child.\n"); exit(1); }
int main(int argc, char* argv[]) { struct sigaction siga; siga.sa_handler = sigpipe_handler; sigemptyset(&siga.sa_mask); siga.sa_flags = SA_RESTART; if (sigaction(SIGPIPE, &siga, NULL) < 0){ fprintf(stderr, "sigaction err : %s\n", strerror(errno)); exit(-1); }
int p2c[2],c2p[2]; if (pipe(p2c) < 0 || pipe(c2p) < 0){ fprintf(stderr, "pipe err : %s\n", strerror(errno)); exit(-1); }
pid_t pid; if ((pid = fork()) < 0){ fprintf(stderr, "fork err : %s\n", strerror(errno)); exit(-1); } else if (pid == 0) { close(p2c[1]); close(c2p[0]); if (p2c[0] != STDIN_FILENO) { if (dup2(p2c[0],STDIN_FILENO) != STDIN_FILENO) { fprintf(stderr, "dup2 err : %s\n", strerror(errno)); exit(-1); } } if (c2p[1] != STDOUT_FILENO) { if (dup2(c2p[1],STDOUT_FILENO) != STDOUT_FILENO){ fprintf(stderr, "dup2 err : %s\n", strerror(errno)); exit(-1); } } if (execl("./child", "child", (char*)0) < 0) { fprintf(stderr, "execl err : %s\n", strerror(errno)); exit(-1); }
} else { close(c2p[1]); close(p2c[0]); char buf[BUFSIZ]; while (fgets(buf, BUFSIZ, stdin) != NULL) { if ( write(p2c[1], buf, strlen(buf)) < 0){ fprintf(stderr, "write err : %s\n", strerror(errno)); exit(-1); } int n; if (( n = read(c2p[0], buf, BUFSIZ)) < 0){ fprintf(stderr, "read err : %s\n", strerror(errno)); exit(-1); } buf[n] = '\0'; printf ("the answer is : %s\n", buf); } } return 0; }
-------------------------------------code A-------------------------------------------
如下代码通过编译生成的可执行文件的路径名称, 就是上面代码的child
#include "apue.h" int main(){ int n, int1,int2; char line[MAXLINE]; /* if (setvbuf(stdin, NULL, _IOLBF, 0) != 0) err_sys("setvbuf error") ; if (setvbuf(stdout, NULL, _IOLBF, 0)!= 0) err_sys("setvbuf error") ; */ while ((n = read(STDIN_FILENO, line, MAXLINE)) > 0){ /*deal with the line*/ if (write(STDOUT_FILENO, line, n) != n) err_sys("write err."); } exit(0); }
-------------------------------------code B-------------------------------------------
注意:
若code A 调用 Code B,则它再也不工做。问题出在系统默认的标准I/O缓存机制上。
当B被调用时,对标准输入的第一个fgets引发标准I/O库分配一个缓存,并选择缓存的类型。
由于标准输入是个管道,因此isatty为假,因而标准I/O库由系统默认是全缓存的。
对标准输出也有一样的处理。当B从其标准输入读取而发生堵塞时,A从管道读时也发生堵塞,因而产生了死锁。
解决方法:
将管道设置为行缓存,如Code B中被注释掉的代码所示
管道只能由相关进程使用. 但经过FIFO,不相关的进程也能交换数据。
建立FIFO
int mkfifo(const char* path, mode_t mode); int mkfifoat(int fd, const char* path, mode_t mode);
int main() { if (access(PIPE_PATH, F_OK) < 0) { if (errno == ENOENT) { if (mkfifo(PIPE_PATH, 0777) < 0) { fprintf(stderr, "mkfifo err : %s\n", strerror(errno)); exit(-1); } } else { fprintf(stderr, "access err : %s\n", strerror(errno)); exit(-1); } } int rfd = open(PIPE_PATH, O_RDONLY | O_NONBLOCK); if (rfd < 0) { fprintf(stderr, "open err : %s\n", strerror(errno)); exit(-1); } int n; char buf[1024]; while (( n = read(rfd, buf, sizeof (buf))) < 0) { if (errno == EAGAIN){ continue; } fprintf(stderr, "read err : %s\n", strerror(errno)); exit(-1); } buf[n] = '\0'; printf("%s\n", buf); return 0; }
FIFO的关闭由系统负责
FIFO有两种用途:
1.FIFO由shell命令使用以便将数据从一条管道线传送到另外一条,为此无需建立中间临时文件。
mkfifo fifo1 prog3 < fifo1 & prog1 < infile | tee fifo1 | prog2
2.FIFO用于客户机-服务器应用程序中,以在客户机和服务器之间传递数据。
注意
1. client一次发送的请求长度必须小于PIPE_BUF,避免屡次写的交叉
2. client必须在请求中包含自身的ID,从而server知道将replies发送给哪一个client
3. 当客户进程个数变为0时,server将在well-known的FIFO读到一个文件结束标志。
解决方法:server以读写方式打开well-known的FIFO
XSI IPC: 消息队列, 信号量, 共享存储
标识符(int)是IPC对象的内部名, 键(key_t)是该对象的外部名称
有多种方法使客户机和服务器在同一IPC结构上会合:
--1. 服务器能够指定关键字IPC_PRIVATE建立一个新IPC结构,将返回的标识符存放在某处以便客户机取用。
关键字IPC_PRIVATE保证服务器建立一个新IPC结构。
缺点:
服务器要将整型标识符写到文件中,而后客户机在此后又要读文件取得此标识符。
IPC_PRIVATE关键字也可用于父、子关系进程。父进程指定IPC_PRIVATE建立一个新IPC结构,
所返回的标识符在fork后可由子进程使用。子进程可将此标识符做为exec函数的一个参数传给一个新程序。
--2. 在一个公用头文件中定义一个客户机和服务器都承认的关键字。而后服务器指定此关键字建立一个新的IPC结构。
这种方法的问题是该关键字可能已与一个IPC结构相结合,在此状况下,get函数(msgget、semget或shmget)出错返回。
服务器必须处理这一错误,删除已存在的IPC结构,而后试着再建立它。
--3. 客户机和服务器认同一个路径名和课题ID(课题ID是0~255之间的字符值),而后调用函数ftok将这两个值
变换为一个关键字。而后在方法2中使用此关键字。
key_t ftok(const char* path, int id); // path:现有文件路径
注意 :
1. 对于不一样文件的两个路径名若使用同一个项目ID,ftok可能会产生相同的键
建立IPC结构的注意事项
三个get函数(msgget、semget和shmget)都有两个相似的参数key和一个整型的flag。
如若知足下列条件,则建立一个新的IPC结构(一般由服务器建立):
--1. key是IPC_PRIVATE,或
--2. key未与特定类型的IPC结构相结合,flag中指定了IPC_CREAT位。
注意:
为访问现存的队列(客户机),key必须等于建立该队列时所指定的关键字,而且不该指定IPC_CREAT。
为了访问一个现存队列,毫不能指定IPC_PRIVATE做为关键字。由于这是一个特殊的键值,它老是用于建立一个新队列。
为了访问一个用IPC_PRIVATE关键字建立的现存队列,必定要知道与该队列相结合的标识符,而后在其余IPC调用中·使用该标识符。
若是但愿建立一个新的IPC结构,保证不是引用具备同一标识符的一个现行IPC结构,
那么必须在flag中同时指定IPC_CREAT和IPC_EXCL位。这样作了之后,若是IPC结构已经存在就会形成出错,
返回EEXIST,这与指定了O_CREAT和O_EXCL标志的open相相似
struct ipc_perm { uid_t uid; /* owner’s effective user ID */ gid_t gid; /* owner’s effective group ID */ uid_t cuid; /* creator’s effective user ID */ gid_t cgid; /* creator’s effective group ID */ mode_t mode; /* access modes */ ... };
建立IPC结构时,对以上数据结构进行赋值
mode字段的值对于任何IPC结构都不存在执行许可权。
sysctl 观察和修改内核配置参数
ipcs -l 显示IPC相关的限制
缺点
1. IPC结构是在系统范围内起做用的,没有访问计数。
例如,若是建立了一个消息队列,在该队列中放入了几则消息,而后终止,可是该消息队列及其内容并不被删除。
它们余留在系统中直至由某个进程调用读消息或删除消息队列等
与管道相比,当最后一个访问管道的进程终止时,管道就被彻底地删除了。
对于FIFO而言,虽然当最后一个引用FIFO的进程终止时其名字仍保留在系统中,
可是留在FIFO中的数据却在此时所有删除。
2. 这些IPC结构并不按名字为文件系统所知。咱们不能用第三、4章中所述的函数来存取它们或修改它们的特性。
为了支持它们不得不增长了十多个全新的系统调用. 由于这些IPC不使用文件描述符,因此不能对它们使用
多路转接I/O函数:select和poll。这就使得一次使用多个IPC结构,以及用文件或设备I/O来使用IPC结构很难作到。
优势
a. 它们是可靠的,b. 流是受到控制的, c. 面向记录, d. 能够用非先进先出方式处理。
特色:能够按非先进先出次序读消息
数据结构
struct msqid_ds { // message queue id data structure struct ipc_perm msg_perm; /* 权限*/ msgqnum_t msg_qnum; /*消息数量*/ msglen_t msg_qbytes; /* max # of bytes on queue */ pid_t msg_lspid; /* pid of last msgsnd() */ pid_t msg_lrpid; /* pid of last msgrcv() */ time_t msg_stime; /* last-msgsnd() time */ time_t msg_rtime; /* last-msgrcv() time */ time_t msg_ctime; /* last-change time */ ... };
1.获取已有的message queue id,或新建一个message queue并返回id
#include <sys/msg.h> int msgget(key_t key, int flag);
2. 对队列进行操做
int msgctl(int msqid, int cmd, struct msqid_ds* buf);
三种操做:
IPC_STAT : 取此队列的msqid_ds结构,并将其存放在buf指向的结构中
IPC_SET : 按由buf指向的结构中的值,设置与此队列相关的结构中的四个字段. 只有超级用户才能增长msg_qbytes的值
IPC_RMID : 从系统中删除该消息队列以及仍在该队列上的全部数据。这种删除当即生效。
仍在使用这一消息队列的其余进程在它们下一次试图对此队列进行操做时,将出错返回EIDRM。
此命令只能由下列两种进程执行:一种是其有效用户ID等于msg_perm.cuid或msg_perm.uid;另外一种是具备超级用户特权的进程。
3.发送消息
int msgsnd (int msqid, const void* ptr, size_t nbytes, int flag);
每一个消息都由三部分组成,它们是:正长整型的类型字段、非负长度以及实际数据。消息老是放在队列尾端。
ptr能够指向这样的数据结构
struct msg_struct{ long msg_type;// 自定义 struct{ /*content*/ }; };
3.1. flag的值能够指定为IPC_NOWAIT。这相似于文件I/O的非阻塞I/O标志。
3.2. 若消息队列已满,则指定IPC_NOWAIT使得msgsnd当即出错返回EAGAIN。 若是没有指定IPC_NOWAIT,则进程阻塞直到:
---a. 空间能够容纳要发送的消息
---b. 从系统中删除了此队列。errno设置为EIDRM
---c. 捕捉到一个信号,并从信号处理程序返回。errno设置为EINTR。
4.接收消息
ssize_t msgrcv (int msqid, void* ptr, size_t nbytes, long type, int flag);
4.1 如同msgsnd中同样,ptr参数指向一个长整型数(返回的消息类型存放在其中),跟随其后的是存放实际消息数据的缓存。
4.2 nbytes说明数据缓存的长度。若返回的消息大于nbytes,并且在flag中设置了MSG_NOERROR,则该消息被截短
4.3 若是没有设置这一标志,而消息又太长,则出错返回E2BIG(消息仍留在队列中)。
4.4 参数type使咱们能够指定想要哪种消息:
---a. type == 0 返回队列中的第一个消息。
---b. type > 0 返回队列中消息类型为type的第一个消息。
---c. type < 0 返回队列中消息类型值小于或等于type绝对值,并且在这种消息中,其类型值又最小的消息。
非零type用于以非先进先出次序读消息。例如,若应用程序对消息赋优先权,那么type就能够是优先权值。
若是一个消息队列由多个客户机和一个服务器使用,那么type字段能够用来包含客户机进程ID。
4.5 能够指定flag值为IPC_NOWAIT,使操做不阻塞。这使得若是没有所指定类型的消息,则msgrcv出错返回ENOMSG。
4.6 若是没有指定IPC_NOWAIT,则进程阻塞直至:
---a. 有了指定类型的消息
---b. 从系统中删除了此队列(出错返回EIDRM)
---c. 捕捉到一个信号并从信号处理程序返回(出错返回EINTR)
a.信号量定义为含有一个或多个信号量值的集合。当建立一个信号量时,指定集合的各个值
b. 建立信息量(semget)与对其赋初值(semctl)分开。这是一个致命的弱点。
由于不能原子地建立一个信号量集合,而且对该集合中的全部值赋初值。
c. 进程在终止时并无释放已经分配给它的信号量。下面将要说明的UNDO功能就是要处理这种状况的。
数据结构
struct semid_ds{ struct ipc_perm sem_perm; /*see Section 14.6.2*/ ushort sem_nsems; /*#of semaphores in set 信号量集合的大小*/ time_t sem_otime; /*last-semop() time*/ time_t sem_ctime; /*last-change time*/ ... };
获取一个信号量ID
int semget (key_t key, int nsems, int flag);/*nsems: 信号量的数量*/
若是是建立新集合(服务器中),则必须指定nsems。若是引用一个现存的集合(客户机),则将nsems指定为0。
int semctl (int semid, int semnum, int cmd, .../* union semun arg */);
semnum :[0, nsems - 1]
cmd: 见【apue 中文第三版 P457】, 共十种命令
最后一个参数是个联合(union),而非指向一个联合的指针。暂略
信号集操做
int semop (int semid, struct sembuf semoparray[], size_t nops);
struct sembuf{ unsigned short sem_num; short sem_op; // 操做 short sem_flg; // IPC_WAIT, SEM };
不管什么时候只要为信号量操做指定了SEM_UNDO标志,而后分配资源(sem_op值小于0)
那么内核就会记住对于该特定信号量,分配给咱们多少资源(sem_op的绝对值)。
当该进程终止时,内核都将检验该进程是否还有还没有处理的信号量调整值,则按调整值对相应量值进行调整。
数据结构
struct shmid_ds { struct ipc_perm shm_perm; /* see Section 15.6.2 */ size_t shm_segsz; /* size of segment in bytes */ //共享内存的大小 pid_t shm_lpid; /* pid of last shmop() */ pid_t shm_cpid; /* pid of creator */ shmatt_t shm_nattch; /* number of current attaches */ time_t shm_atime; /* last-attach time */ time_t shm_dtime; /* last-detach time */ time_t shm_ctime; /* last-change time */ ... };
资源限制
1. 建立一个共享存储结构或引用一个现有的共享存储结构
int shmget(key_t key, size_t size, int flag);
建立时指定size, 引用时令size = 0, size为系统页长的整数倍
2. 对共享存储段执行操做
int shmctl(int shmid, int cmd, struct shmid_ds *buf );
cmd : IPC_STAT IPC_SET IPC_RMID SHM_LOCK SHM_UNLOCK
3. 将共享存储段链接到它的地址空间中
void* shmat (int shmid, const void* addr, int flag);
3.1. 若是addr为0,则此段链接到由内核选择的第一个可用地址上。(通常应指定add r为0,以便由内核选择地址。)
3.2. 若是在flag中指定了SHM_RDONLY位,则以只读方式链接此段。不然以读写方式链接此段。
4. 当对共享存储段的操做已经结束时,则调用shmdt脱接该段。
int shmdt(const void*addr); // dt : detach
shmdt没有删除其标识符以及其数据结构。该标识符仍然存在,直至某个进程调用shmctl特意删除它。
将struct shmid_ds的shm_nattch减一
5. 特定系统上的存储区布局
mmap和共享存储的异同
1.二者都实现进程间的通讯
2.mmap在不使用MAP_ANONYMOUS标志时,还涉及到对磁盘文件的读写
6. 利用具备共享和robust属性的互斥锁管理共享内存
#define SHM_U_R 0400 #define SHM_U_W 0200 #define ERR_SYS(str, err) do{ \ fprintf(stderr, (str)); \ fprintf(stderr, strerror(err)); \ exit(0); \ } while(0) #define ERR_QUIT(str) do{ \ fprintf(stderr, (str)) \ exit(0); \ } while(0) struct ds_t{ int age; char name[32]; char sex; }; struct shm_struct{ pthread_mutex_t shm_mutex; struct ds_t shm_info; }; static void print_ds(const struct ds_t* ds_ptr){ printf("age = %d.\n",ds_ptr->age); printf("sex = %c.\n",ds_ptr->sex); printf("name = %s.\n",ds_ptr->name); }
static void shm_init(struct shm_struct* shm_ptr){ pthread_mutexattr_t mutex_attr; int err; if (( err = pthread_mutexattr_init(&mutex_attr) )!= 0) ERR_SYS("pthread_mutexattr_init failed.",err); if (( err = pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED) )!= 0) ERR_SYS("pthread_mutexattr_setpshared failed.", err); if (( err = pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST) )!= 0) ERR_SYS("pthread_mutexattr_setrobust failed.", err); if ((err = pthread_mutex_init(&shm_ptr->shm_mutex, &mutex_attr)) != 0) ERR_SYS("pthread_mutex_init failed.",err); struct ds_t tmp = { 1, "HELLO WORLD", 'M'}; shm_ptr->shm_info = tmp; }
static void shm_lock(struct shm_struct* ptr){ int err; if ((err = pthread_mutex_lock(&ptr->shm_mutex)) < 0){ ERR_SYS("ptread_muetx_lock failed", err); } else if (err == EOWNERDEAD){ if (pthread_mutex_consistent(&ptr->shm_mutex) < 0) ERR_SYS("pthread_mutex_consistent failed", err); else{ pthread_mutex_unlock(&ptr->shm_mutex); shm_lock(ptr); // just in case that another process locks mutex first, then dies again } //printf("success recovered!\n"); } } static void shm_unlock(struct shm_struct* ptr){ int err; if ((err = pthread_mutex_unlock(&ptr->shm_mutex)) < 0) ERR_SYS("ptread_muetx_lock failed", err); }
int main(){ size_t shm_size = sizeof (struct shm_struct); int shm_id; if ( ( shm_id = shmget(IPC_PRIVATE, shm_size, SHM_U_R | SHM_U_W)) < 0 ) ERR_SYS("shmget failed.",errno); struct shm_struct* addr; if ( ( addr = (struct shm_struct*) shmat(shm_id, 0, 0)) < 0 ) ERR_SYS("shmat failed.",errno); shm_init(addr); shm_lock(addr); // then, parent process dies before unlocks share memory pid_t pid; if ((pid = fork()) < 0 ){ ERR_SYS("fork failed",errno); }else if (pid == 0){ struct shm_struct* addr; if ( ( addr = (struct shm_struct*)shmat(shm_id, 0, 0)) < 0 ) ERR_SYS("shmat failed.",errno); shm_lock(addr); print_ds(&addr->shm_info); shm_unlock(addr); exit(0); } return 0; }
共享存储可由不相关的进程使用。可是,若是进程是相关的,则SVR4提供了一种不一样的方法
设备/dev/zero在读时,是0字节的无限资源。此设备接收写向它的任何数据,但忽略此数据。
咱们对此设备做为IPC的兴趣在于,当对其进行存储映射时,它具备一些特殊性质:
a. 建立一个未名存储区,其长度是mmap的第二个参数,将其取整为系统上的最近页长。存储区都初始化为0。
b. 若是多个进程的共同祖先进程对mmap指定了MAP_SHARED标志,则这些进程可共享此存储区。
int main(int argc, char* argv[]){ const char buf[] = "/dev/zero"; int fd; if ((fd = open(buf, O_RDWR)) < 0) ERR_SYS("open failed:",errno); struct shm_struct* shm; size_t shm_size = sizeof (struct shm_struct); if ((shm = (struct shm_struct*)mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == (struct shm_struct*)MAP_FAILED){ ERR_SYS("mmap failed:", errno); } close(fd); // can close it shm_init(shm); pid_t pid; if ((pid = fork()) < 0){ ERR_SYS("fork err:", errno); }else if (pid == 0 ){ print_ds(&shm->shm_info); if (munmap(shm, shm_size) < 0) ERR_SYS("munmap failed:", errno); exit(0); } return 0; }
相比较"XSI信号量:
POSIX信号量:性能更好、接口使用更简单、在删除的表现更好
#include <semaphore> sem_t* sem_open(const char*name, int oflag, mode_t mode, unsigned int value);
其余暂略
1. 学会使用管道和FIFO,由于在大量应用程序中仍可有效地使用这两种基本技术。
2. 在新的应用程序中,要尽量避免使用消息队列以及信号量,而应当考虑管道和记录锁
3. 由于它们与UNIX内核的其余部分集成得要好得多。
4. 共享存储段有其应用场合,而mmap函数则可能在之后的版本中起更大做用。