欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

go-goroutine 和 channel

程序员文章站 2022-06-11 16:04:49
goroutine 和 channel goroutine 看一个需求 需求:要求统计 1 9000000000 的数字中,哪些是素数? 分析思路: 1) 传统的方法,就是使用一个循环,循环的判断各个数是不是素数。[很慢] 2) 使用并发或者并行的方式,将统计素数的任务分配给多个 goroutine ......

goroutine 和 channel

goroutine-看一个需求
需求:要求统计 1-9000000000 的数字中,哪些是素数?

分析思路:
1) 传统的方法,就是使用一个循环,循环的判断各个数是不是素数。[很慢]
2) 使用并发或者并行的方式,将统计素数的任务分配给多个 goroutine 去完成,这时就会使用到goroutine.【速度提高 4 倍】

goroutine-基本介绍

进程和线程介绍

程序、进程和线程的关系

并发和并行

并发和并行
1) 多线程程序在单核上运行,就是并发
2) 多线程程序在多核上运行,就是并行

go 协程和 go 主线程

go 主线程(有程序员直接称为线程/也可以理解成进程): 一个 go 线程上,可以起多个协程,你可以这样理解,协程是轻量级的线程[编译器做优化]。

go 协程的特点
1) 有独立的栈空间
2) 共享程序堆空间
3) 调度由用户控制
4) 协程是轻量级的线程

goroutine-快速入门

案例说明
请编写一个程序,完成如下功能:
1) 在主线程(可以理解成进程)中,开启一个 goroutine, 该协程每隔 1 秒输出 "hello,world"
2) 在主线程中也每隔一秒输出"hello,golang", 输出 10 次后,退出程序
3) 要求主线程和 goroutine 同时执行.

package main
import (
    "fmt"
    "strconv"
    "time"
)

// 在主线程(可以理解成进程)中,开启一个goroutine, 该协程每隔1秒输出 "hello,world"
// 在主线程中也每隔一秒输出"hello,golang", 输出10次后,退出程序
// 要求主线程和goroutine同时执行

//编写一个函数,每隔1秒输出 "hello,world"
func test() {
    for i := 1; i <= 10; i++ {
        fmt.println("tesst () hello,world " + strconv.itoa(i))
        time.sleep(time.second)
    }
}

func main() {

    go test() // 开启了一个协程

    for i := 1; i <= 10; i++ {
        fmt.println(" main() hello,golang" + strconv.itoa(i))
        time.sleep(time.second)
    }
}

快速入门小结

1) 主线程是一个物理线程,直接作用在 cpu 上的。是重量级的,非常耗费 cpu 资源。
2) 协程从主线程开启的,是轻量级的线程,是逻辑态。对资源消耗相对小。
3) golang 的协程机制是重要的特点,可以轻松的 开启上万个协程。其它编程语言的并发机制是一般基于线程的,开启过多的线程,资源耗费大,这里就突显 golang 在并发上的优势了

goroutine 的调度模型

mpg 模式基本介绍

解释一下mpg含义:
m(machine):操作系统的主线程
p(processor):协程执行需要的资源(上下文context),可以看作一个局部的调度器,使go代码在一个线程上跑,他是实现从n:1到n:m映射的关键
g(gorountine):协程,有自己的栈。包含指令指针(instruction pointer)和其它信息(正在等待的channel等等),用于调度。一个p下面可以有多个g

mpg 模式运行的状态一

  • p的数量可以通过gomaxprocs()来设置,他其实代表了真正的并发度,即有多少个goroutine可以同时运行。p同时也维护着g(协程)的队列(称之为runqueue进程队列)。go代码中的m每有一个语句被执行,p就在末尾加入一个g(从runqueue队列中取出来的),在下一个调度点(p),就从runqueue队列中取出g。

  • p可以在os线程(主线程,或者是m)被阻塞时,转到另一个os线程(m)!go中的调度器保证有足够的线程来运行所有的p。当启用一个m0中的g0被syscall(系统调用)的时候,m0下面的p转给另一个线程m1(可以是创建的,也可以是原本就存在的)。m1接受了p(包括p所带的runqueue的队列里面所有状态的g,但不包括已经被syscall的g0),继续运行。而m0会等待执行syscall的g0的返回值。当g0的syscall结束后,他的主线程m0会尝试取得一个p来运行g0,一般情况下,他会从其他的m里面偷一个p过来,如果没有偷到的话就会把g0放到一个global runqueue(全局进程队列)中,然后把自己(m0)放进线程池或者转为休眠状态。

设置 golang 运行的 cpu 数

介绍:为了充分了利用多 cpu 的优势,在 golang 程序中,设置运行的 cpu 数目

package main
import (
    "runtime"
    "fmt"
)

func main() {
    cpunum := runtime.numcpu()
    fmt.println("cpunum=", cpunum)

    //可以自己设置使用多个cpu
    runtime.gomaxprocs(cpunum - 1)
    fmt.println("ok")
}


channel(管道)-看个需求

需求:现在要计算 1-200 的各个数的阶乘,并且把各个数的阶乘放入到 map 中。最后显示出来。
要求使用 goroutine 完成

分析思路:
1) 使用 goroutine 来完成,效率高,但是会出现并发/并行安全问题.
2) 这里就提出了不同 goroutine 如何通信的问题

代码实现
1) 使用 goroutine 来完成(看看使用 gorotine 并发完成会出现什么问题? 然后去解决)
2) 在运行某个程序时,如何知道是否存在资源竞争问题。 方法很简单,在编译该程序时,增加一个参数 -race 即可

不同 goroutine 之间如何通讯

1) 全局变量的互斥锁
2) 使用管道 channel 来解决

使用全局变量加锁同步改进程序

因为没有对全局变量 m 加锁,因此会出现资源争夺问题,代码会出现错误,提示 concurrent map
writes
解决方案:加入互斥锁
我们的数的阶乘很大,结果会越界,可以将求阶乘改成 sum += uint64(i)

package main
import (
    "fmt"
    _ "time"
    "sync"
)

// 需求:现在要计算 1-200 的各个数的阶乘,并且把各个数的阶乘放入到map中。
// 最后显示出来。要求使用goroutine完成 

// 思路
// 1. 编写一个函数,来计算各个数的阶乘,并放入到 map中.
// 2. 我们启动的协程多个,统计的将结果放入到 map中
// 3. map 应该做出一个全局的.

var (
    mymap = make(map[int]int, 10)  
    //声明一个全局的互斥锁
    //lock 是一个全局的互斥锁, 
    //sync 是包: synchornized 同步
    //mutex : 是互斥
    lock sync.mutex
)

// test 函数就是计算 n!, 让将这个结果放入到 mymap
func test(n int) {
    
    res := 1
    for i := 1; i <= n; i++ {
        res *= i
    }

    //这里我们将 res 放入到mymap
    //加锁
    lock.lock()
    mymap[n] = res //concurrent map writes?
    //解锁
    lock.unlock()
}

func main() {

    // 我们这里开启多个协程完成这个任务[200个]
    for i := 1; i <= 20; i++ {
        go test(i)
    }


    //休眠10秒钟【第二个问题 】
    //time.sleep(time.second * 5)

    //这里我们输出结果,变量这个结果
    lock.lock()
    for i, v := range mymap {
        fmt.printf("map[%d]=%d\n", i, v)
    }
    lock.unlock()

}

为什么需要 channel

1) 前面使用全局变量加锁同步来解决 goroutine 的通讯,但不完美
2) 主线程在等待所有 goroutine 全部完成的时间很难确定,我们这里设置 10 秒,仅仅是估算。
3) 如果主线程休眠时间长了,会加长等待时间,如果等待时间短了,可能还有 goroutine 处于工作
状态,这时也会随主线程的退出而销毁
4) 通过全局变量加锁同步来实现通讯,也并不利用多个协程对全局变量的读写操作。
5) 上面种种分析都在呼唤一个新的通讯机制-channel

channel 的基本介绍

1) channle 本质就是一个数据结构-队列
2) 数据是先进先出【fifo : first in first out】
3) 线程安全,多 goroutine 访问时,不需要加锁,就是说 channel 本身就是线程安全的
4) channel 有类型的,一个 string 的 channel 只能存放 string 类型数据。

定义/声明 channel

var 变量名 chan 数据类型
举例:

var intchan chan int (intchan 用于存放 int 数据)
var mapchan chan map[int]string (mapchan 用于存放 map[int]string 类型)
var perchan chan person
var perchan2 chan *person
...

说明
channel 是引用类型
channel 必须初始化才能写入数据, 即 make 后才能使用
管道是有类型的,intchan 只能写入 整数 int

package main
import (
    "fmt"
)

func main() {

    //演示一下管道的使用
    //1. 创建一个可以存放3个int类型的管道
    var intchan chan int
    intchan = make(chan int, 3)

    //2. 看看intchan是什么
    fmt.printf("intchan 的值=%v intchan本身的地址=%p\n", intchan, &intchan)


    //3. 向管道写入数据
    intchan<- 10
    num := 211
    intchan<- num
    intchan<- 50
    // //如果从channel取出数据后,可以继续放入
    <-intchan
    intchan<- 98//注意点, 当我们给管写入数据时,不能超过其容量


    //4. 看看管道的长度和cap(容量)
    fmt.printf("channel len= %v cap=%v \n", len(intchan), cap(intchan)) // 3, 3

    //5. 从管道中读取数据

    var num2 int
    num2 = <-intchan 
    fmt.println("num2=", num2)
    fmt.printf("channel len= %v cap=%v \n", len(intchan), cap(intchan))  // 2, 3

    //6. 在没有使用协程的情况下,如果我们的管道数据已经全部取出,再取就会报告 deadlock

    num3 := <-intchan
    num4 := <-intchan

    //num5 := <-intchan

    fmt.println("num3=", num3, "num4=", num4/*, "num5=", num5*/)

}

管道的初始化,写入数据到管道,从管道读取数据及基本的注意事项

channel 使用的注意事项

1) channel 中只能存放指定的数据类型
2) channle 的数据放满后,就不能再放入了
3) 如果从 channel 取出数据后,可以继续放入
4) 在没有使用协程的情况下,如果 channel 数据取完了,再取,就会报 dead lock

读写 channel 案例演示

package main
import (
    "fmt"
)

type cat struct {
    name string
    age int
}

func main() {

    //定义一个存放任意数据类型的管道 3个数据
    //var allchan chan interface{}
    allchan := make(chan interface{}, 3)

    allchan<- 10
    allchan<- "tom jack"
    cat := cat{"小花猫", 4}
    allchan<- cat

    //我们希望获得到管道中的第三个元素,则先将前2个推出
    <-allchan
    <-allchan

    newcat := <-allchan //从管道中取出的cat是什么?

    fmt.printf("newcat=%t , newcat=%v\n", newcat, newcat)
    //下面的写法是错误的!编译不通过
    //fmt.printf("newcat.name=%v", newcat.name)
    //使用类型断言
    a := newcat.(cat) 
    fmt.printf("newcat.name=%v", a.name)
    

}

channel 的遍历和关闭

channel 的关闭

使用内置函数 close 可以关闭 channel, 当 channel 关闭后,就不能再向 channel 写数据了,但是仍然可以从该 channel 读取数据

channel 的遍历

channel 支持 for--range 的方式进行遍历,请注意两个细节
1) 在遍历时,如果 channel 没有关闭,则回出现 deadlock 的错误
2) 在遍历时,如果 channel 已经关闭,则会正常遍历数据,遍历完后,就会退出遍历。

channel 遍历和关闭的案例演示

package main
import (
   "fmt"
)

func main() {

   intchan := make(chan int, 3)
   intchan<- 100
   intchan<- 200
   close(intchan) // close
   //这是不能够再写入数到channel
   //intchan<- 300
   fmt.println("okook~")
   //当管道关闭后,读取数据是可以的
   n1 := <-intchan
   fmt.println("n1=", n1)


   //遍历管道
   intchan2 := make(chan int, 100)
   for i := 0; i < 100; i++ {
       intchan2<- i * 2  //放入100个数据到管道
   }

   //遍历管道不能使用普通的 for 循环
   // for i := 0; i < len(intchan2); i++ {

   // }
   //在遍历时,如果channel没有关闭,则会出现deadlock的错误
   //在遍历时,如果channel已经关闭,则会正常遍历数据,遍历完后,就会退出遍历
   close(intchan2)
   for v := range intchan2 {
       fmt.println("v=", v)
   }


}

应用示例--channel与goroutine

package main
import (
    "fmt"
    "time"
)


//write data
func writedata(intchan chan int) {
    for i := 1; i <= 50; i++ {
        //放入数据
        intchan<- i //
        fmt.println("writedata ", i)
        //time.sleep(time.second)
    }
    close(intchan) //关闭
}

//read data
func readdata(intchan chan int, exitchan chan bool) {

    for {
        v, ok := <-intchan
        if !ok {
            break
        }
        time.sleep(time.second)
        fmt.printf("readdata 读到数据=%v\n", v) 
    }
    //readdata 读取完数据后,即任务完成
    exitchan<- true
    close(exitchan)

}

func main() {

    //创建两个管道
    intchan := make(chan int, 10)
    exitchan := make(chan bool, 1)
    
    go writedata(intchan)
    go readdata(intchan, exitchan)

    time.sleep(time.second * 10)
    for {
        _, ok := <-exitchan
        if !ok {
            break
        }
    }

}

应用实例 2-阻塞

若上面的代码,注释掉go readdata(intchan, exitchan),会怎样,因为管道有长度,所以当编译器发现一个管道只有写而没有读,改管道会阻塞(读与写的频率不一致没关系)!

应用实例 3

需求:
要求统计 1-200000 的数字中,哪些是素数?这个问题在本章开篇就提出了,现在我们有 goroutine和 channel 的知识后,就可以完成了 [测试数据: 80000]

分析思路:

传统的方法,就是使用一个循环,循环的判断各个数是不是素数【ok】。
使用并发/并行的方式,将统计素数的任务分配给多个(4 个)goroutine 去完成,完成任务时间短。

传统方法,一个协程

package main
import (
    "time"
    "fmt"
)

func main() {

        start := time.now().unix()
        for num := 1; num <= 80000; num++ {

            flag := true //假设是素数
            //判断num是不是素数
            for i := 2; i < num; i++ {
                if num % i == 0 {//说明该num不是素数
                    flag = false
                    break
                }
            }

            if flag {
                //将这个数就放入到primechan
                //primechan<- num
            }

        }
        end := time.now().unix()
        fmt.println("普通的方法耗时=", end - start)
        
}

开了四个协程

package main
import (
    "fmt"
    "time"
)



//向 intchan放入 1-8000个数
func putnum(intchan chan int) {

    for i := 1; i <= 80000; i++ {    
        intchan<- i
    }

    //关闭intchan
    close(intchan)
}

// 从 intchan取出数据,并判断是否为素数,如果是,就
//  //放入到primechan
func primenum(intchan chan int, primechan chan int, exitchan chan bool) {

    //使用for 循环
    // var num int
    var flag bool // 
    for {
        //time.sleep(time.millisecond * 10)
        num, ok := <-intchan //intchan 取不到..
        
        if !ok { 
            break
        }
        flag = true //假设是素数
        //判断num是不是素数
        for i := 2; i < num; i++ {
            if num % i == 0 {//说明该num不是素数
                flag = false
                break
            }
        }

        if flag {
            //将这个数就放入到primechan
            primechan<- num
        }
    }

    fmt.println("有一个primenum 协程因为取不到数据,退出")
    //这里我们还不能关闭 primechan
    //向 exitchan 写入true
    exitchan<- true 

}

func main() {

    intchan := make(chan int , 1000)
    primechan := make(chan int, 20000)//放入结果
    //标识退出的管道
    exitchan := make(chan bool, 8) // 4个



    start := time.now().unix()
    
    //开启一个协程,向 intchan放入 1-8000个数
    go putnum(intchan)
    //开启4个协程,从 intchan取出数据,并判断是否为素数,如果是,就
    //放入到primechan
    for i := 0; i < 8; i++ {
        go primenum(intchan, primechan, exitchan)
    }

    //这里我们主线程,进行处理
    //直接
    go func(){
        for i := 0; i < 8; i++ {
            <-exitchan
        }

        end := time.now().unix()
        fmt.println("使用协程耗时=", end - start)

        //当我们从exitchan 取出了4个结果,就可以放心的关闭 prprimechan
        close(primechan)
    }()


    //遍历我们的 primechan ,把结果取出
    for {
        _, ok := <-primechan
        if !ok{
            break
        }
        //将结果输出
        //fmt.printf("素数=%d\n", res)
    }

    fmt.println("main线程退出")


    
}

结论:使用 go 协程后,执行的速度,理论上比普通方法提高至少 4 倍(我这是两倍)

channel 使用细节和注意事项

1) channel 可以声明为只读,或者只写性质 【案例演示】

package main
import (
    "fmt"
)

func main() {
    //管道可以声明为只读或者只写

    //1. 在默认情况下下,管道是双向
    //var chan1 chan int //可读可写
    
    //2 声明为只写
    var chan2 chan<- int
    chan2 = make(chan int, 3)
    chan2<- 20
    //num := <-chan2 //error

    fmt.println("chan2=", chan2)

    //3. 声明为只读
    var chan3 <-chan int
    num2 := <-chan3
    //chan3<- 30 //err
    fmt.println("num2", num2)

}

3) 使用 select 可以解决从管道取数据的阻塞问题

package main
import (
    "fmt"
    "time"
)

func main() {

    //使用select可以解决从管道取数据的阻塞问题

    //1.定义一个管道 10个数据int
    intchan := make(chan int, 10)
    for i := 0; i < 10; i++ {
        intchan<- i
    }
    //2.定义一个管道 5个数据string
    stringchan := make(chan string, 5)
    for i := 0; i < 5; i++ {
        stringchan <- "hello" + fmt.sprintf("%d", i)
    }

    //传统的方法在遍历管道时,如果不关闭会阻塞而导致 deadlock

    //问题,在实际开发中,可能我们不好确定什么关闭该管道.
    //可以使用select 方式可以解决
    //label:
    for {
        select {
            //注意: 这里,如果intchan一直没有关闭,不会一直阻塞而deadlock
            //,会自动到下一个case匹配
            case v := <-intchan : 
                fmt.printf("从intchan读取的数据%d\n", v)
                time.sleep(time.second)
            case v := <-stringchan :
                fmt.printf("从stringchan读取的数据%s\n", v)
                time.sleep(time.second)
            default :
                fmt.printf("都取不到了,不玩了, 程序员可以加入逻辑\n")
                time.sleep(time.second)
                return 
                //break label
        }
    }
}

4) goroutine 中使用 recover,解决协程中出现 panic,导致程序崩溃问题

如果我们开了一个协程,但这个协程出现panic,就会导致整个程序崩溃,这时我们可以在goroutine中使用recover来捕获panic,这样及时协程发生问题,主线程依然不受影响

package main
import (
    "fmt"
    "time"
)

//函数
func sayhello() {
    for i := 0; i < 10; i++ {
        time.sleep(time.second)
        fmt.println("hello,world")
    }
}
//函数
func test() {
    //这里我们可以使用defer + recover
    defer func() {
        //捕获test抛出的panic
        if err := recover(); err != nil {
            fmt.println("test() 发生错误", err)
        }
    }()
    //定义了一个map
    var mymap map[int]string
    mymap[0] = "golang" //error
}

func main() {

    go sayhello()
    go test()


    for i := 0; i < 10; i++ {
        fmt.println("main() ok=", i)
        time.sleep(time.second)
    }

}