欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

GoLang 学习笔记(四)-- 并发基础(goroutine,信道,sync.Mutex,sync.WaitGroup)

程序员文章站 2022-03-07 15:13:12
今天把 A Tour of Go 的最后章节看完。 1. goroutine(Go 程) goroutine 是 Go 管理的一个轻量级线程。 go f(x, y, z) goroutine 会先对 f,x,y,z 求值,再把 f(x, y, z) 放到一个新的 goroutine 中。 注意:如果 ......

今天把 a tour of go 的最后章节看完。

1. goroutine(go 程)

goroutine 是 go 管理的一个轻量级线程。

go f(x, y, z)

goroutine 会先对 f,x,y,z 求值,再把 f(x, y, z) 放到一个新的 goroutine 中。

注意:如果主线程结束,所有的 goroutine 也会结束,所以不要把 goroutine 放在最后一行,你会发现 goroutine 就像不存在一样根本不执行。

2. 信道 channels

goroutine 在同一个地址空间中运行,因此如果两个 goroutine 要用到同一个内存的时候,就需要同步。sync 包也可以干这个,但没必要,go 有自己的信道。

ch = make(chan int) // 注意 chan 是个关键字不是变量名,"chan int" 合起来是一个储存 int 的信道类型

看语法大概也能直到,信道就是带类型的管道,用法如下:

ch <- va // "<-" 作为符号
vb := <-ch // "<-" 与 "ch" 连在一起组成 "<-ch",中间无空格,"<-ch" 是一个值
  1. 第一条语句是把 va 的值储存在 ch 中。
  2. 第二条语句是从 ch 中把刚储存的值取出来("<-ch" 是个值,因为很重要所以说两遍),然后赋值给 vb。

2.1 同步操作

信道之所能够进行同步,是因为信道的特性,发送和接收(就是上面两条语句)在另一端完成之前会被阻塞。比如说:

func sum(x, y, z int, ch chan int) {
    sum := x + y + z
    ch <- sum
}
func main() {
    ch := make(chan int)
    go sum(x1, y1, z1, ch)
    go sum(x2, y2, z2, ch)
    var x, y = <-ch, <-ch
    // do sth with x, y
    // ......
}

假设有这么两个 goroutine 使用了同一个信道:

  1. 主代码运行的时候,遇到第一个 goroutine,计算 x1, y1, z1, ch, sum 函数的值,挂起一个新的 goroutine 执行 sum。
  2. 遇到第二个 goroutine,计算 x2, y2, z2, ch, sum 函数的值,挂起一个新的 goroutine 执行 sum。
  3. 就在挂起第二个 goroutine 的时候,没想到第一个 sum 快如神速,已经计算完了,执行到了 ch <- sum,但是很可惜,主线程甚至还没执行到 var x, y = <-ch, <-ch 这一行,ch 没有在接收数据,于是第一个 sum 只好阻塞,等待。
  4. 主线程终于创建好了第二个 goroutine,于是从 ch 里取值并复制到 x, y ,于是开始计算第一个 <-ch,发现这不是刚好有一个 goroutine 在等我接收吗?我直接接收数据,我直接取值,我直接赋值给 x。
  5. 然后主线程到了第二个 <-ch,准备再次接收数据,然而很可惜,第二个 sum 慢如龟速,还在执行呢。没办法,主线程只好搁置,等待有 goroutine 往 ch 里发送数据。
  6. 终于第二个 sum 函数执行到了 ch <- sum这一行,有值被发送到了 ch,挂起的接受端立刻使用 <-ch 从 ch 中取值,赋值给 x。
  7. 终于 x, y 都被赋值完成,可以进行下一步的操作了。

阻塞保证了发送数据和接收数据的同步。

2.2 缓冲信道

信道每次接收数据都阻塞,但有时候我们就是想连续传多次数据,就是对完全同步没有那么大的要求,不想等第一个接收完了再传,而是可以一次接收多个数据,再传给接受者,怎么办呢?

  • 这里就可以用到缓冲信道,在创建信道的时候声明缓冲大小 make(chan int, 100) 赋予 100 个 int 大小的缓冲内存,就是说可以储存 100 个 int。

这样下面的代码就可以正常运行了:

var ch = make(chan int, 2)
ch <- 1
ch <- 2
x := <-ch
y := <-ch
fmt.println(x, y)

如果不是缓冲信道的话,运行到 ch <- 2 这一步就完蛋了,因为 ch 在 ch <- 1 的时候就已经阻塞了。而接收在后面,因此会造成 deadlock。

但是上面的代码是可以运行的,因为有 buffer 可以存两个 int,并且要在往里存第三个 int 的时候,才会阻塞。

所以其实非缓冲信道相当于 make(chan int , 0)(不是 make(chan int , 1) 哦, 1 的话是可以存一个数据的且只有在第二次存值的时候才会报错,也就是说可以在同一个 goroutine 上先存一个值再使用,虽然这没有意义(笑)

2.3 close 和 range

2.3.1 close 关闭发送端

顾名思义,close 就是关闭信道,实际上是关闭发送端。可以在接收的时候使用双赋值来得到信道的关闭信息。

ch = make(chan int, 1)

ch <- 1
v1, ok1 := <-ch
fmt.println(v1, ok1) // "1, true"

close(ch)
v2, ok2 := <-ch
fmt.println(v2, ok2) // "0, false"

为什么说关闭的是信号的发送端呢?因为如下代码是可以运行的:

ch := make(chan int, 1)

ch <- 1
close(ch) // 在 close 之前,已经往 buffered ch 里存了一个 int
v1, ok1 := <-ch
fmt.println(v1, ok1) // "1, true",buffer 里有的,依然可以收到

v2, ok2 := <-ch
fmt.println(v2, ok2) // "0, false",即使 buffer 是空的,依然不会报错,只是会得到零值
  1. 因为关闭的是发送端,所以只有发送者能够关闭信道(规定,不是语法,但你违反这个规定一般也会报错就是了),如果是接收者关闭信道了,发送者再发送一个数据到信道会触发 panic。

  2. 但是发送者关闭信道后,接收者还是能不报错的接收数据的
    ** (注意:发送端和接收端是相对于一个信道而言的,不是 goroutine 而言的,一个 goroutine 可以既有发送的信道,也有接收的信道) **

2.3.2 range 持续接收数据

对信道使用 range,会不断从信道中接收数据,直到信道关闭。

func senddata(ch chan int) {
    for i := 0; i < 10; i++ {
        ch <- i
        if i == 5 {
            close(ch)
            break
        }
    }
}

func main() {
    ch := make(chan int, 10)
    go senddata(ch)
    for i := range ch {
        fmt.println(i)
    }
    fmt.println("end")
}
// 输出:
// 1
// 2
// 3
// 4
// 5
// end

2.4 select

注意不要和 switch 搞混,因为两者长得很像,而且都用到 case 关键词,语法结构也像,但是 select 是用于并发的。

  1. 有时候,我们会有多个信道等待另一方发送或接收数据,而我们想等其中任意一个信道准备好了就先吧那个信道的事情处理好,这时候就需要 select 语句来帮忙了。
  2. 和 switch 一样,select 也有 default 语句,会在所有信道都没准备好的时候运行。如果不想干巴巴的什么都不输出就在那堵塞的时候可以用。
    select 的一个常见用法是在循环中使用:
for {
    select {
        case <-ch1:
            // do sth with ch1
        case <-ch2:
            // do sth with ch2
        default:
           // do sth
           // i.e. fmt.println("waiting...")
           //      time.sleep(100 * time.millisecond)
    }
}

由于有 default,select 永远都不会堵塞,在其他 case 没有准备好的时候,会一直调用 default,通过 defualt 里的 time.sleep 控制调用频率。

3. sync.mutex

如果看完以上,你应该直到信道的作用是在各个进程之间通信。但是如果我们并没有想通信,而只是想有一个共享变量,并限制 goroutine 一次只能有一个 goroutine 能访问这个变量。
这个概念叫做 mutual exclusion,简称 mutex(互斥锁)。

正好我们有一个包 sync 包,提供了 sync.mutex 类型,该类型有两个方法:lock()unlock()
这两个方法的用法顾名思义,在一段代码前调用 lock() 方法,在代码运行完毕后调用 unlock() 方法。就可以保证同时只能有一个 goroutine 在访问这段代码。可以使用 defer 关键字把 unlock() 的调用放在整段函数调用完毕时。
如果看文字没法理解的话,请参阅。

4. sync.waitgroup

经常遇到这样的情况:你递归地运行一个函数,每次调用都是一个 goroutine,然后这个调用是主线程的最后一行。于是你发现代码根本不跑,因为主线程一结束就没了。为了解决这个办法:

  • 你可以用 time.sleep,但是不优雅;
  • 你可以用 信道,但是要知道有多少个 goroutine,但是递归你怎么可能知道有多少个呢?而且也不优雅。
    那么就出现了我们的 sync.waitgroup,非常有用。用法也很简单:
  1. wg.add(n),给 counter 加上 n 个计数。

  2. wg.done() 给 counter 减少一个计数,一般是在 goroutine 函数中使用 defer 优雅处理。

  3. wg.wait(),在主线程中调用,在 counter 变为 0 之前都会阻塞。
    坑也很多:

  4. 使用 var wg sync.waitgroup 后 wg 不是指针,传入函数参数的时候要传入 &wg。但是传入后在递归的再次调用该函数的时候,由于已经是 *sync.waitgroup 类型,就不能再加 & 了。

    func recursion(depth int, wg *sync.waitgroup) {
        if depth == 0 {
            return
        }
        for i := 0; i < 10; i++ {
            go recursion(depth - 1, wg) // 不能加 &
        }
    }
    func main() {
        depth := 10
        var wg sync.waitgroup
        go recursion(depth, &wg) // 记得加 &
    }
    
  5. wg.add()wg.done() 的用法:拿上面的代码做例子,你是不是以为应该这么写:

    func recursion(depth int, wg *sync.waitgroup) {
        
        wg.add(1) // 每次调用都是一个新的 goroutine,所以加一个 counter
        defer wg.done() // 调用完毕就减少一个 counter
        
        if depth == 0 {
            return
        }
        for i := 0; i < 10; i++ {
            go recursion(depth - 1, wg)
        }
    }
    func main() {
        depth := 10
        var wg sync.waitgroup
        go recursion(depth, &wg)
        wg.wait()
    }
    
    • 当然大错特错,错炸了。你发现运行错误,你发现很多博客的用法都是在主线程中使用 wait.add(),你以为只能在主线程中使用,然后你就纳闷了,这样有什么意义,那还怎么知道递归了多少次?当然,wait.add() 肯定是要在 goroutine 中用的,全是网上搜的博客误人子弟(我就被他们坑了好几个小时,真的坑)。
  6. 你的错误在于对这里 goroutine 的数量的判断错误,来看看正确答案,其实只改了三行:

    func recursion(depth int, wg *sync.waitgroup) {
        
        
        defer wg.done() // 调用完毕就减少一个 counter
        
        if depth == 0 {
            return
        }
        for i := 0; i < 10; i++ {
            wg.add(1) // go 关键字在这里,goroutine 是在这里生成的,当然要在这里加
            go recursion(depth - 1, wg)
        }
    }
    func main() {
        depth := 10
        var wg sync.waitgroup
        wg.add(1) // 每次看到 go 关键字之前,就应该调用 wg.add()
        go recursion(depth, &wg)
        wg.wait()
    }
    
    • goroutine 是怎么产生的,是通过 go 关键字产生的,那么问题就好解决了,刚刚那段代码的问题在于,每次调用都执行了 10 次 go,产生了 10 个 goroutine,但是却只加了一个 counter。以及主线程其实也有一个 goroutine,但是你却没有把这个 counter 加上去。
    • 这里还挺难的,因为不同递归函数有不同的产生 goroutine 的方式,每次都要去理清递归了几次,在哪里加,说实话,挺难的。
    • 但是我总结了一个方法,那就是一旦看到 go 关键字,那么在这之前就需要调用 wg.add()。希望这方法能够帮助你更快的理清递归中的并发数量
  7. 试试更改上面的代码,但是代码依旧正常运行:

    func recursion(depth int, wg *sync.waitgroup) {
        
        
        defer wg.done()
        
        if depth == 0 {
            return
        }
        
        wg.add(10) // 我提前知道这里有 10 个 goroutine,就不在循环里加了
        for i := 0; i < 10; i++ {
            go recursion(depth - 1, wg)
        }
    }
    func main() {
        depth := 10
        var wg sync.waitgroup
        // wg.add(1) ,这一次这里没有 go 关键字,没有 goroutine 产生,所以不用加了
        recursion(depth, &wg) // 这一次,第一次调用我故意不加 go 关键字,所以这里不用调用 `wg.add(1)`
        wg.wait()
    }