#include<sys/types.h>
#include<sys/msg.h>
int msgget(key_t key,int msgflg);
//return message queue identifier on success,or -1 on error
复制代码
key: 参数能够设置值为IPC_PRIVATE,系统会建立一个全新的IPC对象,或者使用ftok()函数生成一个(接近惟一)key,key相同的状况下,系统会返回一个已建立相同key的IPC对象。程序员
msgflg:指定施加于新消息队列之上的权限和检查一个既有队列的权限的位掩码(相似与文件权限)安全
建立一个消息队列bash
int msqid=msgget(IPC_PRIVATE,IPC_CREAT|S_IRUSR|S_IWUSR);//Read and Write by owner
if(msqid==-1){
errExit("msgget");
}
复制代码
msgsnd()和msgrcv()系统调用执行消息队列上的I/O。这两个系统调用接收的第一个参数是消息队列标识符(msqid)。第二个是参数msgp是一个由程序员定义的结构的指针,该结构用于存放被发送或接受的消息,结构的常规形式以下:服务器
struct msg{
long type;//message type
void* body;//message body
...
}
复制代码
消息的第一个部分必须指明了消息的类型,它用一个类型为long的整数来表示,而消息的剩余部分则是自定义的一个结构,其长度、内容、字段名和类型都均可以是任意的,也能够多个字段。数据结构
需特别指出:消息的大小是除了type字段外的全部字段的大小socket
从消息队列中写入消息须要队列上的写权限ide
#include<sys/types.h>
#include<sys/msg.h>
int msgsnd(int msqid,const void *msgp,size_t msgsz, int msgflg);
//return 0 on success,or -1 on error
复制代码
使用msgsnd()发送消息必需要将消息结构中的type字段值设为一个大于0的值并将须要传递的信息复制到自定义的body字段中。函数
msqid队列的标识符ui
msgp消息的结构体指针spa
msgsz参数指定了body字段包含的字节数,即消息的大小。
msgflg是一组标记的位掩码,用于控制msgsnd()操做
使用msgsnd()发送一条消息
struct mbuf{
long mtype; //message type
char mtext[1024]; //message body
}
struct mbuf msg;
int msqid,msgLen;
...
msgLen=strlen(msg.mtext);
if(msgsnd(msqid,&msg,msgLen,IPC_NOWAIT)==-1){
errExit("msgsnd");
}
...
复制代码
从消息队列中读取消息须要队列上的读权限
#include<sys/types.h>
#include<sys/msg.h>
ssize_t msgrcv(int msqid,void *msgp,size_t maxmsgsz,long msgtyp,int msgflg);
//return number of bytes copied into body field, or -1 on error
复制代码
msqid 队列的标识符
maxmsgsz 参数值要大于或等于需读取的消息的大小
msgp 缓冲区中消息的最大可用空间是经过maxmsgz参数来指定的。若是队列中待删除的消息体的大小超过了maxmsgsz字节,那么就不会从队列中删除消息,而且会返回错误E2BIG。
mtype 读取消息的顺序能够根据mtype字段来选择
mshtyp==0 将会删除队列中的第一条消息并将其返回给调用进程。
msgtyoe>0 将队列中第一条消息里的type字段值等于msgtype的消息删除并返回给进程。能够用于让各个进程选取与本身的进程ID匹配的消息,这样就会竞争读取同一条消息。
msgtype<0 将队列变成优先队列即最小堆形式。队列中消息type最小而且其值小于或等于msgtype的绝对值的第一条消息删除并返回给调用进程,若是没有,则堵塞直到出现匹配的消息为止。
msgflg 控制msgrcv()操做
使用msgrcv()读取一条消息
struct mbuf{
long mtype;
char mtext[1024];
}
int msqid,msgLen;
struct mbuf msg;
....
msgLen=msgrcv(msqid,&msg,1024,0,IPC_NOWAIT);
if(msgLen==-1){
errExit("msgrcv");
}
复制代码
#include<sys/types.h>
#include<sys/msg.h>
int msgctl(int msqid,int cmd,struct msqid_ds *buf);
// return 0 on success,or -1 on error
复制代码
还有一些其余的cmd参数,感兴趣能够网上查看下,这里只列出常见参数。
使用msgctl删除System V 消息队列
...
if(msgctl(msqid,IPC_RMID,NULL)==-1){
errExit("msgctl");
}
复制代码
struct msqid_ds
{
struct ipc_perm msg_perm; /* structure describing operation permission */
__time_t msg_stime; /* time of last msgsnd command */
__time_t msg_rtime; /* time of last msgrcv command */
__time_t msg_ctime; /* time of last change */
__syscall_ulong_t __msg_cbytes; /* current number of bytes on queue */
msgqnum_t msg_qnum; /* number of messages currently on queue */
msglen_t msg_qbytes; /* max number of bytes allowed on queue */
__pid_t msg_lspid; /* pid of last msgsnd() */
__pid_t msg_lrpid; /* pid of last msgrcv() */
__syscall_ulong_t __glibc_reserved4;
__syscall_ulong_t __glibc_reserved5;
};
struct ipc_perm
{
__key_t __key; /* Key. */
__uid_t uid; /* Owner's user ID. */ __gid_t gid; /* Owner's group ID. */
__uid_t cuid; /* Creator's user ID. */ __gid_t cgid; /* Creator's group ID. */
unsigned short int mode; /* Read/write permission. */
unsigned short int __pad1;
unsigned short int __seq; /* Sequence number. */
unsigned short int __pad2;
__syscall_ulong_t __glibc_reserved1;
__syscall_ulong_t __glibc_reserved2;
};
复制代码
修改一个System V 消息队列的msg_qbytes 设置
...
struct msqid_ds ds;
int msqid;
...
if(msgctl(msqid,IPC_STAT,&ds)==-1){
errExit("msgctl");
}
ds.msg_qbytes=1048576 //1MB
if(msgctl(msqid,IPC_SET,&ds)==-1){
errExit("msgctl");
}
复制代码
还有一些其余的限制,感兴趣能够网上查看下,这里只列出常见限制。
Linux 特有的msgctl() IPC_INFO 操做可以获取一个类型为msginfo的结构,其中包含了各类消息队列限制的值
struct msginfo buf;
msgctl(0, IPC_INFO,(struct msqid_ds *)&buf);
/* buffer for msgctl calls IPC_INFO, MSG_INFO */
struct msginfo
{
int msgpool;
int msgmap;
int msgmax;
int msgmnb;
int msgmni;
int msgssz;
int msgtql;
unsigned short int msgseg;
};
复制代码
服务器端核心代码
...
for(;;){
msgLen=msgcrv(serverId,&req,REQ_MSG_SIZE,0,0);
if(msgLen==-1){
if(errno==EINTR)//Interrupted by SIGCHLD handler?
continue;
errMsg("msgrcv");
break;
}
pid=fork();
if(pid==-1){
errMsg("fork");
break;
}
if(pid==0){
serveRequest(&req);
_exit(EXIT_SUCCESS);
}
}
...
复制代码
客户端核心代码
...
clientId=msgget(IPC_PRIVATE,S_IRUSR|S_IWUSR|S_IWGRP);//确保服务端可以有写权限
...
msgLen=msgrcv(clientId,&req,RESP_MSG_SIZE,0,0);
if(msgLen==-1){
errExit("msgrcv");
}
...
for(;;){
msgLen=msgrcv(clientId,&resp,RESP_MSG_SIZE,0,0);
if(msgLen==-1){
errExit("msgrcv");
}
...
}
...
复制代码
消息队列是经过标识符引用的,而不是像大多数其余UNIX I/O机制那样使用文件描述符。这意味这在各类基于文件描述符的I/O技术(如select()、poll()以及epoll)将没法应用于消息队列上。此外,在程序中编写同时处理消息队列的输入和基于文件描述符的I/O机制的代码要比编写只处理文件描述符的代码要更加复杂。
使用键而不是文件名来标识消息队列会增长额外的程序设计复杂性。ftok()函数一般能产生一个惟一的键,但却没法保证,使用IPC_PRIVATE键能确保产生惟一的队列标识符,但须要使这个标识符对须要用到它的其余进程可见。
消息队列是无链接的,内核不会对待管道、FIFO以及socket那样维护引用队列的进程数,会带来几个问题:
消息队列的总数、消息的大小以及单个队列的容量都是有限制的。这些限制都是可配置的,但若是一个应用程序超出了这些默认限制的范围,那么安装应用程序的时候就须要完成一些额外的工做了。