在Golang中实现有无限容量的同步Queue

chan对象是Golang的一个核心卖点,能够轻松实现goroutine之间的通讯。Golang容许咱们为chan设置不一样的缓冲大小。当默认缓冲大小为0的时候,一个goroutine对chan的写入操做必需要等到有其余goroutine对chan进行读取的时候才会返回,反之一个goroutine对chan进行读取的时候要等到另一个goroutine对chan进行写入才会返回。若是咱们不但愿每次对chan进行读取和写入都堵塞的话,能够对chan设置缓冲大小。这样,在缓冲区没满以前,goroutine能够不停的对chan进行写入而不会发生堵塞,直到缓冲区被填满。git

有时候咱们须要把某个请求或者数据放入到chan中,而后马上返回去作其余事情。这种状况下为了不chan发生堵塞,咱们须要为chan设置一个足够大的缓冲大小。若是缓冲大小设置的太小,就很难避免出现堵塞,而把缓冲大小设置的过大,又会形成额外的内存开销,由于chan对象在建立(make)的时候就已经分配了足够的内存做为缓冲。github

所以我在实际项目中常常使用一个同步的先入先出队列(SyncQueue)。数据生产者调用队列的Push函数将数据添加到队列中,Push函数在任何状况下不会发生堵塞。数据消费者使用Pop函数得到一个数据。若是队列中当前为空,则Pop函数会挂起当前goroutine,直到有其余goroutine Push新的数据到队列中。SyncQueue不须要提早生成一个巨大的缓存,所以不会占用大量的内存,而且提供无限(除非内存满)的队列长度。golang

同步队列(SyncQueue)实现:https://github.com/xiaonanln/go-xnsyncutil/blob/master/xnsyncutil/sync_queue.goapache

接口文档:https://godoc.org/github.com/xiaonanln/go-xnsyncutil/xnsyncutil#SyncQueue缓存

 1 package xnsyncutil
 2 
 3 import (
 4     "sync"
 5 
 6     "gopkg.in/eapache/queue.v1"
 7 )
 8 
 9 // Synchronous FIFO queue
10 type SyncQueue struct {
11     lock    sync.Mutex
12     popable *sync.Cond
13     buffer  *queue.Queue
14     closed  bool
15 }
16 
17 // Create a new SyncQueue
18 func NewSyncQueue() *SyncQueue {
19     ch := &SyncQueue{
20         buffer: queue.New(),
21     }
22     ch.popable = sync.NewCond(&ch.lock)
23     return ch
24 }
25 
26 // Pop an item from SyncQueue, will block if SyncQueue is empty
27 func (q *SyncQueue) Pop() (v interface{}) {
28     c := q.popable
29     buffer := q.buffer
30 
31     q.lock.Lock()
32     for buffer.Length() == 0 && !q.closed {
33         c.Wait()
34     }
35 
36     if buffer.Length() > 0 {
37         v = buffer.Peek()
38         buffer.Remove()
39     }
40 
41     q.lock.Unlock()
42     return
43 }
44 
45 // Try to pop an item from SyncQueue, will return immediately with bool=false if SyncQueue is empty
46 func (q *SyncQueue) TryPop() (v interface{}, ok bool) {
47     buffer := q.buffer
48 
49     q.lock.Lock()
50 
51     if buffer.Length() > 0 {
52         v = buffer.Peek()
53         buffer.Remove()
54         ok = true
55     } else if q.closed {
56         ok = true
57     }
58 
59     q.lock.Unlock()
60     return
61 }
62 
63 // Push an item to SyncQueue. Always returns immediately without blocking
64 func (q *SyncQueue) Push(v interface{}) {
65     q.lock.Lock()
66     if !q.closed {
67         q.buffer.Add(v)
68         q.popable.Signal()
69     }
70     q.lock.Unlock()
71 }
72 
73 // Get the length of SyncQueue
74 func (q *SyncQueue) Len() (l int) {
75     q.lock.Lock()
76     l = q.buffer.Length()
77     q.lock.Unlock()
78     return
79 }
80 
81 func (q *SyncQueue) Close() {
82     q.lock.Lock()
83     if !q.closed {
84         q.closed = true
85         q.popable.Signal()
86     }
87     q.lock.Unlock()
88 }

 

 

Category: Golang 标签:changolangqueuesync函数

相关文章
相关标签/搜索