Kubernetes-源码研习社-Workqueue
程序员文章站
2022-04-20 20:02:31
...
Workqueue
主要功能
- 有序 根据添加顺序进行入队列
- 去重 已经存在的的消息,只会处理一次
- 并发 消费者和提供者并发处理
- 标记 标记元素是否处理,处理异常可以重回队列
- 通知 shutdown 方法告诉队列,不消费新元素
- 延迟 支持延迟一段时间把消息加入队列
- metric 提供监控
- FIFO 先进先出队列
- delay 延迟队列
- rateLimit 限速队列
FIFO队列
数据结构
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t # 队列 被处理的队列
// dirty defines all of the items that need to be processed.
dirty set # 标记需要处理的item,可用于去重 确保同一个消息在同一时间只有一个处理
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
封装
type Interface interface {
Add(item interface{}) # 添加一个消息
Len() int # 统计消息大小
Get() (item interface{}, shutdown bool) # 获取一个消息
Done(item interface{}) # 标记元素正在处理
ShutDown() # 关闭队列
ShuttingDown() bool # 查询队列是否关闭
}
Add 方法
## 调用Add会把消息存放在dirty和queue,dirty用于消息去重,queue用于消息消费
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
# 判断消息是否存在dirty 队列,去重
if q.dirty.has(item) {
return
}
q.metrics.add(item)
# 添加到dirty 队列
q.dirty.insert(item)
if q.processing.has(item) {
return
}
# 添加到queue队列
q.queue = append(q.queue, item)
q.cond.Signal()
}
GET方法
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
# 是否存在消息并且消息通道是否关闭
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait() # 等待消费消息
}
# queue 里面不存在消息
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
# 取一个消息
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
# 进入到process 进行处理
q.processing.insert(item)
# 从dirty 删除消息
q.dirty.delete(item)
# 返回消息
return item, false
}
Done 方法
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
# 删除消息
q.processing.delete(item)
# 判断dirty里面是否有该消息,有的话重会queue
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
Shutdown 方法
func (q *Type) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.shuttingDown = true
q.cond.Broadcast()
}
去重
# 判断在process里面是否存在消费的消息
func (s set) has(item t) bool {
_, exists := s[item]
return exists
}
删除
# 在process里面删除消息
func (s set) delete(item t) {
delete(s, item)
}
延迟队列
封装
type DelayingInterface interface {
Interface # 使用FIFO 队列上进行封装一个ADDafter
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
AddAfter
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {
return
}
q.metrics.retry()
// immediately add things with no delay
# duration 小于等于0 直接添加到queue里面
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// unblock if ShutDown() is called
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:// 等待duration时间在添加到queue
}
}
限速队列
封装
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface # 延迟队列上面进行封装
// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}
Forget方法
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}
AddRateLimited 方法
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
NumRequeues 方法
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}
上一篇: 2011年30个最好的免费WordPress主题推荐
下一篇: ZK Studio 0.9.2 发布