go语言 7 并发编程
文章由作者马志国在博客园的原创,若转载请于明显处标记出处:http://www.cnblogs.com/mazg/
对应的视频教程地址(腾讯课堂):https://mazhiguo.ke.qq.com/
对应的视频教程地址(网易云课堂):http://study.163.com/course/introduction/1004982006.htm
对应的视频教程地址(51cto):http://edu.51cto.com/course/12520.html
今天我们学习Go语言编程的第七章,并发编程。语言级别的支持并发编程是Go语言最大的优势和特色,所以这章是Go语言学习的重点和难点,当然内容也比较多。首先我们会介绍并发编程的相关概念,其次介绍Go语言中轻量级的线程,goroutine。最后学习goroutine之间的两种通信机制,一种是消息通信机制,channel。另外一种是共享内存的方式。
7.1 并发编程的相关概念
7.1.1 进程和线程
在现代操作系统中,线程是CPU调度和分配的基本单位,进程则作为资源拥有的基本单位。每个进程是由私有的虚拟地址空间、代码、数据和其它各种系统资源组成,进程与进程之间是独立。线程是进程内部的一个执行单元。 每一个进程至少有一个主执行线程,这个主线程无需由用户去主动创建,是由系统自动创建的。 用户根据需要在应用程序中创建其它线程,多个线程并发地运行于同一个进程中,同一进程的不同线程可以共享进程内的资源。对于编程来讲,我们通常需要解决的问题是进程间的通信和线程间的同步。
7.1.2 并行与并发
并发与并行(Concurrency and Parallelism)是两个不同的概念,理解它们对于理解多线程模型非常重要。并发是指在一个时间段内有多个线程或进程在执行,但在某个时间点上只有一个在执行,多个线程或进程通过争抢CPU时间片轮流执行。并行是指一个任意时间点上都有多个线程或进程在执行。并发就像一个家长(cpu)在喂多个孩子(线程),轮换着每个孩子喂一口,表面上多个孩子都在吃饭。并行就像n个家长(cpu)在喂n个孩子(线程),这n个孩子同时都在吃饭。并行需要硬件支持,单核处理器只能是并发,多核处理器才能做到并行。
7.1.3 多线程与多核CPU
多核处理器是指在一个CPU处理器上集成多个运算核心从而提高计算能力,也就是有多个真正并行计算的处理核心,一般一个处理核心对应一个内核线程。例如,单核处理器对应一个内核线程,双核处理器对应两个内核线程,四核处理器对应四个内核线程。现在的电脑一般是双核四线程、四核八线程,是采用超线程技术将一个物理处理核心模拟成两个逻辑处理核心,对应两个内核线程,所以在操作系统中看到的CPU数量是实际物理CPU数量的两倍。
程序一般不会直接去使用内核线程,而是使用用户线程。用户线程与内核线程的对应关系有三种模型:一对一模型、多对一模型、多对多模型,在这以4个内核线程、3个用户线程为例对三种模型进行说明。
1 一对一模型(1:1)
对于一对一模型来说,一个用户线程就唯一地对应一个内核线程(反过来不一定成立,一个内核线程不一定有对应的用户线程)。这样,如果CPU没有采用超线程技术(如四核四线程的计算机),一个用户线程就唯一地映射到一个物理CPU的线程,线程之间是并行处理的。而且一个线程因某种原因阻塞时,其他线程的执行不受影响。缺点是操作系统内核线程调度时,上下文切换的开销较大,导致用户线程的执行效率下降。
2 多对一模型(M:1)
多对一模型将多个用户线程映射到一个内核线程上,线程之间的切换由用户态的代码来进行,因此相对一对一模型,多对一模型的线程切换速度要快许多;此外,多对一模型对用户线程的数量几乎无限制。但多对一模型也有两个缺点:1.如果其中一个用户线程阻塞,那么其它所有线程都将无法执行,因为此时内核线程也随之阻塞了;2.在多处理器系统上,处理器数量的增加对多对一模型的线程性能不会有明显的增加,因为所有的用户线程都映射到一个处理器上了。
3 多对多模型(M:N)
多对多模型结合了一对一模型和多对一模型的优点,将多个用户线程映射到多个内核线程上。多对多模型的优点有:1.一个用户线程的阻塞不会导致所有线程的阻塞,因为此时还有别的内核线程可以被调度来执行;2.多对多模型对用户线程的数量没有限制;3.在多处理器的操作系统中,多对多模型的线程也能得到一定的性能提升。
7.2 goroutine
goroutine是Go语言中的轻量级线程实现,实现了M : N的线程模型,由Go运行时(runtime)管理。与传统的系统级线程和进程相比,其最大优势在于其“轻量级”,因为goroutine使用的是动态栈,可以小到几k。所以,在一台服务器上可以轻松创建上百万个goroutine而不会导致系统资源衰竭,而线程和进程通常最多也不能超过1万个。
当一个程序启动时,其主函数在一个单独的goroutine中运行,我们叫它main gorouine。新的goroutine会用go语句来创建。在语法上,go语句是一个普通的函数或方法调用前加上关键字go。go语句会使其语句中的函数在一个新创建的goroutine中运行,而go语句本身会迅速的完成。
package main
import ( "fmt" ) func main() { go fmt.Println("Hello") fmt.Println("World ") } |
上述的代码中,go fmt.Println("Hello")这条语句,会使得fmt.Println("Hello")函数在一个新创建的goroutine中运行,go语句迅速完成。然后执行fmt.Println("World ")。输出结果会是以下三种情况:
1>World
新的gorouine没有来的及执行,main gorouine 输出”World”后程序就直接退出了。
2>Word
Hello
main gorouine 输出”World”,新的gorouine输出”Hello”后,程序才退出。
3>Hello
world
新的gorouine先输出了”Hello”,接着main gorouine 输出”World”后程序退出。
从以上案例得出的结论是,多个gorouine并发执行时的先后顺序是不确定的。如果希望确定的先输出Hello再输出world,我们可以这样修改代码:
package main
import ( "fmt" "runtime" )
func main() { go fmt.Println("Hello") runtime.Gosched() //出让时间片 fmt.Println("World ")
} |
或者
package main
import ( "fmt" "time" )
func main() { go fmt.Println("Hello") time.Sleep(1) //主goroutine延时1毫秒,也会出让时间片 fmt.Println("World ")
} |
通过让主goroutine出让时间片,可以使得新创建的goroutine先执行就可以达到目的。但是在工程中要解决的并发问题远不会这么简单。我们不但需要确定多个goroutine之间的执行顺序,还要能够在多个goroutine之间完成通信。两种最常见的并发通信模型模式是:消息通信和共享数据。
7.3 channel
channel是goroutine之间的消息通信机制。channel是类型相关的。也就是说,一个channel只能传递一种类型的值,这个类型需要定义channel时指定。
使用内置的make函数,我们可以创建一个channel:
chi := make(chan int) chs := make(chan string) chf := make(chan interface{}) |
和map类似,channel也是一个对应make创建的底层数据结构的引用。当我们复制一个channel或用于函数的参数传递时,我们只是拷贝了一个channel引用。channel的零值也是nil。
两个相同类型的channel可以使用==运算符比较。如果两个channel引用的是相通的对象,那么比较的结果为真。一个channel也可以和nil进行比较。
一个channel有发送和接受两个主要操作,都是使用<-运算符。一个不保存接收结果的接收操作也是合法的。
ch<-x //发送 x=<-ch //接收 <-ch //接收操作,但不保存接收结果 |
channel还支持close操作,用于关闭channel,随后对该channel的任何发送操作都将导致panic异常。对一个已经关闭的channel接收数据,依然可以接收到已经成功发送的数据,如果没有的话,将接收到零值的数据。使用内置的close函数可以关闭channel。
close(ch) |
如果channel容量大于零,就是带缓存的channel。
ch = make(chan int) ch = make(chan int,0) ch = make(chan int,5)//带缓存的channel |
7.3.1 无缓存的channel
一个基于无缓存的channel的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的channel上执行接收操作。同样,如果接收操作先发生,那么接收者gotoutine也将阻塞,直到有另一个goroutine在相同的channel上执行发送操作。基于无缓存channel的发送和接收操作将导致两个goroutine做一次同步操作,因此,无缓存的channel也称为同步channel。当通过一个无缓存channel发送数据时,接收者收到数据发生在唤醒发送这goroutine之前(happens before)。
func Sum(arr []int, ch chan int) { sum := 0 for _, v := range arr { sum += v } ch <- sum }
func main() {
arr := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} c := make(chan int) go Sum(arr[:len(arr)/2], c) go Sum(arr[len(arr)/2:], c) x, y := <-c, <-c fmt.Println(x, y, x+y)
} |
7.3.2 串联的channel(Pipeline)
channel可以将多个goroutine链接在一起,一个channel的输出作为下一个channel的输入。类似与进程间通信的管道。下面使用两个channel串联三个goroutine。
第一个gorutine用于生成0、1、2、3......整数序列,通过channel传递给第二个goroutine;第二个goroutine将收到的整数求平方,然后将结果通过第二个channel传递给第三个goroutine,第三个goroutine打印收到的每个结果。
package main
import ( "fmt" "time" )
func main() { num := make(chan int) sqr := make(chan int) //Counter go func() { for x := 0; ; x++ { num <- x time.Sleep(1 * time.Second) //增加这句话,才方便看到运行效果 } }() // Squarer go func() { for { x := <-num sqr <- x * x } }() //Printer for { fmt.Println(<-sqr) } } |
如果我们希望通过channels只发送有限的数列如何处理?如果没有更多的值需要发送时,可以通过内置的close函数关闭channel。当一个channel被关闭后,再向该channel发送数据将导致panic异常。所以一般在数据发送方确定不在发送数据时,关闭channel。
当一个已经关闭的channel中已经发送的数据都被成功接收后,后续的接收操作将不再阻塞,它们会立即返回一个零值。上面的num channel并不能终止循环,它依然会受到一个永无休止的零值序列,然后将它们发送给打印者goroutine。
没有办法测试一个channel是否被关闭。但是接收操作有一个变体形式:多接收一个结果,多接收的第二个结果是一个布尔值ok,true表示成功从channels接收到值,false表示channel已经被关闭,并且里面没有值可以接收。我们可以这样修改接收数据并计算的goroutine。
go func(){ for{ x,ok := <-num if !ok{ break } sqr <- x * x } close(sqr ) } |
上面的语法比较繁琐,Go语言的range循环可以直接在channel上迭代。最终的例子:
package main
import "fmt"
func main() { num := make(chan int) sqr := make(chan int)
// Counter go func() { for x := 0; x < 100; x++ { num <- x } close(num) }()
// Squarer go func() { for x := range num { sqr <- x * x } close(sqr) }()
// Printer (在主goroutine中) for x := range sqr { fmt.Println(x) } } |
不管一个channel是否被关闭,当它没有被引用时将会被Go语言的垃圾自动回收器回收。试图重复关闭一个channel将导致panic异常,试图关闭一个nil值的channel也将导致panic异常。
7.3.3 单向的Channel
随着代码量的增长,通常需要把代码按功能拆分成一个个相对独立的函数。每个函数在一个单独的goroutine中执行,使用channel作为参数进行通信。在函数内部,有的channel只接收数据,有的channel只发送数据,这时可以使用单向的channel来表达这种意图。
chan<-int表示只发送不接收;相反,类型<-chan int只接收不发送。箭头<-和关键字chan的相对位置表明了channel的方向。这种限制将在编译期检测。
将三个goroutine拆分为以下三个函数:
func counter(out chan<- int) func squarer(out chan<- int,in <-chan int) func printer(in <-chan int) |
因为关闭操作用于断言不再向channel发送新的数据,所有只有在发送者所在的goroutine才会调用close函数,因此对一个只接受的channel调用close函数将是一个编译错误。
package main
import "fmt"
func counter(out chan<- int) { for x := 0; x < 100; x++ { out <- x } close(out) }
func squarer(out chan<- int, in <-chan int) { for v := range in { out <- v * v } close(out) }
func printer(in <-chan int) { for v := range in { fmt.Println(v) } }
func main() { num := make(chan int) sqr:= make(chan int)
go counter(num ) go squarer(sqr, num ) printer(sqr) } |
任何双向的channel向单向的channel赋值都将导致隐式转换。但是不能反向转换。
7.3.4 带缓存的channel
带缓存的channel内部持有一个元素队列。队列的最大容量是在调用make函数创建channel时通过第二个参数指定的。下面语句创建了一个可以持有3个字符串元素的带缓存的channel。
ch = make(chan string,3) |
向缓存channel的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,发送操作将阻塞。如果channel是空的,接受操作将阻塞。通过缓存队列解耦了接收和发送的goroutine。
cap函数可以获取channel内部缓存的容量。len函数可以获取channel内部缓存队列中有效元素的个数。多个goroutine并发的向同一个channel发送数据或从同一个channel接收数据都是常见的用法。
如果我们使用了无缓存的channel,那么两个慢的goroutines将会因为没有人接收数据而永远阻塞。这种情况,称为goroutines泄露,这将是一个bug。和垃圾变量不同,泄露的goroutines并不会被自动回收,因此确保每个不再需要的goroutine能正常退出时总要的。
关于无缓存或带缓存channels之间的选择,或者带缓存channels的容量大小的选择,都可能影响程序的正确性。无缓存channel更强的保证了每个发送操作与接收操作的同步。但是对于带缓存channel是解耦的。
channel的缓存也可能影响程序的性能。
package cake
import ( "fmt" "math/rand" "time" )
type Shop struct { Verbose bool Cakes int // number of cakes to bake BakeTime time.Duration // time to bake one cake BakeStdDev time.Duration // standard deviation of baking time BakeBuf int // buffer slots between baking and icing NumIcers int // number of cooks doing icing IceTime time.Duration // time to ice one cake IceStdDev time.Duration // standard deviation of icing time IceBuf int // buffer slots between icing and inscribing InscribeTime time.Duration // time to inscribe one cake InscribeStdDev time.Duration // standard deviation of inscribing time }
type cake int
func (s *Shop) baker(baked chan<- cake) { for i := 0; i < s.Cakes; i++ { c := cake(i) if s.Verbose { fmt.Println("baking", c) } work(s.BakeTime, s.BakeStdDev) baked <- c } close(baked) }
func (s *Shop) icer(iced chan<- cake, baked <-chan cake) { for c := range baked { if s.Verbose { fmt.Println("icing", c) } work(s.IceTime, s.IceStdDev) iced <- c } }
func (s *Shop) inscriber(iced <-chan cake) { for i := 0; i < s.Cakes; i++ { c := <-iced if s.Verbose { fmt.Println("inscribing", c) } work(s.InscribeTime, s.InscribeStdDev) if s.Verbose { fmt.Println("finished", c) } } }
// Work runs the simulation 'runs' times. func (s *Shop) Work(runs int) { for run := 0; run < runs; run++ { baked := make(chan cake, s.BakeBuf) iced := make(chan cake, s.IceBuf) go s.baker(baked) for i := 0; i < s.NumIcers; i++ { go s.icer(iced, baked) } s.inscriber(iced) } }
// work blocks the calling goroutine for a period of time // that is normally distributed around d // with a standard deviation of stddev. func work(d, stddev time.Duration) { delay := d + time.Duration(rand.NormFloat64()*float64(stddev)) time.Sleep(delay) } |
这个包模拟了一个蛋糕店,可以通过不同的参数的调整。它还提供了对上面提到的几种场景提供对应的基准测试(11.4)
package cake_test
import ( "testing" "time"
"gopl.io/ch8/cake" )
var defaults = cake.Shop{ Verbose: testing.Verbose(), Cakes: 20, BakeTime: 10 * time.Millisecond, NumIcers: 1, IceTime: 10 * time.Millisecond, InscribeTime: 10 * time.Millisecond, }
func Benchmark(b *testing.B) { // Baseline: one baker, one icer, one inscriber. // Each step takes exactly 10ms. No buffers. cakeshop := defaults cakeshop.Work(b.N) // 224 ms }
func BenchmarkBuffers(b *testing.B) { // Adding buffers has no effect.//增加缓存没有影响 cakeshop := defaults cakeshop.BakeBuf = 10 cakeshop.IceBuf = 10 cakeshop.Work(b.N) // 224 ms }
func BenchmarkVariable(b *testing.B) { // Adding variability to rate of each step // increases total time due to channel delays.//通道的延时增加了总时间 cakeshop := defaults cakeshop.BakeStdDev = cakeshop.BakeTime / 4 cakeshop.IceStdDev = cakeshop.IceTime / 4 cakeshop.InscribeStdDev = cakeshop.InscribeTime / 4 cakeshop.Work(b.N) // 259 ms }
func BenchmarkVariableBuffers(b *testing.B) { // Adding channel buffers reduces // delays resulting from variability. //使用带缓冲区的通道减少延时 cakeshop := defaults cakeshop.BakeStdDev = cakeshop.BakeTime / 4 cakeshop.IceStdDev = cakeshop.IceTime / 4 cakeshop.InscribeStdDev = cakeshop.InscribeTime / 4 cakeshop.BakeBuf = 10 cakeshop.IceBuf = 10 cakeshop.Work(b.N) // 244 ms }
func BenchmarkSlowIcing(b *testing.B) { // Making the middle stage slower//增加中间环节的时间 // adds directly to the critical path. cakeshop := defaults cakeshop.IceTime = 50 * time.Millisecond cakeshop.Work(b.N) // 1.032 s }
func BenchmarkSlowIcingManyIcers(b *testing.B) { // Adding more icing cooks reduces the cost of icing // // to its sequential component, following Amdahl's Law. cakeshop := defaults cakeshop.IceTime = 50 * time.Millisecond cakeshop.NumIcers = 5 cakeshop.Work(b.N) // 288ms } |
自己的理解:缓存不是越大越好,还要看发送和接收数据的goutine的速度,可以通过设置goroutine的数量调整发送和接收数据的速度。
7.3.5 并发的循环
并发的循环指的是在循环体内,通过go+匿名函数生成多个goroutine,如果在goroutine内用到外部外部函数的变量,不要直接使用,需要将外部变量作为匿名函数的参数传递,保证每个gotoutine运行不同的变量。匿名函数在一个新的goroutine中执行,每个goroutine执行时,i的值是不确定的。只有在调用函数时,通过参数传递才能确定函数内的值。
错误的演示:
func loopgo() { for i := 1; i < 10; i++ { go func() { fmt.Printf("第%d个 goroutine\n", i) }() } }
|
正确的演示:
func loopgo() { for i := 1; i < 10; i++ { go func(i int) { fmt.Printf("第%d个 goroutine\n", i) }(i) } } |
7.3.6基于select的多路复用
Go语言直接在语言级别支持select关键字,用于处理异步IO问题 。select的用法与switch语言非常类似,由select开始一个新的选择块,每个选择条件由case语句来描述。与switch语句可以选择任何可使用相等比较的条件相比, select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作,大致的结构如下:
select { case <-chan1: // 如果chan1成功读到数据,则进行该case处理语句 case chan2 <- 1: // 如果成功向chan2写入数据,则进行该case处理语句 default: // 如果上面都没有成功,则进入default处理流程 } |
可以看出, select不像switch,后面并不带判断条件,而是直接去查看case语句。每个case语句都必须是一个面向channel的操作。比如上面的例子中,第一个case试图从chan1读取一个数据并直接忽略读到的数据,而第二个case则是试图向chan2中写入一个整型数1,如果这两者都没有成功,则到达default语句。
Go语言没有提供直接的超时处理机制,但我们可以利用select机制。虽然select机制不是专为超时而设计的,却能很方便地解决超时问题。因为select的特点是只要其中一个case已经完成,程序就会继续往下执行,而不会考虑其他case的情况。
基于此特性,我们来为channel实现超时机制:
// 首先,我们实现并执行一个匿名的超时等待函数 timeout := make(chan bool, 1) go func() { time.Sleep(1e9) // 等待1秒钟 timeout <- true }() // 然后我们把timeout这个channel利用起来 select { case <-ch: // 从ch中读取到数据 case <-timeout: // 一直没有从ch中读取到数据,但从timeout中读取到了数据 } |
这样使用select机制可以避免永久等待的问题,因为程序会在timeout中获取到一个数据后继续执行,无论对ch