消息队列是任务间通讯最灵活和强大的功能。相比邮箱只能传递4个字节,消息队列则支持自定义长度。另外消息队列则在消息层面体现的是一种字节流的概念,没有具体数据类型绑定,这么作就随便用户定义消息类型,对于消息队列来讲都是透明的很是灵活。不过因为消息队列实现中须要进行内存拷贝,因此效率倒是最低的,尤为对于长度较长的消息进行通讯时要谨慎。php
-
消息队列类型 -
消息队列建立和初始化 -
消息队列发送 -
消息队列接收
消息队列类型html
向👉滑动查看所有node
/** * message queue structure */struct rt_messagequeue{ struct rt_ipc_object parent; /**< inheritfrom ipc_object */ void *msg_pool; /**< startaddress of message queue */ rt_uint16_t msg_size; /**< messagesize of each message */ rt_uint16_t max_msgs; /**< maxnumber of messages */ rt_uint16_t entry; /**< index ofmessages in the queue */ void *msg_queue_head; /**< list head*/ void *msg_queue_tail; /**< list tail*/ void *msg_queue_free; /**< pointerindicated the free node of queue */};typedef struct rt_messagequeue* rt_mq_t;
从类型字段上来看,首先依然能够看出消息队列与其它IPC同样都继承自struct rt_ipc_objectgit
下面来分析余下的扩展字段:github
msg_pool:消息队列容纳消息体的内存块地址web
msg_size:消息队列里消息的长度浏览器
max_msgs:消息队列可容纳的消息个数微信
entry:消息队列当前已存放的消息个数app
msg_queue_head:消息队列里最前面的消息函数
msg_queue_tail:消息队列里最后面的消息
msg_queue_free:消息队列里第一个空闲位置
不过还须要先介绍一下另外一个消息队列的内部类型:
struct rt_mq_message{ struct rt_mq_message *next;};
这个类型时消息队列里面采用链表实现的链表原型,其实意义不是很大,仅仅做为队列的一个索引用。
消息队列建立和初始化
/** *This function will create a message queue object from system resource * *@param name the name of message queue *@param msg_size the size of message *@param max_msgs the maximum number of message in queue *@param flag the flag of message queue * *@return the created message queue, RT_NULL on error happen */rt_mq_t rt_mq_create(const char *name, rt_size_t msg_size, rt_size_t max_msgs, rt_uint8_t flag){ struct rt_messagequeue *mq; struct rt_mq_message *head; register rt_base_t temp; RT_DEBUG_NOT_IN_INTERRUPT; /* allocate object */ mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name); if (mq == RT_NULL) return mq; /* set parent */ mq->parent.parent.flag= flag; /* init ipcobject */ rt_ipc_object_init(&(mq->parent)); /* init messagequeue */ /* get correct message size */ mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE); mq->max_msgs = max_msgs; /* allocate message pool */ mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) *mq->max_msgs); if (mq->msg_pool == RT_NULL) { rt_mq_delete(mq); return RT_NULL; } /* init messagelist */ mq->msg_queue_head = RT_NULL; mq->msg_queue_tail = RT_NULL; /* init message empty list */ mq->msg_queue_free = RT_NULL; for (temp = 0; temp < mq->max_msgs; temp++) { head = (struct rt_mq_message *)((rt_uint8_t*)mq->msg_pool + temp * (mq->msg_size + sizeof(struct rt_mq_message))); head->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = head; } /* the initial entry is zero */ mq->entry = 0; return mq;}
先看函数的参数:
name:消息队列的object名称
msg_size:消息的大小(注意不是消息队列的大小)
max_msgs:消息队列的大小(最多容纳多少条消息)
flag:IPC等待队列的类型(FIFO/PRIO)
/** *This function will send a message to message queue object, if there are *threads suspended on message queue object, it will be waked up. * *@param mq the message queue object *@param buffer the message *@param size the size of buffer * *@return the error code */rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer,rt_size_t size){ register rt_ubase_t temp; struct rt_mq_message *msg; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size !=0); /* greater than one message size */ if (size > mq->msg_size) return-RT_ERROR; RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return-RT_EFULL; } /* move free list pointer */ mq->msg_queue_free=msg->next; /* enable interrupt */ rt_hw_interrupt_enable(temp); /* the msg isthe new tailer of list, the next shall be NULL */ msg->next = RT_NULL; /* copy buffer*/ rt_memcpy(msg + 1, buffer, size); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* link msg to message queue */ if (mq->msg_queue_tail != RT_NULL) { /* if the tail exists, */ ((struct rt_mq_message *)mq->msg_queue_tail)->next = msg; } /* set new tail*/ mq->msg_queue_tail = msg; /* if the headis empty, set head */ if (mq->msg_queue_head == RT_NULL) mq->msg_queue_head = msg; /* increase message entry */ mq->entry++; /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); return RT_EOK;}
/** *This function will send an urgent message to message queue object, which *means the message will be inserted to the head of message queue. If there *are threads suspended on message queue object, it will be waked up. * *@param mq the message queue object *@param buffer the message *@param size the size of buffer * *@return the error code */rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size){ register rt_ubase_t temp; struct rt_mq_message *msg; /* parametercheck */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* greater than one message size */ if (size > mq->msg_size) return -RT_ERROR; RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_EFULL; } /* move free list pointer */ mq->msg_queue_free = msg->next; /* enable interrupt */ rt_hw_interrupt_enable(temp); /* copy buffer*/ rt_memcpy(msg + 1, buffer, size); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* link msg to the beginning of message queue */ msg->next = (struct rt_mq_message *)mq->msg_queue_head; mq->msg_queue_head = msg; /* if there is no tail */ if (mq->msg_queue_tail == RT_NULL) mq->msg_queue_tail = msg; /* increase message entry */ mq->entry++; /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread)); /* enableinterrupt */ rt_hw_interrupt_enable(temp); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); return RT_EOK;}
msg->next= (struct rt_mq_message *)mq->msg_queue_head;
mq->msg_queue_head= msg;
消息队列接收
/** *This function will receive a message from message queue object, if there is *no message in message queue object, the thread shall wait for a specified *time. * *@param mq the message queue object *@param buffer the received message will be saved in *@param size the size of buffer *@param timeout the waiting time * *@return the error code */rt_err_t rt_mq_recv(rt_mq_t mq, void *buffer, rt_size_t size, rt_int32_t timeout){ struct rt_thread *thread; register rt_ubase_t temp; struct rt_mq_message *msg; rt_uint32_t tick_delta; /* parametercheck */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* initialize delta tick */ tick_delta = 0; /* get current thread */ thread = rt_thread_self(); RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mq->parent.parent))); /* disable interrupt */ temp =rt_hw_interrupt_disable(); /* for non-blocking call */ if (mq->entry == 0 && timeout == 0) { rt_hw_interrupt_enable(temp); return -RT_ETIMEOUT; } /* message queue is empty */ while (mq->entry == 0) { RT_DEBUG_IN_THREAD_CONTEXT; /* reset error number in thread */ thread->error= RT_EOK; /* no waiting, return timeout */ if (timeout == 0) { /* enable interrupt */ rt_hw_interrupt_enable(temp); thread->error = -RT_ETIMEOUT; return -RT_ETIMEOUT; } /* suspend current thread */ rt_ipc_list_suspend(&(mq->parent.suspend_thread), thread, mq->parent.parent.flag); /* has waiting time, start thread timer */ if (timeout > 0) { /* get the start tick of timer */ tick_delta = rt_tick_get(); RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list", thread->name)); /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); } /* enable interrupt */ rt_hw_interrupt_enable(temp); /* re-schedule*/ rt_schedule(); /* recv message*/ if (thread->error != RT_EOK) { /* return error */ return thread->error; } /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* if it's not waiting forever and then re-calculate timeout tick */ if (timeout > 0) { tick_delta = rt_tick_get() - tick_delta; timeout -= tick_delta; if (timeout < 0) timeout = 0; } } /* get message from queue */ msg = (struct rt_mq_message *)mq->msg_queue_head; /* move message queue head */ mq->msg_queue_head = msg->next; /* reach queue tail, set to NULL */ if (mq->msg_queue_tail == msg) mq->msg_queue_tail = RT_NULL; /* decrease message entry */ mq->entry--; /* enable interrupt */ rt_hw_interrupt_enable(temp); /* copy message*/ rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size); /* disable interrupt*/ temp = rt_hw_interrupt_disable(); /* put message to free list */ msg->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = msg; /* enable interrupt */ rt_hw_interrupt_enable(temp); RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent))); return RT_EOK;}
buffer:接收消息的内存地址
size:接收消息的长度
timeout:当消息队列为空时将根据timeout来决定是否等待
RT-Thread线上活动
一、【RT-Thread能力认证考试12月——RCEA】通过第一次考试的验证,RT-Thread能力认证获得了更多社区开发者和产业界的大力支持!(点此查看)若是您有晋升、求职、寻找更好机会的须要,有深刻学习和掌握RT-Thread的需求,欢迎垂询/报考!
能力认证官网连接:https://www.rt-thread.org/page/rac.html(在外部浏览器打开)
当即报名
#题外话# 喜欢RT-Thread不要忘了在GitHub上留下你的STAR
哦,你的star对咱们来讲很是重要!连接地址:https://github.com/RT-Thread/rt-thread
你能够添加微信18917005679为好友,注明:公司+姓名,拉进 RT-Thread 官方微信交流群

RT-Thread
让物联网终端的开发变得简单、快速,芯片的价值获得最大化发挥。Apache2.0协议,可免费在商业产品中使用,不须要公布源码,无潜在商业风险。
长按二维码,关注咱们


点击阅读原文进入Github
本文分享自微信公众号 - RTThread物联网操做系统(RTThread)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。