欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kubernetes-源码研习社-Workqueue

程序员文章站 2022-04-20 20:02:31
...

Workqueue

主要功能
  1. 有序 根据添加顺序进行入队列
  2. 去重 已经存在的的消息,只会处理一次
  3. 并发 消费者和提供者并发处理
  4. 标记 标记元素是否处理,处理异常可以重回队列
  5. 通知 shutdown 方法告诉队列,不消费新元素
  6. 延迟 支持延迟一段时间把消息加入队列
  7. metric 提供监控
  8. FIFO 先进先出队列
  9. delay 延迟队列
  10. rateLimit 限速队列

FIFO队列

数据结构

type Type struct {
	// queue defines the order in which we will work on items. Every
	// element of queue should be in the dirty set and not in the
	// processing set.
	queue []t # 队列 被处理的队列

	// dirty defines all of the items that need to be processed.
	dirty set  # 标记需要处理的item,可用于去重 确保同一个消息在同一时间只有一个处理

	// Things that are currently being processed are in the processing set.
	// These things may be simultaneously in the dirty set. When we finish
	// processing something and remove it from this set, we'll check if
	// it's in the dirty set, and if so, add it to the queue.
	processing set

	cond *sync.Cond

	shuttingDown bool

	metrics queueMetrics

	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.Clock
}

封装

type Interface interface {
	Add(item interface{}) # 添加一个消息
	Len() int  # 统计消息大小
	Get() (item interface{}, shutdown bool) # 获取一个消息
	Done(item interface{}) # 标记元素正在处理
	ShutDown() # 关闭队列
	ShuttingDown() bool # 查询队列是否关闭
}

Add 方法

## 调用Add会把消息存放在dirty和queue,dirty用于消息去重,queue用于消息消费
func (q *Type) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.shuttingDown {
		return
	}
	# 判断消息是否存在dirty 队列,去重
	if q.dirty.has(item) {
		return
	}

	q.metrics.add(item)
	# 添加到dirty 队列
	q.dirty.insert(item)
	if q.processing.has(item) {
		return
	}
	# 添加到queue队列
	q.queue = append(q.queue, item)
	q.cond.Signal()
}

GET方法

func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	# 是否存在消息并且消息通道是否关闭
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait() # 等待消费消息
	}
	# queue 里面不存在消息
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}
	# 取一个消息
	item, q.queue = q.queue[0], q.queue[1:]

	q.metrics.get(item)
	# 进入到process 进行处理
	q.processing.insert(item)
	# 从dirty 删除消息
	q.dirty.delete(item)
	# 返回消息
	return item, false
}

Done 方法

func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)
	# 删除消息
	q.processing.delete(item)
	# 判断dirty里面是否有该消息,有的话重会queue
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	}
}

Shutdown 方法

func (q *Type) ShutDown() {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	q.shuttingDown = true
	q.cond.Broadcast()
}
去重
# 判断在process里面是否存在消费的消息
func (s set) has(item t) bool {
	_, exists := s[item]
	return exists
}
删除
# 在process里面删除消息
func (s set) delete(item t) {
	delete(s, item)
}

延迟队列

封装

type DelayingInterface interface {
	Interface # 使用FIFO 队列上进行封装一个ADDafter 
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}

AddAfter

func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	// don't add if we're already shutting down
	if q.ShuttingDown() {
		return
	}

	q.metrics.retry()

	// immediately add things with no delay
	# duration  小于等于0 直接添加到queue里面
	if duration <= 0 {
		q.Add(item)
		return
	}

	select {
	case <-q.stopCh:
		// unblock if ShutDown() is called
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:// 等待duration时间在添加到queue
	}
}

限速队列

封装

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
	DelayingInterface  # 延迟队列上面进行封装

	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{}) 

	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})

	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item interface{}) int
}

Forget方法

func (q *rateLimitingType) Forget(item interface{}) {
	q.rateLimiter.Forget(item)
}

AddRateLimited 方法

func (q *rateLimitingType) AddRateLimited(item interface{}) {
	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

NumRequeues 方法

func (q *rateLimitingType) NumRequeues(item interface{}) int {
	return q.rateLimiter.NumRequeues(item)
}