因为我对于Java并发库JUC的深刻,一直以来有个想法,能不能把Java并发库移植到纯C语言环境下,而且在实现、使用方式上都与Java平台保持至关程度的类似性呢?java
纯C环境下内存模型与Java平台不一致?加上内存屏障(fence)或者lock指令就行。 node
C环境下缺乏对象模型?无非是给每一个数据块提供个*init方法(好比pthread_mutex_t的pthread_mutex_init,pthread_barrier_t的pthread_barrier_init),以后再将逻辑的部分直接写到函数里(phtread_mutex_lock、pthread_barrier_wait),无非是这些逻辑并非像Java平台上同样被绑定到某个"对象"。这些都不是本质上困难的地方。git
真正的困难在于内存回收。github
Java平台,咱们可以对内存作的惟一事情就是申请、建立(new关键字),如此一来便获得一个新的对象。以后,咱们没法直接针对这块内存释放。咱们最多只能把本身目所能及范围内的关于这块内存的"引用"置为null,从而期待GC去回收(前提是这块内存不存在其余引用)。同时,咱们根本没法知道究竟哪一个时刻这块内存会被回收,咱们只能认为,Runtime environment可以选择最适当的时刻。web
同时,Java虚拟机给出了一个更强的保证:只要你的对象(引用)obj != null,那么这个引用所指示的对象即是确定存在的,咱们能够绝对安全地调用obj.method()而不用惧怕任何意外。
把这个场景放在C环境下比喻彷佛就在说:若是你能看到某个地址(数值),那么就放心对他作任何合理的事吧!Runtime environment确保了你的安全!
总之,Java运行时给出了强大的保证:
1 看获得的对象都存在,你能够放心对它操做。
2 那些正在被回收的、你不能安全操做的对象(内存),你绝对没法看到。算法
如此一来,不管是内存独占或共享、线程并发或非并发,咱们都无需担忧内存自己的问题(Java GC回收全部内存,全平台的垃圾回收器)。数组
那么如今咱们回头来看C环境。它的运行时环境根本不帮你作任何事,你甚至能够虚构一个内存地址,而后将它强制转化为虚构的struct类型,接着对它操做。只不过这样极可能破坏了内存,致使不可预期的结果,同时这个错误会一直潜伏,你根本不知道什么时候出现。甚至于,都不须要定义结构体信息,假如你了解运行机器的具体信息,你均可以直接在一个地址上作地址偏移来操做内存......固然,前提是这块内存是安全的。缓存
那么问题就很清楚了,
Java环境下内存安全回收由虚拟机彻底负责。
C环境下,内存安全回收由逻辑单元(线程)自己来负责。安全
也就是说C环境下,咱们不只要处理算法自己的逻辑,同时也要额外去处理内存回收的问题。数据结构
那么C环境下内存回收困难在哪呢?
首先,咱们不考虑不须要回收的内存。咱们知道,独占的内存容易回收,对于共享的内存,假若有个逻辑点,咱们确保全部线程当下以及未来都不会使用这块内存,那么也是能够安全回收的。
因此真正难于回收的是知足如下3个条件的内存:
OK,咱们接着引出Maged M. Michael,在2004发表的《Hazard Pointers:Safe Memory Reclamation for Lock-Free Objects》。
Hazard Pointer能够说是最知名的一种共享内存的回收方案。借用Erez Petrank的ppt,它能够归纳以下:
以及后续变化的Lock-Free Data Structures with Hazard Pointers、Hazard Eras、Maged M. Michael等。
对于每一块被共享的内存,这些技术都须要为它配置另外N(MAX_THREAD)个内存来标记它是否被对应的线程访问,同时为了释放一块内存,都须要遍历这N个内存位置从而断定是否能够安全回收(尽管能够采用先排序,二分搜索等方式下降检索的数据量,可是检索过程的代价依然是随着MAX_THREAD而增加的,就算你总操做数同样。尽管它的算法是lock-free(甚至wait-free)。这里的缺陷在于每一块共享数据都与线程自己牢牢耦合,几乎没有扩展性。
而另外一种被普遍讨论的回收方案是epoch-based ReclamationPractical lock-freedom以及类似的技术Performance of memory reclamation for lockless synchronization,它的性能很好,可是如内存管理规则所说,它并非无阻塞的算法,它在本质上就会阻塞,因此progress没法获得保证。
其中2005年,哥德堡大学的研究团队发表的方案Practical and Efficient Lock-Free Garbage Collection Based on Reference Counting,尽管也采用了引用计数的方式(与我将要介绍的方式相似),可是却并无解除线程自己与共享数据之间的依赖,而且scan时依然要遍历整个线程组。
在不计较lock指令或者fence的状况下,是否存在一种方法可以同时作到:
为了达成以上几个目标,我向你们介绍自创的SHP(Scalable Hazard Pointers)。
Scalable Hazard Pointers分为以下三个部分来说述:
3RE&S指的是以下四个,针对被分配内存地址(pointer)的抽象操做:
同时,对于共享数据的读、写为下面两种方式:
这个协议总结了,包含了hazardpointer以及许多相似技术的处理方式,RECORD操做时将共享内存地址自己也做为传输传入。
几乎任何一种算法,并发共享的数据均可经过以上方式回收。
这里的关键点是:
这里给出个针对Michael原始论文的对比例子:
void enqueue(int value){ NodeType* node; posix_memalign(&node, 64, sizeof(NodeType)); memset(node, 0, sizeof(NodeType)); node->value = value; node->next = NULL; NodeType* t; for(;;){ t = Tail; psly_record(&Tail, t); if(Tail != t) { psly_remove(t); continue; } NodeType* next = t->next; if(Tail != t) { psly_remove(t); continue; } if(next != NULL){ psly_remove(t); __sync_bool_compare_and_swap(&Tail, t, next); continue; } if(__sync_bool_compare_and_swap(&t->next, NULL, node)) { psly_remove(t); break; } psly_remove(t); } __sync_bool_compare_and_swap(&Tail, t, node); }
int dequeue(){ int data; NodeType* h; for(;;){ h = Head; //myhprec->HP[0] = h; psly_record(&Tail, h); if(Head != h) { psly_remove(h); continue; } NodeType* t = Tail; NodeType* next = h->next; psly_record(&(h->next), next); //myhprec->HP[1] = next; if(Head != h) { psly_remove(next); psly_remove(h); continue; } if(next == NULL) { psly_remove(next); psly_remove(h); return -1000000; } if(h == t){ psly_remove(next); psly_remove(h); __sync_bool_compare_and_swap(&Tail, t, next); continue; } data = next->value; //myhprec->HP[1] = NULL; //myhprec->HP[0] = NULL; if(__sync_bool_compare_and_swap(&Head, h, next)) { psly_remove(next); psly_remove(h); retireNode(h); break; } psly_remove(next); psly_remove(h); } //myhprec->HP[0] = NULL; //myhprec->HP[1] = NULL; return data; }
首先,为何要为每块共享内存维护N个变量,这样作不只浪费内存并且增长搜索代价。理想状况下咱们应该只须要一个内存数据来处理一个共享内存。那么咱们怎么来处理多个线程引用它的状况呢?这里的一个天然的想法是引入引用计数(reference count),(注意,这里的引用计数是线程引用共享数据,与另外一个对象层次间的引用无关),咱们用refcount表明当下访问它的线程数,refCount>0的内存绝对不会被回收,refCount == 0表明没有线程引用它,处于能够被回收的状态。咱们将这样的一个内存数据称为Record,
struct Record{ //数据字段 void* pointer; int refcount; }
第二,因为Record内存自己是要被维护的,因此咱们的策略是随需分配,已分配的不在回收,咱们将Record做为一种可重用的资源,用于追踪那些共享内存。那么意味着Record可以被高效地从资源库并发获取和返回。
以上斜体字是我以前的观点,我有了点突破,如今我有了技术方案,能以较小代价实现Record的回收了!:)
这里的技巧是用一个固定的self字段,标记Record自己,(好比低10位做为indexNum,接下来4位做为arrayNum),用于惟一的标记Record自身。这么作的目的是为了支持后面的map。
想象一下,咱们已经有了不少共享的pointer(地址),每一个分配一个Record,接着咱们要如何组织它们呢?从而高效的支持add,remove,search操做。
这里的方式是用一个大的map,它具备一个大的buckets数目好比1024。
typedef struct RecordList { Record* volatile head ; Record* volatile tail ; } RecordList ; typedef struct RecordMap { RecordList* lists[1024] ; } RecordMap ;
RecordList初始化以后久拥有了固定的head/tail。
很明显,咱们这里天然的策略是将具备相同后缀的地址base到同一个list上,同时因为开辟的内存地址通常是单调递增,而且保持16字节对齐,咱们能够根据地址自己来避开了hits。
咱们能够抽象出以下的接口以下:
struct Record{ //Record库维护字段 int volatile next ; int self; //数据字段 void* pointer; int refcount; } int record(void* pointer) { long key = ((long) pointer) >> 4 ; RecordList* list = map.lists[key & (1024-1)]; return handle_records(list, pointer, RECORD); }
那么接下来的问题是如何在一个list上组织那些带有一样后缀地址的Record?
这里推荐著名的Harris' list,它作到用一个并发无锁单链表来组织有序数据,支持增长、删除、搜索。
咱们须要给Record再加一个retire'bit和一个long字段nextRecord,它里面包含四个字段:
struct Record{ //Record库维护字段 int volatile next ; int self; //数据字段 void* pointer; int refcount; //retire bool retireBit; long nextRecord; }
而后,对它进行了至关程度的改造,改造的地方以下:
注意,因为以上的操做存在相互依赖,好比新增的Record不能连接到已经逻辑删除的Record上。
因此,当一个Record被从资源库get到以后,生命周期为:
这里最核心的技术就是基于两点断定节点没有失效:
咱们须要一些Record去持有地址,咱们采用随需批量malloc的方式。这种方式的特色是:
以上方式,能够如图所示:
下面给出一个大概例子,采用的经典的MSQueue,固然,存在其余更高效的方式lcrq:
int PSLY_Record_IDXNUM = 16; int PSLY_Record_IDXBIT = ((1 << 16) - 1); int PSLY_Record_ARRAYNUM_MAX = (1 << 4); int PSLY_Record_ARRAYNUM = (1 << 2); int PSLY_Record_ARRAYBITS = ((1 << 4) -1); int PSLY_Record_ARRBIT = (((1 << 4) - 1) << 16); int PSLY_Record_ARRBITR = ((1 << 4) - 1); int PSLY_Record_ARRIDXBIT = ((((1 << 4) - 1) << 16) | ((1 << 16) - 1)); int PSLY_Record_NEXTIDXNUM = 16; int PSLY_Record_NEXTIDXBIT = ((1 << 16) - 1); int PSLY_Record_NEXTTAILNUM = 1; int PSLY_Record_NEXTTAILBIT = (((1 << 1) - 1) << 16); int PSLY_Record_NEXTVERSIONNUM = (32 - 1 - 16); int PSLY_Record_NEXTVERSIONBIT = ((~0)^((((1 << 1) - 1) << 16) | ((1 << 16) - 1))); int PSLY_Record_NEXTVERSIONONE = (1 + ((((1 << 1) - 1) << 16) | ((1 << 16) - 1))); int PSLY_Record_TAILIDXNUM = 16; int PSLY_Record_TAILIDXBIT = ((1 << 16) - 1); int PSLY_Record_TAILVERSIONNUM = (32 - 16); int PSLY_Record_TAILVERSIONBIT = ((~0) ^ ((1 << 16) - 1)); int PSLY_Record_TAILVERSIONONE = (1 + ((1 << 16) - 1)); int PSLY_Record_HEADIDXNUM = 16; int PSLY_Record_HEADIDXBIT = ((1 << 16) - 1); int PSLY_Record_HEADVERSIONNUM = (32 - 16); int PSLY_Record_HEADVERSIONBIT = ((~0) ^ ((1 << 16) - 1)); int PSLY_Record_HEADVERSIONONE = (1 + ((1 << 16) - 1)); typedef struct Record { int volatile next __attribute__((aligned(128))); int self ; long volatile nextRecord __attribute__((aligned(128))); void* volatile pointer ; } Record __attribute__((aligned(128))); typedef struct RecordQueue { int volatile head ; int volatile tail ; } RecordQueue ; static Record* volatile psly_Records[1 << 4]; static RecordQueue volatile psly_Record_queues[1 << 4]; static int volatile recordTake = 0; Record* idx_Record(int index) { return psly_Records[(index & PSLY_Record_ARRBIT) >> PSLY_Record_IDXNUM] + (index & PSLY_Record_IDXBIT); } Record* get_Record() { for(;;) { int localArrayNum = PSLY_Record_ARRAYNUM; //取最高队列 int array = localArrayNum - 1; RecordQueue* queue = psly_Record_queues + array; Record* arr = psly_Records[array]; for(;;){ int headIndex = (queue->head); int indexHead = headIndex & PSLY_Record_HEADIDXBIT; Record* head = arr + indexHead; int tailIndex = (queue->tail); int indexTail = tailIndex & PSLY_Record_TAILIDXBIT; int nextIndex = (head->next); if(headIndex == (queue->head)) { if(indexHead == indexTail){ if((nextIndex & PSLY_Record_NEXTTAILBIT) == PSLY_Record_NEXTTAILBIT) break; __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE ) & PSLY_Record_TAILVERSIONBIT)|(nextIndex & PSLY_Record_TAILIDXBIT)); } else { if(__sync_bool_compare_and_swap(&queue->head, headIndex, (((headIndex & PSLY_Record_HEADVERSIONBIT) + PSLY_Record_HEADVERSIONONE) & PSLY_Record_HEADVERSIONBIT)|(nextIndex & PSLY_Record_HEADIDXBIT))) { return head; } else { break; } } } } // 轮询某些队列 for(int i = 0; i < localArrayNum; ++i) { int array = __sync_fetch_and_add(&recordTake, 1) % localArrayNum; RecordQueue* queue = psly_Record_queues + array; Record* arr = psly_Records[array]; for(;;){ int headIndex = (queue->head); int indexHead = headIndex & PSLY_Record_HEADIDXBIT; Record* head = arr + indexHead; int tailIndex = (queue->tail); int indexTail = tailIndex & PSLY_Record_TAILIDXBIT; int nextIndex = (head->next); if(headIndex == (queue->head)) { if(indexHead == indexTail){ if((nextIndex & PSLY_Record_NEXTTAILBIT) == PSLY_Record_NEXTTAILBIT) break; __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE ) & PSLY_Record_TAILVERSIONBIT)|(nextIndex & PSLY_Record_TAILIDXBIT)); } else { if(__sync_bool_compare_and_swap(&queue->head, headIndex, (((headIndex & PSLY_Record_HEADVERSIONBIT) + PSLY_Record_HEADVERSIONONE) & PSLY_Record_HEADVERSIONBIT)|(nextIndex & PSLY_Record_HEADIDXBIT))) { return head; } else { break; } } } } } // 遍历全部队列 for(int i = 0; i < localArrayNum; ++i) { int array = i; RecordQueue* queue = psly_Record_queues + array; Record* arr = psly_Records[array]; for(;;){ int headIndex = (queue->head); int indexHead = headIndex & PSLY_Record_HEADIDXBIT; Record* head = arr + indexHead; int tailIndex = (queue->tail); int indexTail = tailIndex & PSLY_Record_TAILIDXBIT; int nextIndex = (head->next); if(headIndex == (queue->head)) { if(indexHead == indexTail){ if((nextIndex & PSLY_Record_NEXTTAILBIT) == PSLY_Record_NEXTTAILBIT) break; __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE ) & PSLY_Record_TAILVERSIONBIT)|(nextIndex & PSLY_Record_TAILIDXBIT)); } else { if(__sync_bool_compare_and_swap(&queue->head, headIndex, (((headIndex & PSLY_Record_HEADVERSIONBIT) + PSLY_Record_HEADVERSIONONE) & PSLY_Record_HEADVERSIONBIT)|(nextIndex & PSLY_Record_HEADIDXBIT))) { return head; } } } } } //不够增长 if(localArrayNum == PSLY_Record_ARRAYNUM_MAX) return NULL; if(localArrayNum == PSLY_Record_ARRAYNUM) { if(psly_Records[localArrayNum] == NULL) { int array_ = localArrayNum; Record* record; void * ptr; int ret = posix_memalign(&ptr, 4096, (1 << PSLY_Record_IDXNUM) * sizeof(Record)); record = ptr; memset(record, 0, (1 << PSLY_Record_IDXNUM) * sizeof(Record)); for(int j = 0; j < (1 << PSLY_Record_IDXNUM) - 1; ++j){ record->self = (array_ << PSLY_Record_IDXNUM) | j; record->next = j+1; record->pointer = NULL;\ record->nextRecord = 0; record += 1; } record->self = (array_ << PSLY_Record_IDXNUM) | ((1 << PSLY_Record_IDXNUM) - 1); record->next = PSLY_Record_NEXTTAILBIT; record->pointer = NULL; record->nextRecord = 0; //printf("I'm here %d %ld\n", localArrayNum, pthread_self()); if(!__sync_bool_compare_and_swap(&psly_Records[array_], NULL, ptr)) {free(ptr);} else /*printf("extend to %d\n", localArrayNum + 1)*/; } if(localArrayNum == PSLY_Record_ARRAYNUM) __sync_bool_compare_and_swap(&PSLY_Record_ARRAYNUM, localArrayNum, localArrayNum + 1); } } } void return_Record(Record* record) { long local = (record->next); local |= PSLY_Record_NEXTTAILBIT; record->next = local; int self = record->self; int array = (self >> PSLY_Record_IDXNUM) & PSLY_Record_ARRBITR; Record* arr = psly_Records[array]; RecordQueue* queue = psly_Record_queues + array; for(;;) { int tailIndex = (queue->tail); int indexTail = tailIndex & PSLY_Record_TAILIDXBIT; Record* tail = arr + indexTail; int nextIndex = (tail->next); if(tailIndex == (queue->tail)){ if((nextIndex & PSLY_Record_NEXTTAILBIT) == PSLY_Record_NEXTTAILBIT) { if(__sync_bool_compare_and_swap(&tail->next, nextIndex, (((nextIndex & PSLY_Record_NEXTVERSIONBIT) + PSLY_Record_NEXTVERSIONONE) & PSLY_Record_NEXTVERSIONBIT)|(self & PSLY_Record_NEXTIDXBIT))){ __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE) & PSLY_Record_TAILVERSIONBIT)|(self & PSLY_Record_TAILIDXBIT)); return; } } else { __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE) & PSLY_Record_TAILVERSIONBIT)|(nextIndex & PSLY_Record_TAILIDXBIT)); } } } } typedef struct RecordList { Record* volatile head ; Record* volatile tail ; } RecordList ; typedef struct RecordMap { volatile RecordList* lists[131070] ; } RecordMap ; static volatile RecordMap map; #define INIT_RESOURCE(listNum) \ for(int i = 0; i < (PSLY_Record_ARRAYNUM); ++i){ \ Record* record; \ void * ptr;\ int ret = posix_memalign(&ptr, 4096, (1 << PSLY_Record_IDXNUM) * sizeof(Record));\ psly_Records[i] = record = ptr; \ memset(record, 0, (1 << PSLY_Record_IDXNUM) * sizeof(Record)); \ for(int j = 0; j < (1 << PSLY_Record_IDXNUM) - 1; ++j){ \ record->self = (i << PSLY_Record_IDXNUM) | j; \ record->next = j+1; \ record->pointer = NULL;\ record->nextRecord = 0;\ record += 1; \ } \ record->self = (i << PSLY_Record_IDXNUM) | ((1 << PSLY_Record_IDXNUM) - 1); \ record->next = PSLY_Record_NEXTTAILBIT; \ record->pointer = NULL;\ record->nextRecord = 0;\ }\ for(int i = 0; i < PSLY_Record_ARRAYNUM_MAX; ++i){\ psly_Record_queues[i].head = 0; \ psly_Record_queues[i].tail = (1 << PSLY_Record_IDXNUM) - 1; \ } \ for(int i = 0; i < listNum; ++i) { \ void* ptr;\ int ret = posix_memalign(&ptr, 4096, sizeof(RecordList));\ Record* head = get_Record();\ Record* tail = get_Record();\ head->nextRecord = newNext(head->nextRecord, tail); \ map.lists[i] = ptr;\ map.lists[i]->head = head; \ map.lists[i]->tail = tail; \ } #define UNINIT_RESOURCE(listNum) \ for(int i = 0; i < (PSLY_Record_ARRAYNUM); ++i){ \ free(psly_Records[i]); \ } \ for(int i = 0; i < listNum; ++i) {\ free(map.lists[i]);\ }
这里的psly_Records[1 << 4],psly_Record_queues[1 << 4] 表明总共能够提供16组,每组65536个数据(PSLY_Record_IDXNUM = 16),初始分配4组数据(PSLY_Record_ARRAYNUM = (1 << 2)),以后若是不够就扩充一组。
局部变量:
对于一块共享内存S,咱们为它的Record保持一个局部变量reordS,只须要在RECORD时候返回,后续的REMOVE跟RETIRE就不须要去查询了,相对于以前的enqueue给出的例子以下:
+++为变更代码
void enqueue(int value){ NodeType* node; posix_memalign(&node, 64, sizeof(NodeType)); memset(node, 0, sizeof(NodeType)); node->value = value; node->next = NULL; NodeType* t; for(;;){ t = Tail; +++ Record* recordT = psly_record(&Tail, t); +++ if(recordT == NULL) +++ continue; if(Tail != t) { +++ psly_remove(recordT); continue; } NodeType* next = t->next; if(Tail != t) { +++ psly_remove(recordT); continue; } if(next != NULL){ +++ psly_remove(recordT); __sync_bool_compare_and_swap(&Tail, t, next); continue; } if(__sync_bool_compare_and_swap(&t->next, NULL, node)) { +++ psly_remove(recordT); break; } +++ psly_remove(recordT); } __sync_bool_compare_and_swap(&Tail, t, node); }
这样以来,每次操做,最须要在RECORD时候遍历一次。
咱们还能够作的更好
线程私有数据:
有些场景的共享内存,会在一段长期时间内不会改变,这种状况的话每次都去查询maplist显得很浪费,咱们能够作个缓存来节省查询。
对于一块肯定的共享内存S,咱们尝试为每线程配置一个私有变量(static __tread),用一段结构化的代码跟踪它的Record,这样以来就不须要每次都查询maplist了,
虽然不是很是合适,但咱们仍是拿前面Hazard pointer的例子作个示例,代码以下:
void enqueue(int value){ NodeType* node; posix_memalign(&node, 64, sizeof(NodeType)); memset(node, 0, sizeof(NodeType)); node->value = value; node->next = NULL; NodeType* t; for(;;){ t = Tail; +++ static __thread LocalRecord localRecordT; Record* recordT; +++ if(localRecordT.pointer == NULL) { +++ recordT = psly_record(&Tail, t, NULL, NULL); if(recordT == NULL) continue; +++ else { +++ localRecordT.pointer = t; +++ localRecordT.record = recordT; +++ localRecordT.nextRecord = recordT->nextRecord; +++ } +++ } else { +++ if(t != localRecordT.pointer || isChange(localRecordT.record, localRecordT.nextRecord) || t != localRecordT.record->pointer) { +++ localRecordT.pointer = NULL; +++ continue; +++ } +++ recordT = psly_record(&Tail, t, localRecordT.record, localRecordT.nextRecord); +++ if(recordT == NULL) { +++ localRecordT.pointer = NULL; +++ continue; +++ } +++ } if(Tail != t) { psly_remove(recordT); continue; } NodeType* next = t->next; if(Tail != t) { psly_remove(recordT); continue; } if(next != NULL){ psly_remove(recordT); __sync_bool_compare_and_swap(&Tail, t, next); continue; } if(__sync_bool_compare_and_swap(&t->next, NULL, node)) { psly_remove(recordT); break; } psly_remove(recordT); } __sync_bool_compare_and_swap(&Tail, t, node); }
这种场景下,假如咱们系统共有N个线程,那么对于一个内存,在它的整个生命周期里,须要查询maplist的次数上限为N+1次!
这种方式极大地减小了查询的次数,从而为设计某些高效的共享数据结构提供了可能。
数据结构自己带Record
void enqueue(int value){ NodeType* node; posix_memalign(&node, 64, sizeof(NodeType)); memset(node, 0, sizeof(NodeType)); node->value = value; node->next = NULL; NodeType* t; for(;;){ +++ Record* recordT = TailRecord; +++ int versionT= recordT->version; t = Tail; +++ if(recordT->pointer != t) { +++ if(recordT == TailRecord && recordT->version == versionT) { +++ Record* localRecordT = NULL; //udpate记录Record并给引用计数设置1,Atomicity; +++ if(CAS(&TailRecord, recordT, localRecordT = get_Record(t, ONE))) { +++ return_Record(recordT); +++ } else { +++ return_Record(localRecordT); +++ continue; +++ } +++ } else { +++ continue; +++ } +++ } else if(recordT != TailRecord || recordT->version != versionT) { +++ continue; +++ } else { +++ psly_addOneRecord(recordT); // 增长引用计数 +++ } //now the t and RecordT is match!; if(Tail != t) { psly_remove(recordT); continue; } NodeType* next = t->next; if(Tail != t) { psly_remove(recordT); continue; } if(next != NULL){ psly_remove(recordT); __sync_bool_compare_and_swap(&Tail, t, next); continue; } if(__sync_bool_compare_and_swap(&t->next, NULL, node)) { psly_remove(recordT); break; } psly_remove(recordT); } __sync_bool_compare_and_swap(&Tail, t, node); }
这里的TailRecord伴随Tail更新。
链表遍历保留前驱:
当咱们查询maplist时候,有可能会由于前驱节点的失效,而要从新在该list的head开始遍历,假如链表过长会代价较大,因此咱们在遍历过程当中维护些前驱节点可能会好点。
示例代码以下:
保留:
if(currKey != key) { int bucket; if((steps & STEPS_) == 0 && (bucket = (steps >> STEPBIT)) < MAXPREV) { Prevs* step = &prevs_[bucket]; step->r = prev; step->rNext = prevNext; } ++steps; prev = curr; prevNext = currNext; }
失效以后启用保留的前驱:
--steps; for(;;) { int bucket = steps >> STEPBIT; bucket = bucket < MAXPREV ? bucket: (MAXPREV - 1); Prevs* prevs = &prevs_[bucket]; prev = prevs->r; prevNext = prev->nextRecord; long prevNextKeep = prevs->rNext; if((prevNextKeep & NODEBITS) != (prevNext & NODEBITS) || (prevNext & REFCBITS) == DELETED) { steps -= STEPS; } else { prevs->rNext = prevNext; curr = idx_Record(prevNext); break; } } steps = steps & (~STEPS_);
这里的steps记录咱们目前所在的位置,STEPS表达咱们隔几个节点记录一次(极端状况下能够拷贝全部遍历过的节点)。
批量获取Record方式指的是,因为获取Record的竞争过于激烈,咱们再也不每次获取一个,而是每次获取一批,剩余的做为线程私有以后使用,维护好数据的 未使用/使用 状态,以及做为总体返回给资源库。从而极大地减小了线程间的竞争。
固然,最好的是咱们再也不让线程竞争Record了,咱们从新组织Record,每一个队列尺寸小一点。对于一个全新的线程,直接给它一个Record队列,这个队列如同以前同样是全局做用域。不够用继续获取整个队列,维护好单个Record元素的使用情况。
这样以来,对于每一个线程而言,它拥有的就是Record[0],Record[2]....等整个队列了。若是一来,每一个资源队列都是被某个线程私有,即单读多写的队列。极大地减小了竞争。
对于某些场景,许多小块的共享数据同时产生,又能够同时回收。咱们再也不为每一个内存地址分配一个Record,咱们尝试将一块大内存的分割为许多小内存来使用,如此一来小内存统一映射到大内存首地址的Record,直接省去了插入链表的操做。我须要对Record进行改造,retireBit再也不做为一个元素使用,这里能够换成short
struct Record{ //Record库维护字段 int volatile next ; int self; //数据字段 void* pointer; int refcount; //retire +++ short retireNum; long nextRecord; }
同时,咱们的psy_record接口增长一个参数:
recordT = psly_record(void** ppointer, void* pointer, Record* record, long nextRecord, short retireNum);
最后讲一下,惟一须要注意的是,若是内存已经处于可回收状态:
那么咱们即可以当即回收内部,由于对于企图使用该内存的线程而言,要么正在递增引用计数,要么已经完成访问。完成访问的不要紧,根据咱们的设计,递增引用计数的线程稍后会回退减一,从而再也不访问这块内存。
假如竞争激烈,那么咱们再也不将引用计数汇集到一个refcount字段上,咱们能够采用多个refcount[M]来分开计数,从而改善竞争状况,有效提升吞吐量。
具体作法能够参考Striped64。
假如咱们在开发一个并发数据结构,它自己将会被共享/动态开辟/回收,数据自己带有指针,指针指向的数据随着程序的执行变得不知足需求,从而咱们要从新配置这一数据,而且回收原数据。
这种状况下咱们能不能正确回收全部数据呢?
答案是能够的。
对于全部内存,若是知足
共享的 / 须要回收的 / 没有明确的能够安全回收内存的逻辑点
咱们都采用3RE&S Protocol提供的语义来回收内存。
由于
最后,咱们必须本身提供freeMemory函数用于先回收内部内存,再回收外部对象的内存。