前面刚讲到goroutine和channel,经过goroutine启动一个协程,经过channel的方式在多个goroutine中传递消息来保证并发安全。今天咱们来学习sync包,这个包是Go提供的基础包,提供了锁的支持。可是Go官方给的建议是:不要以共享内存的方式来通讯,而是要以通讯的手段来共享内存。因此他们是提倡使用channel的方式来实现并发控制。web
学过Java的同窗对锁的概念确定不陌生,在Java中提供Sychronized
关键字提供独占锁,Lock
类提供读写锁。在sync包中实现的功能也是与锁相关,包中主要包含的对象有:安全
咱们先用Go写一段经典的并发场景:并发
package main import ( "fmt" "time" ) func main() { var a = 0 for i := 0;i<1000;i++{ go func(i int) { a += 1 fmt.Println(a) }(i) } time.Sleep(time.Second) }
运行这段程序,你会发现最后输出的不是1000。app
这个时候你可使用Mutex:svg
package main import ( "fmt" "sync" "time" ) func main() { var a = 0 var lock sync.Mutex for i := 0;i<1000;i++{ go func(i int) { lock.Lock() a += 1 fmt.Println(a) lock.Unlock() }(i) } time.Sleep(time.Second) }
Mutex实现了Locker接口,因此他有Lock()方法和Unlock()方法。只须要在须要同步的代码块上下使用这两个方法就好。函数
Mutex等同于Java中的Synchronized关键字或者Lock。学习
相似于Java中的ReadWriteLock。读写锁有以下四个方法:ui
写操做的锁定和解锁 * func (*RWMutex) Lock * func (*RWMutex) Unlock 读操做的锁定和解锁 * func (*RWMutex) Rlock * func (*RWMutex) RUnlock
当有一个 goroutine 得到写锁定,其它不管是读锁定仍是写锁定都将阻塞直到写解锁;spa
当有一个 goroutine 得到读锁定,其它读锁定仍然能够继续 ;线程
当有一个或任意多个读锁定,写锁定将等待全部读锁定解锁以后才可以进行写锁定 。
总结上面的三句话能够得出结论:
看一个读写锁的例子:
package main import ( "fmt" "strconv" "sync" "time" ) var ( rwLock sync.RWMutex data = "" ) func read(ran int) { time.Sleep(time.Duration(ran) * time.Microsecond) rwLock.RLock() fmt.Printf("读操做开始:%s\n",data) data = "" rwLock.RUnlock() } func write(subData string) { rwLock.Lock() data = subData fmt.Printf("写操做开始:%s\n",data) rwLock.Unlock() } func deduce() { for i:=0;i<10;i++ { go write(strconv.Itoa(i)) } for i:=0;i<10;i++ { go read(i * 100) } } func main() { deduce() time.Sleep(2*time.Second) }
运行上面的程序,会发现写操做都执行了,可是读操做不是将全部写的数字都读出来了。这是由于读操做是能够同时有多个goroutine获取锁的,可是写操做只能同时有一个goroutine执行。
WaitGroup 用于等待一组 goroutine 结束,它有三个方法:
func (wg *WaitGroup) Add(delta int) func (wg *WaitGroup) Done() func (wg *WaitGroup) Wait()
与Java中类比的话,类似与CountDownLatch。
package main import ( "fmt" "sync" "time" ) func goWithMountain(p int,wg *sync.WaitGroup) { defer wg.Done() fmt.Printf("%d,我已经上来了\n",p) } func main() { var wg sync.WaitGroup wg.Add(10) for i:=0;i<10;i++ { go goWithMountain(i,&wg) } wg.Wait() time.Sleep(2*time.Second) fmt.Printf("=爬山结束\n") } 输出: 0,我已经上来了 9,我已经上来了 3,我已经上来了 7,我已经上来了 8,我已经上来了 6,我已经上来了 2,我已经上来了 4,我已经上来了 5,我已经上来了 1,我已经上来了 =爬山结束
是否是有同样同样的呢。
与互斥量不一样,条件变量的做用并非保证在同一时刻仅有一个线程访问某一个共享数据,而是在对应的共享数据的状态发生变化时,通知其余所以而被阻塞的线程。条件变量老是与互斥量组合使用。互斥量为共享数据的访问提供互斥支持,而条件变量能够就共享数据的状态的变化向相关线程发出通知。 下面给出主要的几个函数:
func NewCond(l Locker) *Cond:用于建立条件,根据实际状况传入sync.Mutex或者sync.RWMutex的指针,必定要是指针,不然会发生复制致使锁的失效 func (c *Cond) Broadcast():唤醒条件上的全部goroutine func (c *Cond) Signal():随机唤醒等待队列上的goroutine,随机的方式效率更高 func (c *Cond) Wait():挂起goroutine的操做
看一个读写操做的例子:
package main import ( "bytes" "fmt" "io" "sync" "time" ) type MyDataBucket struct { br *bytes.Buffer gmutex *sync.RWMutex rcond *sync.Cond //读操做须要用到的条件变量 } func NewDataBucket() *MyDataBucket { buf := make([]byte, 0) db := &MyDataBucket{ br: bytes.NewBuffer(buf), gmutex: new(sync.RWMutex), } db.rcond = sync.NewCond(db.gmutex.RLocker()) return db } func (db *MyDataBucket) Read(i int) { db.gmutex.RLock() defer db.gmutex.RUnlock() var data []byte var d byte var err error for { //读取一个字节 if d, err = db.br.ReadByte(); err != nil { if err == io.EOF { if string(data) != "" { fmt.Printf("reader-%d: %s\n", i, data) } db.rcond.Wait() data = data[:0] continue } } data = append(data, d) } } func (db *MyDataBucket) Put(d []byte) (int, error) { db.gmutex.Lock() defer db.gmutex.Unlock() //写入一个数据块 n, err := db.br.Write(d) db.rcond.Broadcast() return n, err } func main() { db := NewDataBucket() go db.Read(1) go db.Read(2) for i := 0; i < 10; i++ { go func(i int) { d := fmt.Sprintf("data-%d", i) db.Put([]byte(d)) }(i) time.Sleep(100 * time.Millisecond) } }
上例中,读操做必依赖于写操做先写入数据才能开始读。当读取的数据为空的时候,会先调用wait()方法阻塞当前方法,在Put方法中写完数据以后会调用Broadcast()去广播,告诉阻塞者能够开始了。
Pool 用于存储临时对象,它将使用完毕的对象存入对象池中,在须要的时候取出来重复使用,目的是为了不重复建立相同的对象形成 GC 负担太重。从 Pool 中取出对象时,若是 Pool 中没有对象,将返回 nil,可是若是给 Pool.New 字段指定了一个函数的话,Pool 将使用该函数建立一个新对象返回。
sync.Pool能够安全被多个线程同时使用,保证线程安全。这个Pool和咱们通常意义上的Pool不太同样 ,Pool没法设置大小,因此理论上只受限于系统内存大小。Pool中的对象不支持自定义过时时间及策略,究其缘由,Pool并非一个Cache。
看一个小例子:
package main import ( "fmt" "sync" ) func main() { //咱们建立一个Pool,并实现New()函数 sp := sync.Pool{ New: func() interface{} { return make([]int, 16) }, } item := sp.Get() fmt.Println("item : ", item) //咱们对item进行操做 //New()返回的是interface{},咱们须要经过类型断言来转换 for i := 0; i < len(item.([]int)); i++ { item.([]int)[i] = i } fmt.Println("item : ", item) //使用完后,咱们把item放回池中,让对象能够重用 sp.Put(item) //再次从池中获取对象 item2 := sp.Get() //注意这里获取的对象就是上面咱们放回池中的对象 fmt.Println("item2 : ", item2) //咱们再次获取对象 item3 := sp.Get() //由于池中的对象已经没有了,因此又从新经过New()建立一个新对象,放入池中,而后返回 //因此item3是大小为16的空[]int fmt.Println("item3 : ", item3) } 输出: item : [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] item : [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15] item2 : [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15] item3 : [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
Once 的做用是屡次调用但只执行一次,Once 只有一个方法,Once.Do(),向 Do 传入一个函数,这个函数在第一次执行 Once.Do() 的时候会被调用,之后再执行 Once.Do() 将没有任何动做,即便传入了其它的函数,也不会被执行,若是要执行其它函数,须要从新建立一个 Once 对象。
看一个很简单的例子:
package main import ( "fmt" "sync" ) func main() { var once sync.Once onceBody := func() { fmt.Println("我只会出现一次") } done := make(chan bool) for i := 0; i < 3; i++ { go func() { once.Do(onceBody) done <- true }() } for i := 0; i < 3; i++ { <-done } }