Golang 协程优雅退出
程序员文章站
2022-04-22 22:21:46
...
关于golang中消费协程优雅退出的写法,写了一个简单的小例子进行记录一下。
使用场景:
1、生成协程生产数据到管道中
2、消费协程在管道中取数据进行处理
通过捕捉特定信号对程序进行相关处理,当某个信号进行触发的时候,主协程将向各个协程发送退出指令,当数据管道处理完成时,若接收到退出指令 将结束协程的执行
package main
import (
"fmt"
"git.code.oa.com/gongyi/gongyi_base/log"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
var dataChan chan int
/*************************测试生产类*************************/
type TestProducer struct {
closedChan chan struct{}
wg sync.WaitGroup
}
func (producer *TestProducer)Produce() {
defer producer.wg.Done()
data := 1
for {
dataChan<-data
log.Infof("push data:%d succ", data)
data++
time.Sleep(time.Second * 1)
select {
// 若关闭了通道,直接退出
case <-producer.closedChan:
return
// 不可阻塞
default:
continue
}
}
}
func (producer *TestProducer) Stop() {
close(producer.closedChan)
producer.wg.Wait()
log.Infof("producer has stoped...")
}
/*************************测试消费类*************************/
type TestWorker struct {
workNo int
closedChan chan struct{}
wg sync.WaitGroup
}
func (test* TestWorker)Work() {
defer test.wg.Done()
for {
// 两个select的目的为了确保dataChan中的消费完之后才允许退出,因为单个select中io触发的顺序不确定,可能导致数据管道中还有数据就退出了
select {
case data := <-dataChan:
log.Infof("start to deal data:%d...", data)
time.Sleep(10 * time.Second)
log.Infof("end deal data:%d...", data)
continue
default:
}
select {
case data := <-dataChan:
log.Infof("start to deal data:%d...", data)
time.Sleep(10 * time.Second)
log.Infof("end deal data:%d...", data)
continue
case <-test.closedChan:
log.Infof("worker %d exit...", test.workNo)
return
}
}
}
func (test* TestWorker)Stop() {
close(test.closedChan)
test.wg.Wait()
log.Infof("%d has stoped...", test.workNo)
}
/*************************主逻辑*************************/
func main() {
dataChan = make(chan int, 86400)
// 创建生产协程,并启动
producer := TestProducer{
closedChan: make(chan struct{}),
}
producer.wg.Add(1)
go producer.Produce()
// 创建消费协程,并启动
workerNumber := 7
var workers []TestWorker
for i := 0; i < workerNumber; i++ {
workers = append(workers, TestWorker{
workNo: i,
closedChan: make(chan struct{}),
})
}
for i := 0; i < workerNumber; i++ {
workers[i].wg.Add(1)
go workers[i].Work()
}
// 信号处理
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
select {
case sig := <-c:
fmt.Printf("Got %s signal. Aborting...\n", sig)
producer.Stop()
for i := 0; i < workerNumber; i++ {
workers[i].Stop()
}
}
}