源码解读·RT-Thread操做系统IPC之消息队列

消息队列是任务间通讯最灵活和强大的功能。相比邮箱只能传递4个字节,消息队列则支持自定义长度。另外消息队列则在消息层面体现的是一种字节流的概念,没有具体数据类型绑定,这么作就随便用户定义消息类型,对于消息队列来讲都是透明的很是灵活。不过因为消息队列实现中须要进行内存拷贝,因此效率倒是最低的,尤为对于长度较长的消息进行通讯时要谨慎。php


本篇文章牵涉到的内容以下:
  1. 消息队列类型
  2. 消息队列建立和初始化
  3. 消息队列发送
  4. 消息队列接收

消息队列类型html

向👉滑动查看所有node

/** * message queue structure */struct rt_messagequeue{ struct rt_ipc_object parent; /**< inheritfrom ipc_object */  void *msg_pool; /**< startaddress of message queue */  rt_uint16_t msg_size; /**< messagesize of each message */ rt_uint16_t max_msgs; /**< maxnumber of messages */  rt_uint16_t entry; /**< index ofmessages in the queue */  void *msg_queue_head; /**< list head*/ void *msg_queue_tail; /**< list tail*/ void *msg_queue_free; /**< pointerindicated the free node of queue */};typedef struct rt_messagequeue* rt_mq_t;


从类型字段上来看,首先依然能够看出消息队列与其它IPC同样都继承自struct rt_ipc_objectgit


下面来分析余下的扩展字段:github

msg_pool:消息队列容纳消息体的内存块地址web

msg_size:消息队列里消息的长度浏览器

max_msgs:消息队列可容纳的消息个数微信

entry:消息队列当前已存放的消息个数app

msg_queue_head:消息队列里最前面的消息函数

msg_queue_tail:消息队列里最后面的消息

msg_queue_free:消息队列里第一个空闲位置


关于这些字段如今不须要过多解释,在后续接口实现里能够看到具体的用途。


不过还须要先介绍一下另外一个消息队列的内部类型:

struct rt_mq_message{ struct rt_mq_message *next;};


这个类型时消息队列里面采用链表实现的链表原型,其实意义不是很大,仅仅做为队列的一个索引用。

消息队列建立和初始化

消息队列建立依然使用动态建立为例来分析:
/** *This function will create a message queue object from system resource * *@param name the name of message queue *@param msg_size the size of message *@param max_msgs the maximum number of message in queue *@param flag the flag of message queue * *@return the created message queue, RT_NULL on error happen */rt_mq_t rt_mq_create(const char *name, rt_size_t msg_size, rt_size_t max_msgs, rt_uint8_t flag){ struct rt_messagequeue *mq; struct rt_mq_message *head; register rt_base_t temp;    RT_DEBUG_NOT_IN_INTERRUPT;  /* allocate object */ mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name); if (mq == RT_NULL) return mq;  /* set parent */ mq->parent.parent.flag= flag;  /* init ipcobject */ rt_ipc_object_init(&(mq->parent));  /* init messagequeue */  /* get correct message size */ mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE); mq->max_msgs = max_msgs;  /* allocate message pool */ mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) *mq->max_msgs); if (mq->msg_pool == RT_NULL) { rt_mq_delete(mq);  return RT_NULL; }  /* init messagelist */ mq->msg_queue_head = RT_NULL; mq->msg_queue_tail = RT_NULL;  /* init message empty list */ mq->msg_queue_free = RT_NULL;   for (temp = 0; temp < mq->max_msgs; temp++) { head = (struct rt_mq_message *)((rt_uint8_t*)mq->msg_pool + temp * (mq->msg_size + sizeof(struct rt_mq_message))); head->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = head; }  /* the initial entry is zero */ mq->entry = 0;  return mq;}


先看函数的参数:

name:消息队列的object名称

msg_size:消息的大小(注意不是消息队列的大小)

max_msgs:消息队列的大小(最多容纳多少条消息)

flag:IPC等待队列的类型(FIFO/PRIO)


而后看函数内部实现,最开始分配RT_Object_Class_MessageQueue类型的object对象,并初始化,这和其它IPC没什么两样。接着计算msg_size并对大小进行调整到对齐。接着分配消息队列内存块时要仔细看分配时的大小: (mq->msg_size+sizeof(struct rt_mq_message)) *mq->max_msgs)为何不是msg_size*max_msgs呢?这就跟消息队列的实现有关了,消息队列内部会将每一个消息经过rt_mq_message类型的链表给连接起来,因此会额外在每一个消息体前面放一个rt_mq_message链表节点。


而后接着往下看一个for循环块,这块代码初看起来总感受哪里有点怪怪的。仔细一看原来是将消息队列里每一个存储消息的位置经过一个倒序的链表给链接起来了。首先咱们前面分配到一块连续的内存空间,而后每一个消息占用的大小是rt_mq_message+msg_size,因此使用(rt_mq_message+msg_size)*temp来得到消息在这片连续内存区里的索引。而后在每条消息的起始位置放一个rt_mq_message指针,并一次链接起来。同时初始化head/tail/free等指针。初始化完后,此时整个消息队列的状态以下:  

   

消息队列发送

消息队列发送有两个API,一个是基于FIFO模型的rt_mq_send,一个是基于LIFO模型的rt_mq_urgent。先来看rt_mq_send:
/** *This function will send a message to message queue object, if there are *threads suspended on message queue object, it will be waked up. * *@param mq the message queue object *@param buffer the message *@param size the size of buffer * *@return the error code */rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer,rt_size_t size){ register rt_ubase_t temp; struct rt_mq_message *msg;  /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size !=0);  /* greater than one message size */ if (size > mq->msg_size) return-RT_ERROR;  RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));  /* disable interrupt */ temp = rt_hw_interrupt_disable();  /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp);  return-RT_EFULL; } /* move free list pointer */ mq->msg_queue_free=msg->next;  /* enable interrupt */ rt_hw_interrupt_enable(temp);  /* the msg isthe new tailer of list, the next shall be NULL */ msg->next = RT_NULL; /* copy buffer*/ rt_memcpy(msg + 1, buffer, size);  /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* link msg to message queue */   if (mq->msg_queue_tail != RT_NULL) { /* if the tail exists, */ ((struct rt_mq_message *)mq->msg_queue_tail)->next = msg; }  /* set new tail*/ mq->msg_queue_tail = msg; /* if the headis empty, set head */ if (mq->msg_queue_head == RT_NULL) mq->msg_queue_head = msg;  /* increase message entry */ mq->entry++;  /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread));  /* enable interrupt */ rt_hw_interrupt_enable(temp);  rt_schedule();  return RT_EOK; }  /* enable interrupt */ rt_hw_interrupt_enable(temp);  return RT_EOK;}


先看函数的参数,mq为建立或初始化过的消息队列对象句柄。buffer为所须要发送的消息指针,size为所要发送消息的大小。固然size不要超过buffer的内存大小也不要超过初始化时指定的消息大小。超过buffer大小那么就是内存越界,超过初始化时指定的大小则会发送失败。可是却能够小于初始化时指定的大小。


看函数内部实现,直接跳过前面的部分,从关中断的临界区开始看。首先从msg_queue_free取出一个空闲节点(msg = (struct rt_mq_message *)mq->msg_queue_free;),若是不为NULL,也就是消息队列不满的时候,则表明还能够往消息队列里存入消息。那么将msg_queue_free = msg->next;也就是将msg_queue_free指向下一个空闲区域,等价msg_queue_free = msg_queue_free->next。而后往消息头部的链表节点后面开始拷贝发送的消息:rt_memcpy(msg +1, buffer, size);并接着初始化或者调整msg_queue_tail/head两个指针。若是msg_queue_tail/head指向NULL,则说明消息队列尚未消息,直接让tail和head指向这个消息便可。不然tail则须要调整,指向这个新消息,表示最后一个发送来的消息,同时也代表新消息是放入队列末尾(先入先出FIFO模型)。接着消息发送处理完成,对entry进行计数,而后看IPC等待队列是否须要调度。


/** *This function will send an urgent message to message queue object, which *means the message will be inserted to the head of message queue. If there *are threads suspended on message queue object, it will be waked up. * *@param mq the message queue object *@param buffer the message *@param size the size of buffer * *@return the error code */rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size){ register rt_ubase_t temp; struct rt_mq_message *msg;  /* parametercheck */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0);  /* greater than one message size */ if (size > mq->msg_size) return -RT_ERROR;  RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));  /* disable interrupt */ temp = rt_hw_interrupt_disable();  /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp);  return -RT_EFULL; } /* move free list pointer */   mq->msg_queue_free = msg->next;  /* enable interrupt */ rt_hw_interrupt_enable(temp);  /* copy buffer*/ rt_memcpy(msg + 1, buffer, size);  /* disable interrupt */ temp = rt_hw_interrupt_disable();  /* link msg to the beginning of message queue */ msg->next = (struct rt_mq_message *)mq->msg_queue_head; mq->msg_queue_head = msg;  /* if there is no tail */ if (mq->msg_queue_tail == RT_NULL) mq->msg_queue_tail = msg;  /* increase message entry */ mq->entry++;  /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread));  /* enableinterrupt */ rt_hw_interrupt_enable(temp);  rt_schedule();  return RT_EOK; }  /* enable interrupt */ rt_hw_interrupt_enable(temp);  return RT_EOK;}


rt_mq_urgent与rt_mq_send区别就在于消息存放的位置。urgent从语义上表示的是发送紧急消息,从代码上看起始就是将消息放入消息队列头:

msg->next= (struct rt_mq_message *)mq->msg_queue_head;

mq->msg_queue_head= msg;

第一句让当前消息的链表next指针指向已有的消息队列头节点。第二局调整消息队列头指针msg_queue_head指向新的消息。因此新的消息就犹如放在消息队列头同样,在接收消息的时候从msg_queue_head开始取出消息便可(后入先出LIFO模型)。

消息队列接收

/** *This function will receive a message from message queue object, if there is *no message in message queue object, the thread shall wait for a specified *time. * *@param mq the message queue object *@param buffer the received message will be saved in *@param size the size of buffer *@param timeout the waiting time * *@return the error code */rt_err_t rt_mq_recv(rt_mq_t mq, void *buffer, rt_size_t size, rt_int32_t timeout){ struct rt_thread *thread; register rt_ubase_t temp; struct rt_mq_message *msg; rt_uint32_t tick_delta;  /* parametercheck */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0);  /* initialize delta tick */ tick_delta = 0; /* get current thread */ thread = rt_thread_self(); RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mq->parent.parent)));  /* disable interrupt */ temp =rt_hw_interrupt_disable();  /* for non-blocking call */ if (mq->entry == 0 && timeout == 0) { rt_hw_interrupt_enable(temp);  return -RT_ETIMEOUT; }  /* message queue is empty */ while (mq->entry == 0) { RT_DEBUG_IN_THREAD_CONTEXT;  /* reset error number in thread */ thread->error= RT_EOK;  /* no waiting, return timeout */ if (timeout == 0) { /* enable interrupt */ rt_hw_interrupt_enable(temp);  thread->error = -RT_ETIMEOUT;  return -RT_ETIMEOUT; }  /* suspend current thread */ rt_ipc_list_suspend(&(mq->parent.suspend_thread), thread, mq->parent.parent.flag);  /* has waiting time, start thread timer */ if (timeout > 0) { /* get the start tick of timer */ tick_delta = rt_tick_get();  RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list", thread->name));  /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); }  /* enable interrupt */ rt_hw_interrupt_enable(temp);  /* re-schedule*/ rt_schedule();  /* recv message*/ if (thread->error != RT_EOK) { /* return error */ return thread->error; }  /* disable interrupt */ temp = rt_hw_interrupt_disable();  /* if it's not waiting forever and then re-calculate timeout tick */ if (timeout > 0) { tick_delta = rt_tick_get() - tick_delta; timeout -= tick_delta; if (timeout < 0) timeout = 0; } }  /* get message from queue */ msg = (struct rt_mq_message *)mq->msg_queue_head;  /* move message queue head */ mq->msg_queue_head = msg->next; /* reach queue tail, set to NULL */ if (mq->msg_queue_tail == msg) mq->msg_queue_tail = RT_NULL;  /* decrease message entry */ mq->entry--;  /* enable interrupt */ rt_hw_interrupt_enable(temp);  /* copy message*/ rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size);  /* disable interrupt*/ temp = rt_hw_interrupt_disable(); /* put message to free list */ msg->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = msg; /* enable interrupt */ rt_hw_interrupt_enable(temp);  RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent)));  return RT_EOK;}


消息队列接收与其它IPC接收过程并没有太大区别,并都支持timeout超时等待。先来看函数的参数:

buffer:接收消息的内存地址

size:接收消息的长度

timeout:当消息队列为空时将根据timeout来决定是否等待


函数内部实现前部分与其它IPC接收状况相似,根据消息队列是否为空以及是否须要timeout等待来处理,当等待时则启动任务timer,并将任务挂起在IPC的suspend链表中,而后触发调度器调度到其它任务。


等到等待超时则直接返回,不然就表明被发送端唤醒。只要是被发送端唤醒则表明可能有可用的消息能够接收,这种状况和上篇文章mbox状况相似。唤醒后首先判断当前剩余的等待时间,而后检查是否有可用的消息能够接收。接收消息的时候从msg_queue_head处取出消息便可,同时将msg_queue_head指向next,接着将entry计数减一,而后拷贝消息到buffer中,注意拷贝的尺寸,当超过消息队列的消息尺寸时则最多只拷贝消息的长度,不然消息将被截短。最后将当前的队列节点进行回收,回收时将其放入msg_queue_free链表中。

END


RT-Thread线上活动


一、【RT-Thread能力认证考试12月——RCEA】通过第一次考试的验证,RT-Thread能力认证获得了更多社区开发者和产业界的大力支持(点此查看)若是您有晋升、求职、寻找更好机会的须要,有深刻学习和掌握RT-Thread的需求,欢迎垂询/报考!

能力认证官网连接:https://www.rt-thread.org/page/rac.html(在外部浏览器打开)


当即报名


#题外话# 喜欢RT-Thread不要忘了在GitHub上留下你的STAR哦,你的star对咱们来讲很是重要!连接地址:https://github.com/RT-Thread/rt-thread


你能够添加微信18917005679为好友,注明:公司+姓名,拉进 RT-Thread 官方微信交流群

RT-Thread


让物联网终端的开发变得简单、快速,芯片的价值获得最大化发挥。Apache2.0协议,可免费在商业产品中使用,不须要公布源码,无潜在商业风险。

长按二维码,关注咱们


看这里,求赞!求转发!

点击阅读原文进入Github

本文分享自微信公众号 - RTThread物联网操做系统(RTThread)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索