欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Golang 协程优雅退出

程序员文章站 2022-04-22 22:21:46
...

关于golang中消费协程优雅退出的写法,写了一个简单的小例子进行记录一下。
使用场景:
1、生成协程生产数据到管道中
2、消费协程在管道中取数据进行处理

通过捕捉特定信号对程序进行相关处理,当某个信号进行触发的时候,主协程将向各个协程发送退出指令,当数据管道处理完成时,若接收到退出指令 将结束协程的执行

package main

import (
	"fmt"
	"git.code.oa.com/gongyi/gongyi_base/log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

var dataChan chan int
/*************************测试生产类*************************/
type TestProducer struct {
	closedChan chan struct{}
	wg sync.WaitGroup
}

func (producer *TestProducer)Produce() {
	defer producer.wg.Done()
	data := 1
	for {
		dataChan<-data
		log.Infof("push data:%d succ", data)
		data++
		time.Sleep(time.Second * 1)

		select {
			// 	若关闭了通道,直接退出
			case <-producer.closedChan:
				return
			// 不可阻塞
			default:
				continue
		}
	}
}

func (producer *TestProducer) Stop() {
	close(producer.closedChan)
	producer.wg.Wait()
	log.Infof("producer has stoped...")
}

/*************************测试消费类*************************/
type TestWorker struct {
	workNo int
	closedChan chan struct{}
	wg sync.WaitGroup
}

func (test* TestWorker)Work() {
	defer test.wg.Done()
	for {
		// 两个select的目的为了确保dataChan中的消费完之后才允许退出,因为单个select中io触发的顺序不确定,可能导致数据管道中还有数据就退出了
		select {
			case data := <-dataChan:
				log.Infof("start to deal data:%d...", data)
				time.Sleep(10 * time.Second)
				log.Infof("end deal data:%d...", data)
				continue
			default:
		}

		select {
			case data := <-dataChan:
				log.Infof("start to deal data:%d...", data)
				time.Sleep(10 * time.Second)
				log.Infof("end deal data:%d...", data)
				continue
			case <-test.closedChan:
				log.Infof("worker %d exit...", test.workNo)
				return
		}
	}
}

func (test* TestWorker)Stop() {
	close(test.closedChan)
	test.wg.Wait()
	log.Infof("%d has stoped...", test.workNo)
}

/*************************主逻辑*************************/
func main() {
	dataChan = make(chan int, 86400)

	// 创建生产协程,并启动
	producer := TestProducer{
		closedChan: make(chan struct{}),
	}
	producer.wg.Add(1)
	go producer.Produce()

	// 创建消费协程,并启动
	workerNumber := 7
	var workers []TestWorker
	for i := 0; i < workerNumber; i++ {
		workers = append(workers, TestWorker{
			workNo:    i,
			closedChan: make(chan struct{}),
		})
	}

	for i := 0; i < workerNumber; i++ {
		workers[i].wg.Add(1)
		go workers[i].Work()
	}

	// 信号处理
	c := make(chan os.Signal)
	signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
	select {
	case sig := <-c:
		fmt.Printf("Got %s signal. Aborting...\n", sig)
		producer.Stop()
		for i := 0; i < workerNumber; i++ {
			workers[i].Stop()
		}
	}
}
相关标签: golang学习