按个人理解,消息队列是Skynet的核心,Skynet就是围绕着消息队列来工做的。
这个消息队列分为两部分:全局队列和服务队列。每一个服务都有一个本身的服务队列,服务队列被全局队列引用。主进程经过多个线程来不断的从全局队列中取出服务队列,而后分发服务队列中的消息到对应的服务。算法
今天,我将拨开消息队列的面纱,一探究竟。数组
既然是数据结构,就是用来存储数据的,伴随着它的就要有添加、删除、访问接口。因为它是用来存储消息的,不难想到:向某服务发送消息,就是向服务的服务队列中添加消息。而Skynet是经过多线程来分发消息的,线程的工做就是遍历全局队列,分发服务队列中的消息到服务。session
我就按照这个思路,带着问题,去看看Skynet的实现:数据结构
- 全局队列和服务队列的结构
- 全局队列和服务队列的生成
- 如何向全局队列添加/删除服务队列
- 如何向服务队列添加/删除消息
- 工做线程如何分发消息
结构
服务队列结构
struct message_queue { uint32_t handle; int cap; int head; int tail; int lock; int release; int in_global; struct skynet_message *queue; struct message_queue *next; };
初看此结构,感受很像链表:next指向下一个节点,queue存储消息数据。实际上是错的,稍微思考一下:若是是链表的话,那message_queue
的其余数据(handle,cap等)岂不是要被复制多份?这显然不符合大神对代码质量的要求。
既然不是经过链表的方式去实现的,那么很容易就会想到:是经过数组的形式来实现的,queue
实际上是一个动态申请的数组,里面存了不少条消息,而cap(容量)、head(头)、tail(尾)是为queue
服务的。可是next
指针又有什么用呢?
先无论这么多了,继续读代码找答案吧。多线程
全局队列结构
struct global_queue { uint32_t head; uint32_t tail; struct message_queue ** queue; struct message_queue *list; };
生成
全局队列
一个Skynet进程中,只有一个全局队列,在系统启动的时候就会经过skynet_mq_init
生成它:app
void skynet_mq_init() { struct global_queue *q = skynet_malloc(sizeof(*q)); memset(q,0,sizeof(*q)); q->queue = skynet_malloc(MAX_GLOBAL_MQ * sizeof(struct message_queue *)); memset(q->queue, 0, sizeof(struct message_queue *) * MAX_GLOBAL_MQ); Q=q; }
须要注意的是:它直接申请了MAX_GLOBAL_MQ
个message_queue
用于存储服务队列,因此服务队列的总数不能超过MAX_GLOBAL_MQ
。dom
服务队列
因为服务队列是属于服务的,因此服务队列的生命周期应和服务一致:载入服务的时候生成,卸载服务的时候删除。
服务是经过skynet_context_new
载入的,在此函数中,能够找到对应的服务队列的生成语句:函数
struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle); struct message_queue * skynet_mq_create(uint32_t handle) { struct message_queue *q = skynet_malloc(sizeof(*q)); q->handle = handle; q->cap = DEFAULT_QUEUE_SIZE; q->head = 0; q->tail = 0; q->lock = 0; q->in_global = MQ_IN_GLOBAL; q->release = 0; q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap); q->next = NULL; return q; }
在Skynet内部,是经过handle来定位服务的,handle就至关与服务的地址,此函数保存了服务的handle,这样,之后就能够经过服务队列的handle,直接找到对应的服务了。
默认的容量是DEFAULT_QUEUE_SIZE
(64),从这里就能够印证咱们上面的判断了:message_queue
是经过数组保存消息的,不是经过链表。
全局队列操做
全局队列是一个用固定大小的数组模拟的循环队列,此循环队列向尾部添加,从头部删除,分别用head、tail记录其首尾下标。
全局队列保存全部的服务队列,worker线程向全局队列索取服务队列。为了效率,并非简单的把全部的服务队列都塞到全局队列中,而是只塞入非空的服务队列,这样worker线程就不会获得空的服务队列而浪费资源。
因为工做线程有多个,为了不冲突,Skynet运用了这样的策略:每次worker线程取得一个服务队列的时候,都把这个服务队列从全局队列中删除,这样其余的worker线程就无法获取到这个服务队列了,当此worker线程操做完毕后,再将此服务队列添加到全局队列(若服务队列非空的话)。
可能触发全局队列添加操做的状况有:
- 向服务队列中添加消息(空变非空)
- worker线程处理完毕,服务队列非空
可能触发全局队列删除操做的状况有:
- 从服务队列中删除消息(非空变空)
- worker线程获取消息队列
添加
void skynet_globalmq_push(struct message_queue * queue) { struct global_queue *q= Q; uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1)); if (!__sync_bool_compare_and_swap(&q->queue[tail], NULL, queue)) { // The queue may full seldom, save queue in list assert(queue->next == NULL); struct message_queue * last; do { last = q->list; queue->next = last; } while(!__sync_bool_compare_and_swap(&q->list, last, queue)); return; } }
不要被那些原子操做函数吓倒,它们其实要作的很简单,只是为了保证操做的原子性,防止多线程冲突问题,才单独封装成一个API,详细解释见:GCC内置原子内存存取函数。
当向这样的固定大小的循环队列添加元素的时候,会遇到以下状况:
- tail溢出
- 队列满了
上述代码中,tail溢出的问题是经过GP
取模操做来解决的:
#define GP(p) ((p) % MAX_GLOBAL_MQ)
若是队列满了,怎么办呢?通常的解决办法有:扩大容量、直接返回操做失败等。Skynet没有采用这样的方法,它是这么作的:
struct message_queue * last; do { last = q->list; queue->next = last; } while(!__sync_bool_compare_and_swap(&q->list, last, queue));
由于要考虑多线程的问题,代码显的比较难读,咱们简化一下:
queue->next = q->list; q->list = queue;
这样就很清晰了,实际上就是:将新的服务队列queue
添加到全局队列的额外服务队列链表list
中。这样,global_queue
的list
中,就存放了全部没有成功添加的服务队列(由于全局队列满了)。
删除
删除的算法就很简单了:
- 非空检查
- 取得head下标,作溢出处理(GP)
- 取出当前的头节点
- 将head下标对应的指针值空
- head加1
这里有一个细节,还记得上面的添加操做有可能遇到全局队列满的状况吗?这里会尝试将那些添加失败的队列添加到全局队列中:
struct message_queue * list = q->list; if (list) { struct message_queue * newhead = list->next; if (__sync_bool_compare_and_swap(&q->list, list, newhead)) { list->next = NULL; skynet_globalmq_push(list); } }
由于每次都只会pop一个,因此,每次只从list中取一个push进全局队列。
服务队列操做
服务队列中存储了全部发给此服务的消息。
服务队列是可变大小的循环队列,其容量会在运行时动态增长。
添加
经过调用skynet_mq_push
来将消息添加到服务队列:
void skynet_mq_push(struct message_queue *q, struct skynet_message *message) { q->queue[q->tail] = *message; if (++ q->tail >= q->cap) q->tail = 0; if (q->head == q->tail) expand_queue(q); if (q->in_global == 0) { q->in_global = MQ_IN_GLOBAL; skynet_globalmq_push(q); } }
同全局队列同样,它也会遇到:下标溢出、队列满的状况,因为它是可扩容的循环队列,当队列满的时候,就调用expand_queue
来扩容(当前容量的两倍)。
这里须要注意的是,最后作了这样的处理:若是当前的服务队列没有被添加到全局队列,则将它添加进去,这是为worker线程而作的优化。
删除
删除的操做就很简单了:head+1。
细节上考虑了下标溢出的问题,并会在队列为空的时候,将队列的in_global
值为false。
为何这里只设置一个标记呢?为何不从全局队列中删除呢?
哈哈!由于只有worker线程才会操做服务队列,而当worker线程获取到服务队列的时候,已经将它从全局队列中删除了。
消息分发
消息分发是经过启动多个worker线程来作的,而worker线程则不断的循环调用skynet_context_message_dispatch
,为了便于理解,我删掉了一些细节:
struct message_queue * skynet_context_message_dispatch(struct message_queue *q) { if (q == NULL) { q = skynet_globalmq_pop(); if (q==NULL) return NULL; } uint32_t handle = skynet_mq_handle(q); struct skynet_context * ctx = skynet_handle_grab(handle); struct skynet_message msg; if (skynet_mq_pop(q,&msg)) { skynet_context_release(ctx); return skynet_globalmq_pop(); } _dispatch_message(ctx, &msg); struct message_queue *nq = skynet_globalmq_pop(); if (nq) { skynet_globalmq_push(q); q = nq; } skynet_context_release(ctx); return q; }
这个函数有两种状况:
- 传入的
message_queue
为NULL - 传入的
message_queue
非NULL
对于第一种状况,它会到全局队列中pop一个出来,后面的和第二种状况同样了。
分发步骤以下:
- 经过
message_queue
得到服务的handle - 经过handle查找到服务的
skynet_context
- 从
message_queue
中pop一个元素 - 调用
_dispatch_message
进行消息分发 - 若是全局队列为空,则直接返回此队列(这样下次就会继续处理这个队列,此函数是循环调用的)
- 若是全局队列非空,则pop全局队列,获得下一个服务队列
- 将此队列插入全局队列,返回下一个服务队列
只因此不一次性处理玩当前队列,而要用5~7的步骤,是为了消息调度的公平性,对每个服务都公平。
_dispatch_message
以下:
static void _dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) { int type = msg->sz >> HANDLE_REMOTE_SHIFT; size_t sz = msg->sz & HANDLE_MASK; if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) skynet_free(msg->data); }
它从skynet_message
消息中分解出类型和大小,而后调用服务的callback。
这里须要注意的是:若是消息的callback返回0,则消息的data
将被释放。