参考资料php
<<精通Linux C编程>>html
在Android中的Handler的Native层研究文章中研究一下一把Linux中的匿名管道的通讯机制,今天这里Linux中的进程间通讯补齐。ios
在Linux中,实现进程通讯的方法包括管道(匿名管道和具名管道),消息队列,信号量,共享内存,套接口等。消息队列,信号量,共享内存统称为系统的(POSIX和System V)IPC,用于本地间的进程通讯,套接口(socket)则运用于远程进程通讯。c++
各个通讯机制定义以下:编程
匿名管道(Pipe)和具名管道(named pipe):匿名管道用于具备亲缘关系进程间的通讯,具名管道克服了管道没有名字的限制,所以除了具备匿名管道的功能外,还容许在无亲缘关系的进程中进行通讯。ubuntu
消息队列(Message):消息队列为消息的连接表,包括POSIX消息队列和System V消息队列。有足够权限的进程能够向队列中添加消息,被赋予读权限的进程则能够读取队列中的消息。segmentfault
共享内存:是的多个进程能够访问同一块内存空间,是最快的能够IPC形式。是针对其余的通讯机制运行效率较低而设计出来的。每每与其余通讯机制,如信号量结合使用,来达到进程间的同步与互斥。服务器
信号量(semaphore):主要做为进程间以及同一进程不一样线程的同步手段。
套接口(socket):最通常的进程通讯机制,可用于远程通讯。
关于匿名管道的理解以及Demo,在Android中的Handler的Native层研究文章中已经讲述过了,这里就不作介绍了。直接看具名管道(FIFO)。具名管道的提出是为了解决匿名管道只能用于具备亲缘关系(父子,兄弟)的进程间通讯,具名管道提供了一个路径名与之关联,以FIFO的文件形式存在于文件系统中,即便没有亲缘关系的进程也可经过该路径名达到互相通讯的目的。
匿名管道与FIFO的区别主要在以下俩个点:
FIFO能够用于任何两个进程的通讯,而匿名管道只能用于有亲缘关系的进程中
FIFO做为一种特殊的文件存放于系统中,不像匿名管道存放于内存当中(使用后消失)。当进程对FIFO使用完毕后,FIFO依然存活于文件系统当中,除非主动删除,不然不会消失。
因为上面的第二个特性,能够解决系统在应用中产生的大量的中间临时文件的问题,达到重用的目的。
建立一个命令管道可使用以下两个命令建立:
#include <sys/types.h> #include <sys/stat.h> int mkfifo(const char *filename, mode_t mode); int mknod(const char *filename, mode_t mode | S_IFIFO, (dev_t)0); //不建议使用了
关于mode_t,看过其实就是文件访问权限表示,在鸟哥linux私房菜的权限一章中有介绍,这里就不讲了,详细的本身查看连接吧,下面简单实现一个Demo:
#include<iostream> #include<signal.h> #include<sys/types.h> #include<sys/stat.h> #include<errno.h> void testNamePipe(){ mode_t mode=0666; const char* name="namePipeTest"; int ret=mkfifo(name,mode); if(errno==EEXIST){ printf("对象存在"); }else if(ret<0){ printf("建立命名管道失败,自动退出"); exit(1); }else{ printf("建立命名管道成功"); } }
编译须要加入-lrt 如g++ main.cpp -lrt -o main
对于管道的操做以下:
在操做命令管道open()函数的传参主要实现有以下几种:
open(const char *path, O_RDONLY); // 1 open(const char *path, O_RDONLY | O_NONBLOCK); // 2 open(const char *path, O_WRONLY); // 3 open(const char *path, O_WRONLY | O_NONBLOCK); // 4
引用自这篇文章
在open函数的调用的第二个参数中,你看到一个陌生的选项 O_NONBLOCK,选项 O_NONBLOCK 表示非阻塞,加上这个选项后,表示open调用是非阻塞的,若是没有这个选项,则表示open调用是阻塞的。
open调用的阻塞是什么一回事呢?很简单,对于以只读方式(O_RDONLY)打开的FIFO文件,若是open调用是阻塞的(即第二个参数为O_RDONLY),除非有一个进程以写方式打开同一个FIFO,不然它不会返回;若是open调用是非阻塞的的(即第二个参数为O_RDONLY | O_NONBLOCK),则即便没有其余进程以写方式打开同一个FIFO文件,open调用将成功并当即返回。
对于以只写方式(O_WRONLY)打开的FIFO文件,若是open调用是阻塞的(即第二个参数为O_WRONLY),open调用将被阻塞,直到有一个进程以只读方式打开同一个FIFO文件为止;若是open调用是非阻塞的(即第二个参数为O_WRONLY | O_NONBLOCK),open总会当即返回,但若是没有其余进程以只读方式打开同一个FIFO文件,open调用将返回-1,而且FIFO也不会被打开。
简单实现阻塞的Demo以下,这里直接使用父子进程进行测试:
#include <errno.h> #include <fcntl.h> //O_WRONLY等头文件 #include <iostream> #include <signal.h> #include <sys/stat.h> #include <sys/types.h> #include <unistd.h> using namespace std; const char* name = "namePipeTest"; void rwNamePipe() { int pid = -1; if ((pid = fork()) < 0) { printf("%s", "fork error"); } else if (pid == 0) { //子进程 printf("%s\n", "子进程建立成功"); int writeId = open(name, O_WRONLY); //以只写的形式 if (writeId < 0) { printf("写端打开失败"); exit(1); } else { printf("写端打开成功"); char buf[] = "hello named pipe"; for (int i = 0; i < 10; i++) { cout << "写入数据中" << endl; write(writeId, buf, sizeof(buf)); sleep(2); //睡眠两秒 } close(writeId); exit(0); } } else { //父进程,即当前进程 printf("%s\n", "父进程开始做业"); int readId = open(name, O_RDONLY); if (readId < 0) { printf("读端打开失败"); exit(1); } else { printf("开始进入读取数据阶段"); char buffer[1024]; while (read(readId, buffer, sizeof(buffer)) > 0) { printf("父进程读到数据=%s\n", buffer); } close(readId); exit(0); } } } void testNamePipe() { mode_t mode = 0666;//owner,group,others都有读写权限 int ret = mkfifo(name, mode); if (errno == EEXIST) { printf("对象存在"); rwNamePipe(); } else if (ret < 0) { printf("建立命名管道失败,自动退出"); exit(1); } else { printf("建立命名管道成功"); rwNamePipe(); } } int main() { testNamePipe(); return 0; }
获得的结果:
$ ./main 对象存在父进程开始做业 对象存在子进程建立成功 写端打开成功写入数据中 开始进入读取数据阶段父进程读到数据=hello named pipe 写入数据中 父进程读到数据=hello named pipe 写入数据中 父进程读到数据=hello named pipe 写入数据中 父进程读到数据=hello named pipe 写入数据中 父进程读到数据=hello named pipe 写入数据中 父进程读到数据=hello named pipe 写入数据中 父进程读到数据=hello named pipe 写入数据中 父进程读到数据=hello named pipe 写入数据中 父进程读到数据=hello named pipe 写入数据中 父进程读到数据=hello named pipe
消息队列为以一种链表式结构组织的一组数据,存放于内核之中,由个进程经过消息队列标识符引用传递数据的一种方式,由内核维护。消息队列为最具备数据操做性的数据床送方式,在消息队列中能够随意的根据特定的数据类型来检索消息。
消息队列跟匿名管道以及FIFO的区别(来自该篇文章):
一个进程向消息队列写入消息以前,并不须要某个进程在该队列上等待该消息的到达,而管道和FIFO是相反的,进程向其中写消息时,管道和FIFO必需已经打开来读,那么内核会产生SIGPIPE信号。
IPC的持续性不一样。管道和FIFO是随进程的持续性,当管道和FIFO最后一次关闭发生时,仍在管道和FIFO中的数据会被丢弃。消息队列是随内核的持续性,即一个进程向消息队列写入消息后,而后终止,另一个进程能够在之后某个时刻打开该队列读取消息。只要内核没有从新自举,消息队列没有被删除。
POSIX消息队列的相关操做(更详细的能够man各个函数查看):
//打开一个消息队列 mqd_t mq_open(const char *name, int oflag); mqd_t mq_open(const char *name, int oflag, mode_t mode,struct mq_attr *attr); //关闭消息队列 int mq_close(mqd_t mqdes); //从系统中删除消息队列 int mq_unlink(const char *name); //获取以及设置消息队列属性 int mq_getattr(mqd_t mqdes, struct mq_attr *attr); int mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr); //man查阅可知 struct mq_attr { long mq_flags; /* Flags: 0 or O_NONBLOCK */ long mq_maxmsg; /* Max. # of messages on queue */ long mq_msgsize; /* Max. message size (bytes) */ long mq_curmsgs; /* # of messages currently in queue */ }; //发送以及接收消息 int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
测试代码以下:
测试中遇到了两个问题记录以下:
在调用mq_receive()时候遇到了Message too long的问题,主要是由于主要缘由在于传递的msg_len小于mq_msgsize致使,详细可查看文章1以及文章2。
在mq_open()时候爆出invalid argument错误,缘由是不一样ubuntu系统中对于mq_attr支持的设置不同,可经过文章3查看系统支持的参数大小
测试Demo:
#ifndef MES_QUEUE_H_ #define MES_QUEUE_H_ #include <fcntl.h> #include <iostream> #include <mqueue.h> #include <string.h> #include <sys/stat.h> #include <unistd.h> using namespace std; void receiveQueue(string name) { cout << "客户端读取消息-----------------------" << endl; mode_t mode = 0666; struct mq_attr att; att.mq_msgsize = 30; att.mq_maxmsg = 10; att.mq_curmsgs = 0; att.mq_flags = 0; mqd_t openId = mq_open(name.c_str(),O_RDWR | O_CREAT|O_EXCL, mode,&att ); if (openId < 0 && errno != EEXIST) { cout << "error open mq:" << strerror(errno) << endl; return; } if(openId<0&&errno==EEXIST){ cout<<"文件存在打开"<<endl; openId=mq_open(name.c_str(),O_RDONLY); if(openId<0){ cout<<"打开失败:"<<strerror(errno)<<endl; return; } } struct mq_attr attr; if (mq_getattr(openId, &attr) < 0) { cout << "error get attr" << endl; return; } else { printf("flags: %ld, maxmsg: %ld, msgsize: %ld, curmsgs: %ld\n", attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs); } char buffer[50]; cout << "开始读取消息" << endl; while (true) { if(mq_receive(openId, buffer,50, NULL)>=0){ printf("读取的消息是:%s\n", buffer); }else{ //cout<<strerror(errno)<<endl; } } mq_close(openId); } void sendQueue(string name) { cout << "服务端发送消息----------------------" << endl; mqd_t openId = mq_open(name.c_str(),O_RDWR); if (openId < 0) { cout << "error open mq" << errno << endl; return; } struct mq_attr attr; if (mq_getattr(openId, &attr) < 0) { cout << "error get attr" << endl; fprintf(stderr, "发送失败: %s\n", strerror(errno)); return; } else { printf("flags: %ld, maxmsg: %ld, msgsize: %ld, curmsgs: %ld\n", attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs); } //int size=static_cast<int>(attr.mq_msgsize); cout << "开始发送消息" << endl; for (int i = 0; i < 10; i++) { string result = "msq no: " + to_string(i); const char* msg_ptr = result.c_str(); cout<<"发送消息中"<<endl; int rec = mq_send(openId, msg_ptr, strlen(msg_ptr)+1, 1); if (rec < 0) { cout << "发送信息失败" << rec << endl; fprintf(stderr, "发送失败: %s\n", strerror(errno)); break; } else { cout << "写入消息为" << result << endl; } sleep(3); } cout<<"发送完毕"<<endl; mq_close(openId); } void runMsgQueue() { string name = "/msq_test"; int pid = -1; if ((pid = fork()) < 0) { printf("%s", "fork error"); exit(1); } else if (pid == 0) { //子进程 sendQueue(name); } else { //本进程 receiveQueue(name); } } #endif
输出结果以下:
$ ./main 客户端读取消息----------------------- flags: 0, maxmsg: 10, msgsize: 30, curmsgs: 0 开始读取消息 服务端发送消息---------------------- flags: 0, maxmsg: 10, msgsize: 30, curmsgs: 0 开始发送消息 发送消息中 写入消息为msq no: 0 读取的消息是:msq no: 0 发送消息中 写入消息为msq no: 1 读取的消息是:msq no: 1 发送消息中 写入消息为msq no: 2 读取的消息是:msq no: 2 发送消息中 写入消息为msq no: 3 读取的消息是:msq no: 3 发送消息中 写入消息为msq no: 4 读取的消息是:msq no: 4 发送消息中 写入消息为msq no: 5 读取的消息是:msq no: 5 发送消息中 写入消息为msq no: 6 读取的消息是:msq no: 6 发送消息中 写入消息为msq no: 7 读取的消息是:msq no: 7 发送消息中 写入消息为msq no: 8 读取的消息是:msq no: 8 发送消息中 写入消息为msq no: 9 读取的消息是:msq no: 9 发送完毕
额外提个tip,这里队列能够理解成优先级队列的概念,在咱们mq_send()最后一个参数为优先级,在服务端receive的时候会按照优先级进行读取,而不是客户端最早发送的。
信号量(semaphore)是一种提供不一样进程间或者一个给定进程不一样线程之间的同步,这里依然分为POSIX信号量和SystemV信号量,文章中只对POSIX信号量进行学习概括。
在POSIX信号量中,分为有名信号量和无名信号量:
有名信号量:使用Posix IPC名字标识,既可用于线程间的同步,又能够用于进程间的同步。
无名信号量:无名信号量只存在于内存中,而且规定可以访问该内存的进程才可以使用该内存中的信号量。这就意味着,无名信号量只能被这样两种线程使用:(1)来自同一进程的各个线程(2)来自不一样进程的各个线程,可是这些进程映射了相同的内存范围到本身的地址空间。
总而言之,无名信号量通常用于线程间同步或互斥,而有名信号量通常用于进程间同步或互斥。
有名信号量和无名信号量的使用区别以下:
有名信号量的头文件在semaphore.h
中,具体涉及的函数以下所示:
注: Link with -pthread
sem_open() //初始化并打开有名信号量 sem_wait()/sem_trywait()/sem_timedwait()/sem_post()/sem_getvalue() //操做信号量 sem_close() //退出有名信号量 sem_unlink() //销毁有名信号量
打开一个有名信号量:
//传入参数参考消息队列的mq_open()中相对应参数,value参数用来指定信号量的初始值,取值范围[0,SEM_VALUE_MAX] sem_t *sem_open(const char *name, int oflag); sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
操做有名信号量:
//成功返回下降后的信号量的值,失败返回-1以及errno //试图占用信号量,若是信号量值>0,就-1,若是已经=0,就block,直到>0 int sem_wait(sem_t *sem); //试图占用信号量,若是信号量已经=0,当即报错 int sem_trywait(sem_t *sem); //试图占用信号量 //若是信号量=0,就block abs_timeout那么久,超时则报错 int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout); //归还信号量,成功返回0,失败返回-1,以及errno int sem_post(sem_t *sem); //得到信号量sem的当前的值,放到sval中。若是有线程正在block这个信号量,sval可能返回两个值,0或“-正在block的线程的数目”,Linux返回0 //成功返回0,失败返回-1以及errno int sem_getvalue(sem_t *sem, int *sval);
关闭有名信号量:
//关闭有名信号量,成功返回0,失败返回-1以及errno int sem_close(sem_t *sem);
删除有名信号量:
//试图销毁信号量,一旦全部占用该信号量的进程都关闭了该信号量,那么就会销毁这个信号量,成功返回0,失败返回-1以及errno int sem_unlink(const char *name);
这里选择使用线程进行测试,测试Demo以下:
#ifndef SEMAPHORE_TEST_H_ #define SEMAPHORE_TEST_H_ #include<iostream> #include<fcntl.h> #include<unistd.h> #include <semaphore.h> #include <pthread.h> #include <sys/stat.h> #include<string.h> using namespace std; static sem_t* sem; void *runChildThread(void* arg) { int * id=static_cast<int*>(arg); int pid=*id; cout<<"pid="<<pid<<"的线程等待信号量"<<endl; sem_wait(sem); //申请信号量 cout<<"pid="<<pid<<"得到信号量"<<endl; sleep(2); sem_post(sem); /*释放信号量*/ cout<<"pid="<<pid<<"释放信号量"<<endl; } void runSemaphoreTest() { string name="/sem_test"; mode_t mode=0666; uint value=1; sem= sem_open(name.c_str(),O_CREAT,mode,value); if(sem==SEM_FAILED){ cout<<"create name sem error:"<<strerror(errno)<<endl; return; } cout<<"成功建立信号量"<<endl; pthread_t tid=12; for(int i=0;i<10;i++){ int result= pthread_create(&tid,NULL,runChildThread,&i); if(result!=0){ cout<<"建立线程失败,程序退出"<<endl; exit(1); } } sleep(30);//测试线程执行 sem_close(sem); } #endif
结果以下:
$ ./main 成功建立信号量 pid=1的线程等待信号量 pid=1得到信号量 pid=2的线程等待信号量 pid=3的线程等待信号量 pid=4的线程等待信号量 pid=5的线程等待信号量 pid=6的线程等待信号量 pid=7的线程等待信号量 pid=8的线程等待信号量 pid=9的线程等待信号量 pid=10的线程等待信号量 pid=1释放信号量 pid=2得到信号量 pid=2释放信号量 pid=3得到信号量 pid=3释放信号量 pid=4得到信号量 pid=4释放信号量 pid=5得到信号量 pid=5释放信号量 pid=6得到信号量 pid=6释放信号量 pid=7得到信号量 pid=7释放信号量 pid=8得到信号量 pid=8释放信号量 pid=9得到信号量 pid=9释放信号量 pid=10得到信号量 pid=10释放信号量
上述Demo的信号量的数量设置为1,只有一个线程能获取到信号量进入代码执行,其余线程须要等待当前线程释放信号量后,而后进行抢夺信号量,或获得的线程进行代码执行,依次进行下去,若是把sem_open()的value参数改为三则说明最多三个线程能够同时进行,这里就不写Demo了。
须要注意的一点是有名信号量的值是随内核持续的。也就是说,一个进程建立了一个信号量,这个进程结束后,这个信号量还存在,而且信号量的值也不会改变。
无名信号量因为没有名字,因此使用方法与有名信号量略有不一样,却别主要在建立以及销毁的操做上,区别的函数以下:
sem_init() //建立/得到无名信号量 sem_destroy() //销毁无名信号量
测试的Demo能够把对应有名信号量的方法换成上述两个方法便可,就不详细介绍了。
内核管理一片物理内存,容许不一样的进程同时映射,多个进程能够映射同一块内存,被多个进程同时映射的物理内存,即共享内存。因为自己实现并不能保证同步,因此须要咱们本身进行同步,最多见的是使用信号量的方式进行同步。
使用说明:
#include <sys/ipc.h> #include <sys/shm.h> int shmget(key_t key, size_t size, int shmflg); void *shmat(int shm_id, const void *shm_addr, int shmflg); int shmdt(const void *shm_addr); int shmctl(int shm_id, int cmd, struct shmid_ds *buf);
因为共享内存自己不涉及进程通讯,就不给出Demo了,想要了解使用方式的能够百度一下。
套接字,就是咱们的Socket,经过套接字,咱们能够实现本地或者远程两个进程之间 的通讯,在网络编程中常常能碰见Socket编程。上面介绍的进程通讯局限于本机的进程之间通讯,而Socket则主要实现远端与本机的进程通讯。
这里简单介绍一下Socket与Http的区别把。在大学里都学过网络由下往上分为,物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,通常来讲咱们把会话层,表示层和应用层统称为应用层。IP协议对应于网络层,TCP协议对应于传输层,而HTTP协议在应用层。如图下所示([图片来自网络):
而Socket是应用层与TCP/IP协议族通讯的中间软件抽象层,它是一组接口,封装了TCP/IP的调用实现,固然也支持UDP协议。http的本质实现上也须要依赖Socket进行通讯。
关于linux下的Socket通讯用法,网上写的文章我以为比我整理学习的好的多,再次放上几个连接把(吐个槽,网上基本一篇文章复制来复制去的),就不整理了,要整理的话不是一篇文章能够写的。其实最好就是看文档了,用man命令是很是值得拥有的。
针对TCP放上Demo,linux下使用c++开发服务端,使用JAVA充当客户端《服务端以下:
#ifndef SOCKET_TEST_H_ #define SOCKET_TEST_H_ #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include<iostream> #include<string.h> #include<errno.h> #include<sys/types.h> #include<netinet/in.h> #include<unistd.h> #include <arpa/inet.h> #include <net/if.h> #include <sys/ioctl.h> #define PORT 9876 #define MAXLINE 4096 using namespace std; void getSockName(int& sock){ struct sockaddr_in addr; socklen_t addr_len = sizeof(struct sockaddr_in); /* 获取本端的socket地址 */ int nRet = getsockname(sock,(struct sockaddr*)&addr,&addr_len); if(nRet == -1) { perror("getsockname error: "); }else{ printf("this socket addr %s %d successful\n",inet_ntoa(addr.sin_addr),ntohs(addr.sin_port)); } } void startServer(){ //ipv4 int socketFd=socket(AF_INET,SOCK_STREAM,0); if(socketFd==-1){ cout<<"open socket error: "<<strerror(errno)<<endl; exit(1); } cout<<"建立socket成功"<<endl; struct sockaddr_in addr; addr.sin_family = AF_INET; //若是使用INADDR_ANY方式,须要加以判断 //是否符合网络字节序,即大端的传输方式,若是机器为小端, //须要经过htonl转换成网络字节序相配的,若是使用inet_addr()方法 //,无需考虑大小端的问题 addr.sin_addr.s_addr =(inet_addr("192.168.199.244")); addr.sin_port = htons(PORT); //绑定 int bindStatus=bind(socketFd,(struct sockaddr*)&addr,sizeof(addr)); getSockName(socketFd); if(bindStatus==-1){ cout<<"bind error "<<strerror(errno)<<endl; exit(1); } cout<<"绑定接口ok"<<endl; if(listen(socketFd,10)==-1){ cout<<"开启监听失败"<<endl; exit(1); } char buffer[50]; while(1){ memset(buffer,0,50); cout<<"开始接收"<<endl; int isAccept=accept(socketFd, (struct sockaddr*)NULL, NULL); if( isAccept== -1){ cout<<"接收失败"<<strerror(errno)<<endl; exit(1); } int result=recv(isAccept,buffer,MAXLINE,0); if(result==-1){ cout<<"接收消息失败"<<strerror(errno)<<endl; exit(1); } string bufferStr=buffer; close(isAccept); if(bufferStr=="over"){ cout<<"收到over信号,关闭服务端"<<endl; break; }else{ printf("接收到的消息为:%s\n",buffer); } } close(socketFd); } #endif
客户端代码:
private static void startClient(){ new Thread(() -> { try { Socket socket = new Socket("192.168.199.244",9876); //2.拿到客户端的socket对象的输出流发送给服务器数据 OutputStream os = socket.getOutputStream(); //写入要发送给服务器的数据 os.write(("over" ).getBytes(StandardCharsets.UTF_8)); os.flush(); os.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); System.out.print("socket connect erro"); } }).start(); }
最终输出结果:
建立socket成功 this socket addr 192.168.199.244 9876 successful 绑定接口ok 开始接收 接收到的消息为:01234 开始接收 收到over信号,关闭服务端