先给出建立消息队列的程序:shell
#include <stdio.h> #include <stdlib.h> #include <mqueue.h> #include <sys/types.h> #include <sys/stat.h> #include <unistd.h> #include <fcntl.h> #include <errno.h> #define MQ_NAME ("/tmp") #define MQ_FLAG (O_RDWR | O_CREAT | O_EXCL) // 建立MQ的flag #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) // 设定建立MQ的权限 int main() { mqd_t posixmq; int rc = 0; struct mq_attr mqattr; mqattr.mq_maxmsg = 3; mqattr.mq_msgsize = 1024; posixmq = mq_open(MQ_NAME, MQ_FLAG, FILE_MODE, &mqattr); // 建立只可存放三条消息的消息队列 if(-1 == posixmq) { perror("建立MQ失败"); exit(1); } rc = mq_close(posixmq); if(0 != rc) { perror("关闭失败"); exit(1); } #if 0 rc = mq_unlink(MQ_NAME); if(0 != rc) { perror("删除失败"); exit(1); } #endif return 0; }
编译并执行:
[infor@s123 PosixMq]$ gcc -o createmq createmq.c -lrt [infor@s123 PosixMq]$ ./createmq
最后程序并无删除消息队列(消息队列有随内核持续性),如再次执行该程序则会给出错误信息:
[infor@s123 PosixMq]$ ./createmq 建立MQ失败: File exists
消息队列的读写主要使用下面两个函数:
#include <mqueue.h> /* 返回:若成功则为0, 若出错则为-1 */ ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio); /* 返回:若成功则为消息中字节数,若出错则为-1 */ int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
下面给出向消息队列写消息的程序:
#include <stdio.h> #include <stdlib.h> #include <mqueue.h> #include <sys/types.h> #include <sys/stat.h> #include <unistd.h> #include <fcntl.h> #include <errno.h> /*向消息队列发送消息,消息队列名及发送的信息经过参数传递*/ int main(int argc, char *argv[]) { mqd_t mqd; char *ptr; size_t len; unsigned int prio; int rc; if(argc != 4) { printf("Usage: sendmq <name> <bytes> <priority>\n"); exit(1); } len = atoi(argv[2]); prio = atoi(argv[3]); //只写模式找开消息队列 mqd = mq_open(argv[1], O_WRONLY); if(-1 == mqd) { perror("打开消息队列失败"); exit(1); } // 动态申请一块内存 ptr = (char *) calloc(len, sizeof(char)); if(NULL == ptr) { perror("申请内存失败"); mq_close(mqd); exit(1); } /*向消息队列写入消息,如消息队列满则阻塞,直到消息队列有空闲时再写入*/ rc = mq_send(mqd, ptr, len, prio); if(rc < 0) { perror("写入消息队列失败"); mq_close(mqd); exit(1); } // 释放内存 free(ptr); return 0; }
编译并执行:
[infor@s123 PosixMq]$ gcc -o sendmq sendmq.c -lrt [infor@s123 PosixMq]$ ./sendmq /tmp 30 15 [infor@s123 PosixMq]$ ./sendmq /tmp 30 16 [infor@s123 PosixMq]$ ./sendmq /tmp 30 17 [infor@s123 PosixMq]$ ./sendmq /tmp 30 18
上面前后向消息队列“/tmp”写入了四条消息,由于先前建立的消息队列只容许存放3条消息,本次第四次写入时程序会阻塞。直到有另外进程从消息队列取走消息后本次写入才成功返回。函数
下面经过程序读消息队列:spa
#include <stdio.h> #include <stdlib.h> #include <mqueue.h> #include <sys/types.h> #include <sys/stat.h> #include <unistd.h> #include <fcntl.h> #include <errno.h> /*读取某消息队列,消息队列名经过参数传递*/ int main(int argc, char *argv[]) { mqd_t mqd; struct mq_attr attr; char *ptr; unsigned int prio; size_t n; int rc; if(argc != 2) { printf("Usage: readmq <name>\n"); exit(1); } /*只读模式打开消息队列*/ mqd = mq_open(argv[1], O_RDONLY); if(mqd < 0) { perror("打开消息队列失败"); exit(1); } // 取得消息队列属性,根据mq_msgsize动态申请内存 rc = mq_getattr(mqd, &attr); if(rc < 0) { perror("取得消息队列属性失败"); exit(1); } /*动态申请保证能存放单条消息的内存*/ ptr = calloc(attr.mq_msgsize, sizeof(char)); if(NULL == ptr) { printf("动态申请内存失败\n"); mq_close(mqd); exit(1); } /*接收一条消息*/ n = mq_receive(mqd, ptr, attr.mq_msgsize, &prio); if(n < 0) { perror("读取失败"); mq_close(mqd); free(ptr); exit(1); } printf("读取 %ld 字节\n 优先级为 %u\n", (long)n, prio); return 0; }
编译并执行:
[infor@s123 PosixMq]$ ./readmq /tmp 读取 30 字节 优先级为 17 [infor@s123 PosixMq]$ ./readmq /tmp 读取 30 字节 优先级为 18 [infor@s123 PosixMq]$ ./readmq /tmp 读取 30 字节 优先级为 16 [infor@s123 PosixMq]$ ./readmq /tmp 读取 30 字节 优先级为 15 [infor@s123 PosixMq]$ ./readmq /tmp
程序执行五次,第一次执行完,先前阻塞在写处的程序成功返回。第五次执行,由于消息队列已经为空,程序阻塞。直到另外的进程向消息队列写入一条消息。
另外,还能够看出Posix消息队列每次读出的都是消息队列中优先级最高的消息。code
2011-11-17 任洪彩 qdurenhongcai@163.com队列
转载请注明出处。进程