6. 并发(Go Tutorial)
并发
Go 语言里的并发指的是能让某个函数独立于其他函数运行的能力。当一个函数创建为 goroutine 时,Go 会将其视为一个独立的工作单元。这个单元会被调度到可用的逻辑处理器上执行。
Go 语言运行时的调度器管理着所有的 goroutine 并为其分配执行时间。这个调度器在操作系统之上,将操作系统的线程与语言运行时的逻辑处理器绑定,并在逻辑处理器上运行 goroutine。
Go 语言的并发模型来自一个叫做通信顺序进程(CSP)。CSP 是一种消息传递模型,通过在 goroutine 之间传递数据,而不是,对数据进行加锁来实现同步。通道(channel)是用来在 goroutine 之间传递数据的数据模型
6.1 并发与并行
进程可以看作一个包含了应用程序运行需要用到和维护的各种资源的容器。线程是一个执行空间,用于被操作系统调度。一个进程至少包含一个主线程。
并行是让不同的代码片段同时在不同的物理处理器上运行
并发是指同时管理着很多事情,这些事情可能做到一半就被暂停去做别的事情了
一般,并发的效果比并行的效果要好,因为操作系统的硬件和资源有限,而要做的事情有很多。
在 Go 语言里,如果要让 goroutine 并行,必须使用多于一个逻辑处理器,否则将会是并发执行。
6.2 goroutine
代码实例
// This sample program demonstrates how to create goroutines and
// how the goroutine scheduler behaves with two logical processor.
package main
import (
"fmt"
"runtime"
"sync"
)
// main is the entry point for all Go programs.
func main() {
// Allocate two logical processors for the scheduler to use.
runtime.GOMAXPROCS(2)
// wg is used to wait for the program to finish.
// Add a count of two, one for each goroutine.
var wg sync.WaitGroup
wg.Add(2)
fmt.Println("Start Goroutines")
// Declare an anonymous function and create a goroutine.
go func() {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
// Display the alphabet three times.
for count := 0; count < 3; count++ {
for char := 'a'; char < 'a'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
// Declare an anonymous function and create a goroutine.
go func() {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
// Display the alphabet three times.
for count := 0; count < 3; count++ {
for char := 'A'; char < 'A'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
// Wait for the goroutines to finish.
fmt.Println("Waiting To Finish")
wg.Wait()
fmt.Println("\nTerminating Program")
}
6.3 竞争状态
如果两个或多个 goroutine 在没有同步的情况下,访问某个共享的资源,并试图同时读写这个资源,就会处于相互竞争状态。
竞争状态的存在,让并发程序变得复杂,很容易出现问题。
竞争状态的 goroutine
// This sample program demonstrates how to create race
// conditions in our programs. We don't want to do this.
package main
import (
"fmt"
"runtime"
"sync"
)
var (
// counter is a variable incremented by all goroutines.
counter int
// wg is used to wait for the program to finish.
wg sync.WaitGroup
)
// main is the entry point for all Go programs.
func main() {
// Add a count of two, one for each goroutine.
wg.Add(2)
// Create two goroutines.
go incCounter(1)
go incCounter(2)
// Wait for the goroutines to finish.
wg.Wait()
fmt.Println("Final Counter:", counter)
}
// incCounter increments the package level counter variable.
func incCounter(id int) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for count := 0; count < 2; count++ {
// Capture the value of Counter.
value := counter
// Yield the thread and be placed back in queue.
runtime.Gosched()
// Increment our local value of Counter.
value++
// Store the value back into Counter.
counter = value
}
}
// output: 2
使用 go build -race
可以在 build 时检测存在竞争状态的代码
6.4 锁住共享资源
Go 语言提供了传统的同步 goroutine 机制,那就是对共享资源加锁。
atomic 和 sync 包里的函数提供了很好的解决方案
6.4.1 原子函数
原子函数能够以很低层的加锁机制来同步访问整形变量和指针。
// This sample program demonstrates how to use the atomic
// package to provide safe access to numeric types.
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
var (
// counter is a variable incremented by all goroutines.
counter int64
// wg is used to wait for the program to finish.
wg sync.WaitGroup
)
// main is the entry point for all Go programs.
func main() {
// Add a count of two, one for each goroutine.
wg.Add(2)
// Create two goroutines.
go incCounter(1)
go incCounter(2)
// Wait for the goroutines to finish.
wg.Wait()
// Display the final value.
fmt.Println("Final Counter:", counter)
}
// incCounter increments the package level counter variable.
func incCounter(id int) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for count := 0; count < 2; count++ {
// Safely Add One To Counter.
atomic.AddInt64(&counter, 1)
// Yield the thread and be placed back in queue.
runtime.Gosched()
}
}
// This sample program demonstrates how to use the atomic
// package functions Store and Load to provide safe access
// to numeric types.
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
// shutdown is a flag to alert running goroutines to shutdown.
shutdown int64
// wg is used to wait for the program to finish.
wg sync.WaitGroup
)
// main is the entry point for all Go programs.
func main() {
// Add a count of two, one for each goroutine.
wg.Add(2)
// Create two goroutines.
go doWork("A")
go doWork("B")
// Give the goroutines time to run.
time.Sleep(1 * time.Second)
// Safely flag it is time to shutdown.
fmt.Println("Shutdown Now")
atomic.StoreInt64(&shutdown, 1)
// Wait for the goroutines to finish.
wg.Wait()
}
// doWork simulates a goroutine performing work and
// checking the Shutdown flag to terminate early.
func doWork(name string) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for {
fmt.Printf("Doing %s Work\n", name)
time.Sleep(250 * time.Millisecond)
// Do we need to shutdown.
if atomic.LoadInt64(&shutdown) == 1 {
fmt.Printf("Shutting %s Down\n", name)
break
}
}
}
6.4.2 互斥锁
另一种同步访问的共享资源的方式就是使用互斥锁。互斥锁用户在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以执行这个临界区代码。
// This sample program demonstrates how to use a mutex
// to define critical sections of code that need synchronous
// access.
package main
import (
"fmt"
"runtime"
"sync"
)
var (
// counter is a variable incremented by all goroutines.
counter int
// wg is used to wait for the program to finish.
wg sync.WaitGroup
// mutex is used to define a critical section of code.
mutex sync.Mutex
)
// main is the entry point for all Go programs.
func main() {
// Add a count of two, one for each goroutine.
wg.Add(2)
// Create two goroutines.
go incCounter(1)
go incCounter(2)
// Wait for the goroutines to finish.
wg.Wait()
fmt.Printf("Final Counter: %d\n", counter)
}
// incCounter increments the package level Counter variable
// using the Mutex to synchronize and provide safe access.
func incCounter(id int) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for count := 0; count < 2; count++ {
// Only allow one goroutine through this
// critical section at a time.
mutex.Lock()
{
// Capture the value of counter.
value := counter
// Yield the thread and be placed back in queue.
runtime.Gosched()
// Increment our local value of counter.
value++
// Store the value back into counter.
counter = value
}
mutex.Unlock()
// Release the lock and allow any
// waiting goroutine through.
}
}
6.5 通道
原子函数和互斥锁都能工作,但是依靠他们都不会让编写并发程序变得简单、更不易出错或者有趣。
在 Go 语言里,你不仅可以使用原子函数和互斥锁来保证对共享资源的安全访问和消除竞争资源,还可以使用通道,通过发送和接受需要共享的资源,在 goroutine 之间做同步。
当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。
- 使用 make 创建通道
// 无缓冲的整形通道
unbuffered := make(chan int)
// 有缓冲的字符串通道
buffered := make(chan string, 10)
- 向通道发送值
buffered := make(chan string, 10)
buffered <- "hello Go!"
- 从通道接收值
value := <- buffered
6.5.1 无缓冲通道
无缓冲通道是指在接收前没有能力保存任何值的通道。
无缓冲类型通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果双方没有同时准备好,通道会先导致发送或接收操作的 goroutine 组塞等待。
在无缓冲通道里的这种发送和接收的交互行为本身就是同步的,其中任意一个操作都无法离开另一个操作单独存在。
网球比赛
// This sample program demonstrates how to use an unbuffered
// channel to simulate a game of tennis between two goroutines.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// wg is used to wait for the program to finish.
var wg sync.WaitGroup
func init() {
rand.Seed(time.Now().UnixNano())
}
// main is the entry point for all Go programs.
func main() {
// Create an unbuffered channel.
court := make(chan int)
// Add a count of two, one for each goroutine.
wg.Add(2)
// Launch two players.
go player("Nadal", court)
go player("Djokovic", court)
// Start the set.
court <- 1
// Wait for the game to finish.
wg.Wait()
}
// player simulates a person playing the game of tennis.
func player(name string, court chan int) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for {
// Wait for the ball to be hit back to us.
ball, ok := <-court
if !ok {
// If the channel was closed we won.
fmt.Printf("Player %s Won\n", name)
return
}
// Pick a random number and see if we miss the ball.
n := rand.Intn(100)
if n%13 == 0 {
fmt.Printf("Player %s Missed\n", name)
// Close the channel to signal we lost.
close(court)
return
}
// Display and then increment the hit count by one.
fmt.Printf("Player %s Hit %d\n", name, ball)
ball++
// Hit the ball back to the opposing player.
court <- ball
}
}
4 个 goroutine 之间的接力比赛
// This sample program demonstrates how to use an unbuffered
// channel to simulate a relay race between four goroutines.
package main
import (
"fmt"
"sync"
"time"
)
// wg is used to wait for the program to finish.
var wg sync.WaitGroup
// main is the entry point for all Go programs.
func main() {
// Create an unbuffered channel.
baton := make(chan int)
// Add a count of one for the last runner.
wg.Add(1)
// First runner to his mark.
go Runner(baton)
// Start the race.
baton <- 1
// Wait for the race to finish.
wg.Wait()
}
// Runner simulates a person running in the relay race.
func Runner(baton chan int) {
var newRunner int
// Wait to receive the baton.
runner := <-baton
// Start running around the track.
fmt.Printf("Runner %d Running With Baton\n", runner)
// New runner to the line.
if runner != 4 {
newRunner = runner + 1
fmt.Printf("Runner %d To The Line\n", newRunner)
go Runner(baton)
}
// Running around the track.
time.Sleep(100 * time.Millisecond)
// Is the race over.
if runner == 4 {
fmt.Printf("Runner %d Finished, Race Over\n", runner)
wg.Done()
return
}
// Exchange the baton for the next runner.
fmt.Printf("Runner %d Exchange With Runner %d\n",
runner,
newRunner)
baton <- newRunner
}
6.5.2 有缓冲通道
有缓冲通道是一种在接收前能够存储一个或多个值的通道,它并不强制要求 goroutine 之间必须同时完成发送和接收,这是与无缓冲通道最大的区别。
有缓冲通道也会阻塞,缓冲区已满,会在发送方阻塞,缓冲区为空,会在接收方阻塞。
// This sample program demonstrates how to use a buffered
// channel to work on multiple tasks with a predefined number
// of goroutines.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
const (
numberGoroutines = 4 // Number of goroutines to use.
taskLoad = 10 // Amount of work to process.
)
// wg is used to wait for the program to finish.
var wg sync.WaitGroup
// init is called to initialize the package by the
// Go runtime prior to any other code being executed.
func init() {
// Seed the random number generator.
rand.Seed(time.Now().Unix())
}
// main is the entry point for all Go programs.
func main() {
// Create a buffered channel to manage the task load.
tasks := make(chan string, taskLoad)
// Launch goroutines to handle the work.
wg.Add(numberGoroutines)
for gr := 1; gr <= numberGoroutines; gr++ {
go worker(tasks, gr)
}
// Add a bunch of work to get done.
for post := 1; post <= taskLoad; post++ {
tasks <- fmt.Sprintf("Task : %d", post)
}
// Close the channel so the goroutines will quit
// when all the work is done.
close(tasks)
// Wait for all the work to get done.
wg.Wait()
}
// worker is launched as a goroutine to process work from
// the buffered channel.
func worker(tasks chan string, worker int) {
// Report that we just returned.
defer wg.Done()
for {
// Wait for work to be assigned.
task, ok := <-tasks
if !ok {
// This means the channel is empty and closed.
fmt.Printf("Worker: %d : Shutting Down\n", worker)
return
}
// Display we are starting the work.
fmt.Printf("Worker: %d : Started %s\n", worker, task)
// Randomly wait to simulate work time.
sleep := rand.Int63n(100)
time.Sleep(time.Duration(sleep) * time.Millisecond)
// Display we finished the work.
fmt.Printf("Worker: %d : Completed %s\n", worker, task)
}
}
6.6 小结
- 并发是指 goroutine 运行的时候相互独立
- 使用关键字 go 创建 goroutine 来运行函数
- goroutine 在逻辑处理器上执行,而逻辑处理器具有独立的系统线程和运行队列
- 竞争状态是指两个或者多个 goroutine 之间试图访问同一资源
- 原子函数和互斥锁提供了一种防止出现竞争状态的办法
- 通道提供了一种在两个 goroutine 之间共享数据的简单方法
- 无缓冲通道保证同时交换数据,有缓冲通道不做这种保证