go 利用chan的阻塞机制,实现协程的开始、阻塞、返回控制器
程序员文章站
2022-04-14 20:05:40
一、使用场景 大背景是从kafka 中读取oplog进行增量处理,但是当我想发一条命令将这个增量过程阻塞,然后开始进行一次全量同步之后,在开始继续增量。 所以需要对多个协程进行控制。 二、使用知识 1. 从一个未初始化的管道读会阻塞 2.从一个关闭的管道读不会阻塞 利用两个管道和select 进行控 ......
一、使用场景
大背景是从kafka 中读取oplog进行增量处理,但是当我想发一条命令将这个增量过程阻塞,然后开始进行一次全量同步之后,在开始继续增量。
所以需要对多个协程进行控制。
二、使用知识
1. 从一个未初始化的管道读会阻塞
2.从一个关闭的管道读不会阻塞
利用两个管道和select 进行控制
三、上代码
控制器代码
package util
import (
"errors"
"sync"
)
const (
//stop 停止
stop = iota
//start 开始
start
//pause 暂停
pause
)
//control 控制器
type control struct {
ch1 chan struct{}
ch2 chan struct{}
stat int64
lock sync.rwmutex
}
var (
//errstat 错误状态
errstat = errors.new("stat error")
)
//newcontrol 获得一个新control
func newcontrol() *control {
return &control{
ch1: make(chan struct{}),
ch2: nil,
stat: start,
lock: sync.rwmutex{},
}
}
//stop 停止
func (c *control) stop() error {
c.lock.lock()
defer c.lock.unlock()
if c.stat == start {
c.ch2 = nil
close(c.ch1)
c.stat = stop
} else if c.stat == pause {
ch2 := c.ch2
c.ch2 = nil
close(c.ch1)
close(ch2)
c.stat = stop
} else {
return errstat
}
return nil
}
//pause 暂停
func (c *control) pause() error {
c.lock.lock()
defer c.lock.unlock()
if c.stat == start {
c.ch2 = make(chan struct{})
close(c.ch1)
c.stat = pause
} else {
return errstat
}
return nil
}
//start 开始
func (c *control) start() error {
c.lock.lock()
defer c.lock.unlock()
if c.stat == pause {
c.ch1 = make(chan struct{})
close(c.ch2)
c.stat = start
} else {
return errstat
}
return nil
}
//c 控制管道
func (c *control) c() <-chan struct{} {
c.lock.rlock()
defer c.lock.runlock()
return c.ch1
}
//wait 等待
func (c *control) wait() bool {
c.lock.rlock()
ch2 := c.ch2
c.lock.runlock()
if ch2 == nil { //通过赋值nil 发送停止推出命令
return false
}
<-ch2 //会进行阻塞
return true
}
使用代码
for {
select {
case part, ok := <-c.partitions():
if !ok {
conf.logger.error("get kafka partitions not ok", regular.name)
return
}
go readfrompart(c, part, regular, respchan)
case <-regular.c(): //regular 为control 类
if !regular.wait() {
conf.logger.debug("stop! ")
return
}
conf.logger.debug("start! ")
}
}
这样就可以随时随地的控制工程中的协程
regular := util.newcontrol()
regular.pause()
regular.start()
regular.stop()