Go中sync包学习

前面刚讲到goroutine和channel,经过goroutine启动一个协程,经过channel的方式在多个goroutine中传递消息来保证并发安全。今天咱们来学习sync包,这个包是Go提供的基础包,提供了锁的支持。可是Go官方给的建议是:不要以共享内存的方式来通讯,而是要以通讯的手段来共享内存。因此他们是提倡使用channel的方式来实现并发控制。web

学过Java的同窗对锁的概念确定不陌生,在Java中提供Sychronized关键字提供独占锁,Lock类提供读写锁。在sync包中实现的功能也是与锁相关,包中主要包含的对象有:安全

  • Locker:提供了加锁和解锁的接口
  • Cond:条件等待经过 Wait 让例程等待,经过 Signal 让一个等待的例程继续,经过 Broadcast 让全部等待的例程继续。
  • Map:线程安全的map ,同时被多个goroutines调用是安全的。
  • Mutex:互斥锁,用来保证在任一时刻,只能有一个例程访问某对象。实现了Locker接口。Mutex 的初始值为解锁状态,Mutex 一般做为其它结构体的匿名字段使用,使该结构体具备 Lock 和 Unlock 方法
  • Once:Once 是一个能够被屡次调用可是只执行一次,若每次调用Do时传入参数f不一样,可是只有第一个才会被执行。
  • Pool:用于存储临时对象,它将使用完毕的对象存入对象池中,在须要的时候取出来重复使用,其中存放的临时对象随时可能被 GC 回收掉若是该对象再也不被其它变量引用
  • RWMutex:读写互斥锁,RWMutex 比 Mutex 多了一个“读锁定”和“读解锁”,可让多个例程同时读取某对象。RWMutex 的初始值为解锁状态。RWMutex 一般做为其它结构体的匿名字段使用。
  • WaitGroup :用于等待一组例程的结束。主例程在建立每一个子例程的时候先调用 Add 增长等待计数,每一个子例程在结束时调用 Done 减小例程计数。以后主例程经过 Wait 方法开始等待,直到计数器归零才继续执行。

1. Mutex 互斥锁使用

咱们先用Go写一段经典的并发场景:并发

package main

import (
	"fmt"
	"time"
)

func main() {
	var a = 0
	for i := 0;i<1000;i++{
		go func(i int) {
			a += 1
			fmt.Println(a)
		}(i)
	}
	time.Sleep(time.Second)
}

运行这段程序,你会发现最后输出的不是1000。app

这个时候你可使用Mutex:svg

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var a = 0
	var lock sync.Mutex
	for i := 0;i<1000;i++{
		go func(i int) {
			lock.Lock()
			a += 1
			fmt.Println(a)
			lock.Unlock()
		}(i)
	}
	time.Sleep(time.Second)
}

Mutex实现了Locker接口,因此他有Lock()方法和Unlock()方法。只须要在须要同步的代码块上下使用这两个方法就好。函数

Mutex等同于Java中的Synchronized关键字或者Lock。学习

2. 读写锁-RWMutex

相似于Java中的ReadWriteLock。读写锁有以下四个方法:ui

写操做的锁定和解锁
* func (*RWMutex) Lock
* func (*RWMutex) Unlock
读操做的锁定和解锁
* func (*RWMutex) Rlock
* func (*RWMutex) RUnlock

当有一个 goroutine 得到写锁定,其它不管是读锁定仍是写锁定都将阻塞直到写解锁;spa

当有一个 goroutine 得到读锁定,其它读锁定仍然能够继续 ;线程

当有一个或任意多个读锁定,写锁定将等待全部读锁定解锁以后才可以进行写锁定 。

总结上面的三句话能够得出结论:

  1. 同时只能有一个 goroutine 可以得到写锁定;
  2. 同时能够有任意多个 goroutine 得到读锁定;
  3. 同时只能存在写锁定或读锁定(读和写互斥)。

看一个读写锁的例子:

package main

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

var (
	rwLock sync.RWMutex
	data = ""
)

func read(ran int) {
	time.Sleep(time.Duration(ran) * time.Microsecond)
	rwLock.RLock()
	fmt.Printf("读操做开始:%s\n",data)
	data = ""
	rwLock.RUnlock()
}

func write(subData string)  {
	rwLock.Lock()
	data = subData
	fmt.Printf("写操做开始:%s\n",data)
	rwLock.Unlock()
}

func deduce()  {
	for i:=0;i<10;i++ {
		go write(strconv.Itoa(i))
	}
	for i:=0;i<10;i++ {
		go read(i * 100)
	}
}


func main() {
	deduce()
	time.Sleep(2*time.Second)
}

运行上面的程序,会发现写操做都执行了,可是读操做不是将全部写的数字都读出来了。这是由于读操做是能够同时有多个goroutine获取锁的,可是写操做只能同时有一个goroutine执行。

3. WaitGroup

WaitGroup 用于等待一组 goroutine 结束,它有三个方法:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()

与Java中类比的话,类似与CountDownLatch。

package main

import (
	"fmt"
	"sync"
	"time"
)

func goWithMountain(p int,wg *sync.WaitGroup)  {
	defer wg.Done()
	fmt.Printf("%d,我已经上来了\n",p)
}

func main() {
	var wg sync.WaitGroup
	wg.Add(10)
	for i:=0;i<10;i++ {
		go goWithMountain(i,&wg)
	}
	wg.Wait()
	time.Sleep(2*time.Second)
	fmt.Printf("=爬山结束\n")
}

输出:
0,我已经上来了
9,我已经上来了
3,我已经上来了
7,我已经上来了
8,我已经上来了
6,我已经上来了
2,我已经上来了
4,我已经上来了
5,我已经上来了
1,我已经上来了
=爬山结束

是否是有同样同样的呢。

4. Cond条件变量

与互斥量不一样,条件变量的做用并非保证在同一时刻仅有一个线程访问某一个共享数据,而是在对应的共享数据的状态发生变化时,通知其余所以而被阻塞的线程。条件变量老是与互斥量组合使用。互斥量为共享数据的访问提供互斥支持,而条件变量能够就共享数据的状态的变化向相关线程发出通知。 下面给出主要的几个函数:

func NewCond(l Locker) *Cond:用于建立条件,根据实际状况传入sync.Mutex或者sync.RWMutex的指针,必定要是指针,不然会发生复制致使锁的失效
func (c *Cond) Broadcast():唤醒条件上的全部goroutine
func (c *Cond) Signal():随机唤醒等待队列上的goroutine,随机的方式效率更高
func (c *Cond) Wait():挂起goroutine的操做

看一个读写操做的例子:

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
	"time"
)

type MyDataBucket struct {
	br     *bytes.Buffer
	gmutex *sync.RWMutex
	rcond  *sync.Cond //读操做须要用到的条件变量
}

func NewDataBucket() *MyDataBucket {
	buf := make([]byte, 0)
	db := &MyDataBucket{
		br:     bytes.NewBuffer(buf),
		gmutex: new(sync.RWMutex),
	}
	db.rcond = sync.NewCond(db.gmutex.RLocker())
	return db
}

func (db *MyDataBucket) Read(i int) {
	db.gmutex.RLock()
	defer db.gmutex.RUnlock()
	var data []byte
	var d byte
	var err error
	for {
		//读取一个字节
		if d, err = db.br.ReadByte(); err != nil {
			if err == io.EOF {
				if string(data) != "" {
					fmt.Printf("reader-%d: %s\n", i, data)
				}
				db.rcond.Wait()
				data = data[:0]
				continue
			}
		}
		data = append(data, d)
	}
}

func (db *MyDataBucket) Put(d []byte) (int, error) {
	db.gmutex.Lock()
	defer db.gmutex.Unlock()
	//写入一个数据块
	n, err := db.br.Write(d)
	db.rcond.Broadcast()
	return n, err
}

func main() {
	db := NewDataBucket()
	go db.Read(1)
	go db.Read(2)
	for i := 0; i < 10; i++ {
		go func(i int) {
			d := fmt.Sprintf("data-%d", i)
			db.Put([]byte(d))
		}(i)
		time.Sleep(100 * time.Millisecond)
	}
}

上例中,读操做必依赖于写操做先写入数据才能开始读。当读取的数据为空的时候,会先调用wait()方法阻塞当前方法,在Put方法中写完数据以后会调用Broadcast()去广播,告诉阻塞者能够开始了。

5.Pool 临时对象池

Pool 用于存储临时对象,它将使用完毕的对象存入对象池中,在须要的时候取出来重复使用,目的是为了不重复建立相同的对象形成 GC 负担太重。从 Pool 中取出对象时,若是 Pool 中没有对象,将返回 nil,可是若是给 Pool.New 字段指定了一个函数的话,Pool 将使用该函数建立一个新对象返回。

sync.Pool能够安全被多个线程同时使用,保证线程安全。这个Pool和咱们通常意义上的Pool不太同样 ,Pool没法设置大小,因此理论上只受限于系统内存大小。Pool中的对象不支持自定义过时时间及策略,究其缘由,Pool并非一个Cache。

看一个小例子:

package main

import (
	"fmt"
	"sync"
)


func main() {
	//咱们建立一个Pool,并实现New()函数
	sp := sync.Pool{
		New: func() interface{} {
			return make([]int, 16)
		},
	}
	item := sp.Get()
	fmt.Println("item : ", item)
	//咱们对item进行操做
	//New()返回的是interface{},咱们须要经过类型断言来转换
	for i := 0; i < len(item.([]int)); i++ {
		item.([]int)[i] = i
	}
	fmt.Println("item : ", item)

	//使用完后,咱们把item放回池中,让对象能够重用
	sp.Put(item)

	//再次从池中获取对象
	item2 := sp.Get()
	//注意这里获取的对象就是上面咱们放回池中的对象
	fmt.Println("item2 : ", item2)
	//咱们再次获取对象
	item3 := sp.Get()
	//由于池中的对象已经没有了,因此又从新经过New()建立一个新对象,放入池中,而后返回
	//因此item3是大小为16的空[]int
	fmt.Println("item3 : ", item3)
}

输出:
item :  [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
item :  [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
item2 :  [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
item3 :  [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]

6. Once 执行一次

Once 的做用是屡次调用但只执行一次,Once 只有一个方法,Once.Do(),向 Do 传入一个函数,这个函数在第一次执行 Once.Do() 的时候会被调用,之后再执行 Once.Do() 将没有任何动做,即便传入了其它的函数,也不会被执行,若是要执行其它函数,须要从新建立一个 Once 对象。

看一个很简单的例子:

package main

import (
	"fmt"
	"sync"
)


func main() {
	var once sync.Once
	onceBody := func() {
		fmt.Println("我只会出现一次")
	}
	done := make(chan bool)
	for i := 0; i < 3; i++ {
		go func() {
			once.Do(onceBody)
			done <- true
		}()
	}
	for i := 0; i < 3; i++ {
		<-done
	}
}
相关文章
相关标签/搜索