channel补充

网易:mysql

package main

import (
    "fmt"
)

func main() {
    var c chan int
    fmt.Printf("c=%v\n", c)

    c = make(chan int, 1)
    fmt.Printf("c=%v\n", c)
    c <- 100

    /*
        data := <-c
        fmt.Printf("data:%v\n", data)
    */
    <-c
}

 

nobufChan 不带缓冲(不带大小的chan 没法插入数据的,只有当有人在获取数据时候才能够放入数据)git

好比:收快递:只有快递员见到你本人后,只能寄快递github

package main

import (
    "fmt"
    "time"
)

func produce(c chan int) {
    c <- 1000
    fmt.Println("produce finished")
}

func consume(c chan int) {
    data := <-c
    fmt.Println(data)
}

func main() {
    var c chan int
    fmt.Printf("c=%v\n", c)

    c = make(chan int)
    go produce(c)
    go consume(c)
    time.Sleep(time.Second * 5)
}

 

goroutine_sync 模拟sleep阻塞的功能sql

package main

import (
    "fmt"
    "time"
)

func hello(c chan bool) {
    time.Sleep(5 * time.Second)
    fmt.Println("hello goroutine")

    c <- true
}

func main() {
    var exitChan chan bool
    exitChan = make(chan bool)
    go hello(exitChan)
    fmt.Println("main thread terminate")
    <-exitChan
}

 

 只读 只写的chanapp

package main

import "fmt"

func sendData(sendch chan<- int) {
    sendch <- 10
    //<-sendch
}

func readData(sendch <-chan int) {
    //sendch <- 10
    data := <-sendch
    fmt.Println(data)
}

func main() {
    chnl := make(chan int)
    go sendData(chnl)
    readData(chnl)
}

 

判断管道是否关闭ide

package main

import (
    "fmt"
)

func producer(chnl chan int) {
    for i := 0; i < 10; i++ {
        chnl <- i
    }
    close(chnl)
}

func main() {
    ch := make(chan int)
    go producer(ch)
    for {
        v, ok := <-ch
        if ok == false {
            fmt.Println("chan is closed")
            break
        }
        fmt.Println("Received ", v)
    }
}

 

for-range-chan  不须要关注管道是否关闭 管道关闭后 自动退出循环atom

package main

import (
	"fmt"
	"time"
)

func producer(chnl chan int) {
	for i := 0; i < 10; i++ {
		chnl <- i
		time.Sleep(time.Second)
	}
	close(chnl)
}

func main() {
	ch := make(chan int)
	go producer(ch)
	for v := range ch {
		fmt.Println("receive:", v)
	}
}

  

 待缓冲的chan(容量)url

特色:当没有往chan放入数据,直接去获取数据就会报错(死锁);当超过chan容量后,继续放入数据也会报错(死锁)spa

 

package main

import "fmt"

func main() {
    ch := make(chan string, 2)
    var s string
    //s = <-ch
    ch <- "hello"
    ch <- "world"
    ch <- "!"
    //ch <- "test"
    s1 := <-ch
    s2 := <-ch

    fmt.Println(s, s1, s2)
}
View Code

待缓冲的chan3d

package main

import (
    "fmt"
    "time"
)

func write(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}
func main() {
    ch := make(chan int, 2)
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v, "from ch")
        time.Sleep(2 * time.Second)
    }
}
View Code

 

长度和容量

    
package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}
View Code

 

如何等待一组goroutine结束?

方法1:low版本

package main

import (
    "fmt"
    "time"
)

func process(i int, ch chan bool) {
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    ch <- true
}
func main() {
    no := 3
    exitChan := make(chan bool, no)
    for i := 0; i < no; i++ {
        go process(i, exitChan)
    }
    for i := 0; i < no; i++ {
        <-exitChan
    }
    fmt.Println("All go routines finished executing")
}

 

方法2:sync.WaitGroup

package main

import (
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}
func main() {
    no := 3
    var wg sync.WaitGroup
    wg.Wait()
    fmt.Println("wait return")
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}

 

workerpool的实现

woker池的实现

a,生产者,消费者模型,简单有效

b,控制goroutine的数量,防止goroutine泄露和暴涨

c,基于goroutine和chan,构建wokerpool很是简单

 

 1,任务抽象程一个个job

2,使用job队列和result队列

3,开一个组goroutine进行实际任务计算,并把结果放回result队列

 案例:

package main

import (
    "fmt"
    "math/rand"
)

type Job struct {
    Number int
    Id     int
}

type Result struct {
    job *Job
    sum int
}

func calc(job *Job, result chan *Result) {
    var sum int
    number := job.Number
    for number != 0 {
        tmp := number % 10
        sum += tmp
        number /= 10
    }

    r := &Result{
        job: job,
        sum: sum,
    }

    result <- r
}

func Worker(jobChan chan *Job, resultChan chan *Result) {

    for job := range jobChan {
        calc(job, resultChan)
    }
}

func startWorkerPool(num int, jobChan chan *Job, resultChan chan *Result) {

    for i := 0; i < num; i++ {
        go Worker(jobChan, resultChan)
    }
}

func printResult(resultChan chan *Result) {
    for result := range resultChan {
        fmt.Printf("job id:%v number:%v result:%d\n", result.job.Id, result.job.Number, result.sum)
    }
}

func main() {

    jobChan := make(chan *Job, 1000)
    resultChan := make(chan *Result, 1000)

    startWorkerPool(128, jobChan, resultChan)

    go printResult(resultChan)
    var id int
    for {
        id++
        number := rand.Int()
        job := &Job{
            Id:     id,
            Number: number,
        }

        jobChan <- job
    }
}
View Code

 

 select 

 

 

 

package main

import (
    "fmt"
    "time"
)

func server1(ch chan string) {
    time.Sleep(time.Second * 6)
    ch <- "response from server1"
}

func server2(ch chan string) {
    time.Sleep(time.Second * 3)
    ch <- "response from server2"
}

func main() {
    output1 := make(chan string)
    output2 := make(chan string)

    go server1(output1)
    go server2(output2)
    /*
        s1 := <-output1
        fmt.Println("s1:", s1)
        s2 := <-output2
        fmt.Println("s2:", s2)
    */

    select {
    case s1 := <-output1:
        fmt.Println("s1:", s1)
    case s2 := <-output2:
        fmt.Println("s2:", s2)
    default:
        fmt.Println("run default")
    }
}
View Code

 

package main

import (
    "fmt"
    "time"
)

func write(ch chan string) {
    for {
        select {
        case ch <- "hello":
            fmt.Println("write succ")
        default:
            fmt.Println("channel is full")
        }
        time.Sleep(time.Millisecond * 500)
    }
}

func main() {
    //select {}

    output1 := make(chan string, 10)

    go write(output1)
    for s := range output1 {
        fmt.Println("recv:", s)
        time.Sleep(time.Second)
    }
}
View Code

 

 

 

 

 

 

 

 

 

sync.Mutex

package main

import (
    "fmt"
    "sync"
)

var x int
var wg sync.WaitGroup
var mutex sync.Mutex

func add() {
    for i := 0; i < 5000; i++ {
        mutex.Lock()
        x = x + 1
        mutex.Unlock()
    }
    wg.Done()
}

func main() {

    wg.Add(2)
    go add()
    go add()

    wg.Wait()
    fmt.Println("x:", x)
}
View Code

 

 

 

 

package main

import (
    "fmt"
    "sync"
    "time"
)

var rwlock sync.RWMutex
var x int
var wg sync.WaitGroup

func write() {
    rwlock.Lock()
    fmt.Println("write lock")
    x = x + 1
    time.Sleep(10 * time.Second)
    fmt.Println("write unlock")
    rwlock.Unlock()
    wg.Done()
}

func read(i int) {
    fmt.Println("wait for rlock")
    rwlock.RLock()
    fmt.Printf("goroutine:%d x=%d\n", i, x)
    time.Sleep(time.Second)
    rwlock.RUnlock()
    wg.Done()
}

func main() {

    wg.Add(1)
    go write()
    time.Sleep(time.Millisecond * 5)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go read(i)
    }

    wg.Wait()

}
读锁写锁

 

互斥锁和读写锁比较

package main

import (
    "fmt"
    "sync"
    "time"
)

var rwlock sync.RWMutex
var x int
var wg sync.WaitGroup
var mutex sync.Mutex

func write() {
    for i := 0; i < 100; i++ {
        //rwlock.Lock()
        mutex.Lock()
        x = x + 1
        time.Sleep(10 * time.Millisecond)
        mutex.Unlock()
        //rwlock.Unlock()
    }
    wg.Done()
}

func read(i int) {
    for i := 0; i < 100; i++ {
        //rwlock.RLock()
        mutex.Lock()
        time.Sleep(time.Millisecond)
        mutex.Unlock()
        //rwlock.RUnlock()
    }
    wg.Done()
}

func main() {

    start := time.Now().UnixNano()
    wg.Add(1)
    go write()

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go read(i)
    }

    wg.Wait()
    end := time.Now().UnixNano()
    cost := (end - start) / 1000 / 1000
    fmt.Println("cost:", cost, "ms")
}
View Code

 

 

 

 

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

var x int32
var wg sync.WaitGroup

var mutex sync.Mutex

func addMutex() {
    for i := 0; i < 500; i++ {
        mutex.Lock()
        x = x + 1
        mutex.Unlock()
    }
    wg.Done()
}

func add() {
    for i := 0; i < 500; i++ {
        //mutex.Lock()
        //x = x +1
        atomic.AddInt32(&x, 1)
        //mutex.Unlock()
    }
    wg.Done()
}

func main() {

    start := time.Now().UnixNano()
    for i := 0; i < 10000; i++ {
        wg.Add(1)
        go add()
        //go addMutex()
    }

    wg.Wait()
    end := time.Now().UnixNano()
    cost := (end - start) / 1000 / 1000
    fmt.Println("x:", x, "cost:", cost, "ms")
}
atomic

 

 

其它案例:

先看代码

package main
import (
    "strings"
    "fmt"
    "time"
)



func main()  {

    users:=strings.Split("shenyi,zhangsan,lisi,wangwu",",")
    ages:=strings.Split("19,21,25,26",",")

    c1,c2:=make(chan bool),make(chan bool)
    ret:=make([]string,0)
    go func() {
        for _,v:=range users{
             <-c1
             ret=append(ret,v)
             time.Sleep(time.Second)
             c2<-true
        }
    }()
    go func() {
        for _,v:=range ages{
            <-c2
            ret=append(ret,v)
            c1<-true
        }
    }()
    c1<-true
    fmt.Println(ret)


}

打印:

[shenyi]

 

package main
import (
    //_ "github.com/go-sql-driver/mysql"
    "io/ioutil"
    "net/http"
    "fmt"
)



func main()  {

     url:="https://news.cnblogs.com/n/page/%d/"

     c:=make(chan map[int][]byte)
     for i:=1;i<=3;i++{
         go func(index int) {
            url:=fmt.Sprintf(url,index)
            res,_:=http.Get(url)
            cnt,_:= ioutil.ReadAll(res.Body)
            c<-map[int][]byte{index:cnt}

            if index==3 {
                close(c)
            }
        }(i)
     }

     for getcnt:=range c{
          for k,v:=range getcnt{
             ioutil.WriteFile(fmt.Sprintf("./files/%d",k),v,666)
         }

     }







}

打印:

。。。。会一直hang住