本文分析基于Nginx-1.2.6,与旧版本或未来版本可能有些许出入,但应该差异不大,可作参考node
在Nginx中对array、list、queue、RB tree和hash表进行了实现,这些结构所涉及的内存管理都是在内存池中进行,源代码都位于src/core目录下。nginx
#Array# 相对来讲,数组是Nginx中最简单的数据结构,它是在内存池中分配的,与语言原生的数组相比,加强了功能,使用时可把它当作变长的,没必要担忧越界访问。经过ngx_array_t结构体描述了数组的结构信息。算法
为方便描述,称ngx_array_t结构体为数组的头(header),为数组元素分配的连续空间为数组的体(body)。数组
其结构以下图所示:数据结构
其中elts指向的是数组真实数据的存放位置,nelts是指数组中已经填充的元素个数,size是元素的大小,nalloc是分配的数组空间可容纳的元素数,pool指向的是数组内存所在的内存池。函数
数组操做比较简单,共有5个函数,ui
建立ngx_array_create:其过程是首先分配空间给一个ngx_array_t结构体(即header),而后分配n*size大小的空间供数组中元素存放,以后初始化header,并返回它的首地址。google
初始化ngx_array_init:它与建立相比,只少了第一个步骤。指针
销毁ngx_array_destroy:它并非真正地释放数组所占用空间(内存池中的内存释放由内存池统一进行),而是有条件地更新内存池信息。(条件是:数组空间在内存池已用空间的最后位置)code
ngx_array_push:这也并非真正地往数组中添加一个元素,而只是向数组请求划分出一个元素大小的空间并返回。此过程分两种状况, * 若是数组body中还有空闲,则把第一个可填充元素的位置返回; * 若是数组body已满,而且
ngx_array_push_n:同上面同样,这也只是向数组请求划分出n个元素大小的空间并返回,过程大同小异。
瞧下array的使用(这段代码在ngx_hash.c中,在下面分析hash表时会见到ngx_hash_key_t 结构,可见ngx_array_push只是返回可填充元素的位置,具体内容还得再调用以后赋值:
<!-- lang: cpp --> ngx_array_t curr_names; ngx_hash_key_t *name; if (ngx_array_init(&curr_names, hinit->temp_pool, nelts, sizeof(ngx_hash_key_t)) != NGX_OK) { return NGX_ERROR; } name = ngx_array_push(&curr_names); if (name == NULL) { return NGX_ERROR; } name->key.len = len; name->key.data = names[n].key.data; name->key_hash = hinit->key(name->key.data, name->key.len); name->value = names[n].value;
#List#
list结构中有个单向链表把全部的part串联起来,并在last域中记录下最后一个part便于在链表结尾插入新part,list中全部元素大小是相同的为size,nalloc指的是在list中已经分配了多少个元素的空间。
list中只有三个相关操做,create,init,push,在ngx_list.h和ngx_list.c中定义。
与array相似的是,ngx_XXX_create只是对array或list的空间进行分配,固然它们的空间都分两部分,一部分是header,一部分是body;ngx_XXX_push都只是返回一个能够填充元素的位置并对header进行更新,而body中的元素内容在调用此函数以后再赋值填充。
与array不一样的是,在push操做中,当已分配的空间都填满元素时,list会新建一个part(ngx_list_part_t),并在新part中划出空间并返回。
源码中给出了遍历list的示例代码以下:
<!-- lang: cpp --> /* * the iteration through the list: */ part = &list.part; data = part->elts; for (i = 0 ;; i++) { if (i >= part->nelts) { if (part->next == NULL) { break; } part = part->next; data = part->elts; i = 0; } ... data[i] ... }
#queue#
queue是中双向循环队列,设有一标记(sentinel),它以后是队列头,以前是队列尾。结构以下图所示。
对queue的操做多用宏定义,除图中画出的,还有:
ngx_queue_t中并无存储数据,因此使用时需在自定义结构体中嵌入一个ngx_queue_t类型的变量,其使用方法以下图所示:
另外有两个函数:
#RB Tree#
红黑树是一种平衡的二叉查找树。其性质和操做在各类算法书上都有,搜一下,也不少,不细讲。贴上相关结构代码以下:
<!-- lang: cpp --> typedef ngx_uint_t ngx_rbtree_key_t; typedef ngx_int_t ngx_rbtree_key_int_t; typedef struct ngx_rbtree_node_s ngx_rbtree_node_t; struct ngx_rbtree_node_s { ngx_rbtree_key_t key; ngx_rbtree_node_t *left; ngx_rbtree_node_t *right; ngx_rbtree_node_t *parent; u_char color; u_char data; }; typedef struct ngx_rbtree_s ngx_rbtree_t; typedef void (*ngx_rbtree_insert_pt) (ngx_rbtree_node_t *root, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel); struct ngx_rbtree_s { ngx_rbtree_node_t *root; ngx_rbtree_node_t *sentinel; ngx_rbtree_insert_pt insert; };
其中*sentinel就是叶子节点(Nil)。
函数ngx_rbtree_insert_value和ngx_rbtree_insert_timer_value为实现的ngx_rbtree_insert_pt handler,是按普通二叉查找树的方式插入一个节点,并标记此节点的color为红色,这是红黑树中插入节点的第一个步骤。二者的区别,在于节点key值的比较,后者考虑了值溢出的状况,是针对节点key是timer值而实现的。
<!-- lang: cpp --> /*在ngx_rbtree_insert_value中*/ p = (node->key < temp->key) ? &temp->left : &temp->right; /* 在ngx_rbtree_insert_timer_value中*/ /* * Timer values * 1) are spread in small range, usually several minutes, * 2) and overflow each 49 days, if milliseconds are stored in 32 bits. * The comparison takes into account that overflow. */ /* * 要比较的两个timer值一般相差不大,大概几分钟而已 * 而0xFFFFFFFF小于50天的毫秒数,因此当值超过50天时就会溢出 * 因此0xFFFFFFF0和0x0000000F的timer值比较,应该认为0x0000000F值溢出 * 其原本值应该大于0xFFFFFFF0,因此才有此区别。 */ p = ((ngx_rbtree_key_int_t) (node->key - temp->key) < 0) ? &temp->left : &temp->right; //红黑树tree中插入节点node void ngx_rbtree_insert(ngx_thread_volatile ngx_rbtree_t *tree,ngx_rbtree_node_t *node) //红黑树tree中删除节点node void ngx_rbtree_delete(ngx_thread_volatile ngx_rbtree_t *tree, ngx_rbtree_node_t *node)
#hash# 这里已经有很好的分析,建立hash和查找的过程可参考他那里。
把他的图也复制了过来,
下面就注释下ngx_hash_init函数吧
<!-- lang: cpp --> /* names数组中有nelts个ngx_hash_key_t ,是要填充到hash表中的数据*/ ngx_int_t ngx_hash_init(ngx_hash_init_t *hinit, ngx_hash_key_t *names, ngx_uint_t nelts) { u_char *elts; size_t len; u_short *test; ngx_uint_t i, n, key, size, start, bucket_size; ngx_hash_elt_t *elt, **buckets; /*bucket_size为每一个桶容许占用的最大空间,经过这个循环保证桶有足够空间 * 盛放至少一个ngx_hash_elt_t,另外ngx_hash_elt_t可看作变长结构,由于如图所示 * 紧挨其结构体以后仍然存放这name剩余的len-1个字节 * NGX_HASH_ELT_SIZE(&names[n])意思是 * 与names[n]对应的的ngx_hash_elt_t所需的空间*/ for (n = 0; n < nelts; n++) { if (hinit->bucket_size < NGX_HASH_ELT_SIZE(&names[n]) + sizeof(void *)) { ngx_log_error(NGX_LOG_EMERG, hinit->pool->log, 0, "could not build the %s, you should " "increase %s_bucket_size: %i", hinit->name, hinit->name, hinit->bucket_size); return NGX_ERROR; } } /*test中存放的是每一个桶所需的空间*/ test = ngx_alloc(hinit->max_size * sizeof(u_short), hinit->pool->log); if (test == NULL) { return NGX_ERROR; } bucket_size = hinit->bucket_size - sizeof(void *); start = nelts / (bucket_size / (2 * sizeof(void *))); start = start ? start : 1; if (hinit->max_size > 10000 && nelts && hinit->max_size / nelts < 100) { start = hinit->max_size - 1000; } /*size为桶个数,从start开始,当分配的桶个数合适时跳到found*/ for (size = start; size < hinit->max_size; size++) { ngx_memzero(test, size * sizeof(u_short)); for (n = 0; n < nelts; n++) { if (names[n].key.data == NULL) { continue; } key = names[n].key_hash % size; test[key] = (u_short) (test[key] + NGX_HASH_ELT_SIZE(&names[n])); if (test[key] > (u_short) bucket_size) { goto next;/*若是有个桶所需空间超过了容许最大值则增长桶的数量*/ } } goto found; next: continue; } /*桶数量已经最大,仍然有某个桶没有足够的空间盛放对应的项*/ ngx_log_error(NGX_LOG_EMERG, hinit->pool->log, 0, "could not build the %s, you should increase " "either %s_max_size: %i or %s_bucket_size: %i", hinit->name, hinit->name, hinit->max_size, hinit->name, hinit->bucket_size); ngx_free(test); return NGX_ERROR; found: /*此时size是合适的桶个数*/ for (i = 0; i < size; i++) { test[i] = sizeof(void *); } /*桶个数定下后,test中是每一个桶应分配的内存大小*/ for (n = 0; n < nelts; n++) { if (names[n].key.data == NULL) { continue; } key = names[n].key_hash % size; test[key] = (u_short) (test[key] + NGX_HASH_ELT_SIZE(&names[n])); } len = 0; /*循环结束后len中存放的是为全部的桶应分配的内存大小*/ for (i = 0; i < size; i++) { if (test[i] == sizeof(void *)) { continue; } /*每一个桶对齐到cacheline边界上时所需的内存大小*/ test[i] = (u_short) (ngx_align(test[i], ngx_cacheline_size)); len += test[i]; } /*这个地方图上有解释*/ if (hinit->hash == NULL) { hinit->hash = ngx_pcalloc(hinit->pool, sizeof(ngx_hash_wildcard_t) + size * sizeof(ngx_hash_elt_t *)); if (hinit->hash == NULL) { ngx_free(test); return NGX_ERROR; } buckets = (ngx_hash_elt_t **) ((u_char *) hinit->hash + sizeof(ngx_hash_wildcard_t)); } else { buckets = ngx_pcalloc(hinit->pool, size * sizeof(ngx_hash_elt_t *)); if (buckets == NULL) { ngx_free(test); return NGX_ERROR; } } /*全部桶所占空间的起始位置也要对齐到cacheline边界上*/ elts = ngx_palloc(hinit->pool, len + ngx_cacheline_size); if (elts == NULL) { ngx_free(test); return NGX_ERROR; } elts = ngx_align_ptr(elts, ngx_cacheline_size); /*将buckets中的每一个桶指针指向对应的桶空间首地址*/ for (i = 0; i < size; i++) { if (test[i] == sizeof(void *)) { continue; } buckets[i] = (ngx_hash_elt_t *) elts; elts += test[i]; } for (i = 0; i < size; i++) { test[i] = 0; } /*填充每一个元素到对应的桶的对应位置*/ for (n = 0; n < nelts; n++) { if (names[n].key.data == NULL) { continue; } key = names[n].key_hash % size; elt = (ngx_hash_elt_t *) ((u_char *) buckets[key] + test[key]); elt->value = names[n].value; elt->len = (u_short) names[n].key.len; ngx_strlow(elt->name, names[n].key.data, names[n].key.len); test[key] = (u_short) (test[key] + NGX_HASH_ELT_SIZE(&names[n])); } /*每一个桶的结尾是一值为NULL的指针*/ for (i = 0; i < size; i++) { if (buckets[i] == NULL) { continue; } elt = (ngx_hash_elt_t *) ((u_char *) buckets[i] + test[i]); elt->value = NULL; } ngx_free(test); /*把分配的buckets链接到hash上,并在hash->size中记录下桶的个数*/ hinit->hash->buckets = buckets; hinit->hash->size = size; return NGX_OK; }
ngx_hash_key和ngx_hash_key_lc是实现的两个ngx_hash_key_pt,用来对字符串生成hash值,区别是后者计算的是字符串所有小写后的hash值。
至于还有个ngx_hash_wildcard_init,里面有递归调用,如今还迷糊着,先放着吧。。