POSIX消息队列信号通知

相比于System V消息队列的问题之一是没法通知一个进程什么时候在某个队列中放置了一个消息,POSIX消息队列容许异步事件通知(asyschronous event notification),有两种方式进行选择:安全

1,产生一个信号数据结构

2,建立一个线程来执行一个指定的函数异步

     

指定队列创建和删除异步事件通知: async

      #include <mqueue.h>函数

       int mq_notify(mqd_t mqdes, const struct sigevent *sevp);atom

 

struct sigeventspa

{线程

int sigev_notify; //notification type    SIGEV_{NONE, SIGNAL, THREAD}code

int sigev_signo; //signal number if SIGEV_SIGNAL队列

union sigval   sigev_value; // passed to signal handler or thread

                                                                 // following two is SIGEV_THREAD

void (*sigev_notify_function)(union sigval); 

pthread_attr_t *sigev_notify_attributes;

}

union sigval

{

int sival_int; //integer value

void *sival_ptr; //pointer value

}

运用此函数的若干规则:

(1),若是notification为非空,那么此进程被注册为接收该队列的通知

(2),若是notification为空,那么此进程已存在的注册将被撤销

(3),任什么时候刻只有一个进程能够被注册为接收某个队列的通知

(4),若是当一个消息到达队列时,此时正好有个mq_receive阻塞在该队列上,那么通知不会发出

(5),当通知到达注册进程时,其注册即被撤销

注意:通知只有在一个消息被放置到一个空队列上时才会发出,若是这个消息队列不是空队列,则通知无效。

例子:简单通知

代码:

//mqnotifysig1.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <signal.h>

mqd_t mqd;
void *buff;
struct mq_attr attr;
struct sigevent sigev;
static void sig_usr1(int);

int main(int argc, char **argv)
{
    if (argc != 2) {
        printf("usage: mqnotifysig1 <name>");
        exit(0);
    }

    if ((mqd = mq_open(argv[1], O_RDONLY)) == -1) {
        printf("open error");
        exit(0);
    }

    mq_getattr(mqd, &attr);
    buff = malloc(attr.mq_msgsize);

    signal(SIGUSR1, sig_usr1);
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    mq_notify(mqd, &sigev);

    for  ( ; ; )
        pause();
    exit(0);
}

static void sig_usr1(int signo)
{
    ssize_t n;

    mq_notify(mqd, &sigev);
    n = mq_receive(mqd, buff, attr.mq_msgsize, NULL);
    printf("SIGUSR1 received, read %ld bytes\n", (long) n);
    return;
}

异步信号安全函数:

 这里涉及到一个异步信号安全(async-signal-safe)的概念:就是在信号处理的函数中有些函数是不能用的,这么讲,可重入的函数就是信号安全。

知足下列条件的函数是不可重入的: 

1) 函数体内使用了静态的数据结构; 

2) 函数体内调用了malloc() 或者free() 函数; 

3) 函数体内调用了标准I/O 函数。 
如何编写可重入的函数: 
    1) 不要使用全局变量。由于别的代码极可能覆盖这些变量值。 
    2) 不能调用其它任何不可重入的函数。 
    3) 在和硬件发生交互的时候,切记执行相似disinterrupt() 之类的操做,就是关闭硬件中断。完成交互记得打开中断,在有些系列上,这叫作“ 进入/ 退出核心” 。 

    4) 谨慎使用堆栈。最好先在使用前先OS_ENTER_KERNAL 。 


 例子:信号通知,让处理程序仅仅设置一个全局标志,有某个线程检查该标志以肯定什么时候接收到一个信息

代码:

//mqnotifysig2.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <signal.h>

volatile sig_atomic_t mqflag;
static void sig_usr1(int);

int main(int argc, char **argv)
{
    mqd_t mqd;
    void *buff;
    ssize_t n;
    sigset_t zeromask, newmask, oldmask;
    struct mq_attr attr;
    struct sigevent sigev;

    if (argc != 2) {
    mq_notify(mqd, &sigev);
        printf("usage: mqnotifysig2 <name>");
        exit(0);
    }

    mqd = mq_open(argv[1], O_RDONLY);
    mq_getattr(mqd, &attr);
    buff = malloc(attr.mq_msgsize);

    sigemptyset(&zeromask);
    sigemptyset(&newmask);
    sigemptyset(&oldmask);
    sigaddset(&newmask, SIGUSR1);

    signal(SIGUSR1, sig_usr1);
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    mq_notify(mqd, &sigev);

    for( ; ; ) {
        sigprocmask(SIG_BLOCK, &newmask, &oldmask);
        while (mqflag == 0)
                sigsuspend(&zeromask);
        mqflag = 0;
        mq_notify(mqd, &sigev);
        n = mq_receive(mqd, buff, attr.mq_msgsize, NULL);
        printf("read %ld bytes\n",(long) n);
        sigprocmask(SIG_UNBLOCK, &newmask, NULL);
    }
    exit(0);
}

static void sig_usr1(int signo)
{
    mqflag = 1;
    printf("functin sig_usr1 has been called\n");
    return;
}


上面的例子有个小小的bug,就是在一个通知来了以后一段时间,即便有消息发送到队列中,也不会发送出通知来的。这时咱们能够经过将mq_receive非阻塞读取来避免。同时,在上面的例子中,主线程被阻塞,信号处理程序执行,主线程再次执行,这样效率很不高,咱们能够经过阻塞在某个函数里,仅仅等待该信号的递交,而不是让内核执行一个只为设置一个标志的信号处理程序。因此咱们用到sigwait函数。

       #include <signal.h>

        int sigwait(const sigset_t *set, int *sig);

此函数阻塞的等待信号集set中信号的到来,一旦某个信号到来,则继续执行,同时将这个信号存入sig中。

例子:
代码:

//mqnotifysig4.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <signal.h>
#include <errno.h>

int main(int argc, char **argv)
{
    int signo;
    mqd_t mqd;
    void *buff;
    ssize_t n;
    sigset_t newmask;
    struct mq_attr attr;
    struct sigevent sigev;

    if (argc != 2) {
        printf("usage: mqnotifysig3 <name>");
        exit(0);
    }
    mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK);
    mq_getattr(mqd, &attr);
    buff = malloc(attr.mq_msgsize);

    sigemptyset(&newmask);
    sigaddset(&newmask, SIGUSR1);
    sigprocmask(SIG_BLOCK, &newmask, NULL);

    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    mq_notify(mqd, &sigev);

    for ( ; ; ) {
        sigwait(&newmask, &signo);
        if (signo == SIGUSR1) {
            mq_notify(mqd, &sigev);
            while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
                printf("reda %ld bytes\n",(long) n);
            }
            if (errno != EAGAIN) {
                printf("mq_receive error");
                exit(0);
            }
        }
    }
    exit(0);
}


消息队列描述符并非普通的文件描述符,因此咱们没法使用select机制来高效率的处理,可是咱们能够模仿出来,经过在信号处理函数中,往一个管道写入一个字符,那么在主线程中select此管道描述就OK 了。

例子:

代码:

//mqnotifysig5.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <signal.h>
#include <errno.h>

int pipefd[2];
static void sig_usr1(int);

int main(int argc, char **argv)
{
    int nfds;
    char c;
    fd_set rset;
    mqd_t mqd;
    ssize_t n;
    void *buff;
    struct mq_attr attr;
    struct sigevent sigev;

    if (argc != 2) {
        printf("usage: mq_notifysig5 <name>");
        exit(0);
    }

    mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK);
    mq_getattr(mqd, &attr);
    buff = malloc(attr.mq_msgsize);

    pipe(pipefd);

    signal(SIGUSR1, sig_usr1);
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    mq_notify(mqd, &sigev);

    FD_ZERO(&rset);

    for ( ; ; ) {
        FD_SET(pipefd[0], &rset);
        nfds = select(pipefd[0] + 1, &rset, NULL, NULL, NULL);

        if (FD_ISSET(pipefd[0], &rset)) {
            read(pipefd[0], &c, 1);
            mq_notify(mqd, &sigev);
            while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
                printf("read %ld bytes\n", (long) n);
            }
            if (errno != EAGAIN) {
                printf("mq_receive error");
                exit(0);
            }
        }
    }
    exit(0);
}

static void sig_usr1(int signo)
{
    write(pipefd[1], "", 1);
    return;
}


异步事件通知的另外一种方式是把sigev_notify设置成SIGEV_THREAD,这样当一消息放到一个空的队列上,会建立一个线程。

例子:

代码:

//mqnotifysigthread1.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <signal.h>
#include <errno.h>
mqd_t mqd;
struct mq_attr attr;
struct sigevent sigev;

static void notify_thread(union sigval);

int main(int argc, char **argv)
{
    if (argc != 2) {
        printf("usage: mqnotifythread1 <name>");
        exit(0);
    }

    mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK);
    mq_getattr(mqd, &attr);

    sigev.sigev_notify = SIGEV_THREAD;
    sigev.sigev_value.sival_ptr = NULL;
    sigev.sigev_notify_function = notify_thread;
    sigev.sigev_notify_attributes = NULL;
    mq_notify(mqd, &sigev);

    for ( ; ; ) 
        pause;
    exit(0);
}

static void notify_thread(union sigval arg)
{
    ssize_t n;
    void *buff;

    printf("notify_thread started\n");
    buff = malloc(attr.mq_msgsize);
    mq_notify(mqd, &sigev);

    while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
        printf("read %ld bytes\n", (long) n);
    }
    if (errno != EAGAIN) {
        printf("mq_receive error\n");
        exit(0);
    }

    free(buff);
    pthread_exit(NULL);
}
相关文章
相关标签/搜索