欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

protobuf、LRU、sigleflight

程序员文章站 2022-11-10 12:47:53
今天咱一次讲3个吧,赶一下进度,好早点开始聊kubernetes! 从groupcache的项目目录结构看,我们今天要学习groupcachepb、lru、singleflight这3个package: 一、protobuf 这个目录咋一看有2个文件:go和proto后缀的。proto后缀的文件和p ......

今天咱一次讲3个吧,赶一下进度,好早点开始聊kubernetes!

从groupcache的项目目录结构看,我们今天要学习groupcachepb、lru、singleflight这3个package:

protobuf、LRU、sigleflight

一、protobuf

  这个目录咋一看有2个文件:go和proto后缀的。proto后缀的文件和protocol buffers有关,所以先看看protocol buffers是什么吧。

  在github上可以看到这个项目:https://github.com/google/protobuf

  google的,是不是瞬间来了兴趣?

  官方介绍是:protocol buffers (a.k.a., protobuf) are google's language-neutral, platform-neutral, extensible mechanism for serializing structured data.简单说就是跨语言跨平台的可拓展的结构数据序列化用的。翻译着有点别扭,还是直接看英文好理解。。。行,现在大家知道这个是用来做数据序列化的了,大家是否记得golang自带的一个数据结构序列化编码/解码工具gob?之前我们有专门介绍过:《golang - gob与rpc》。

  ok,看过gob这篇文章,大家就知道protobuf需要解决的基本问题了,下面我们结合源码来看protobuf的知识点。

$gopath\src\github.com\golang\groupcache\groupcachepb\groupcache.proto内容如下:

 1syntax = "proto2";
2
3package groupcachepb;
4
5message getrequest {
6  required string group = 1;
7  required string key = 2; // not actually required/guaranteed to be utf-8
8}
9
10message getresponse {
11  optional bytes value = 1;
12  optional double minute_qps = 2;
13}
14
15service groupcache {
16  rpc get(getrequest) returns (getresponse) {
17  };
18}

  可以看到这是某种语法的数据定义格式,我们先介绍一下这里涉及的概念:

  protobuf中主要数据类型有:

  • 标准数据类型:整型,浮点,字符串等

  • 复合数据类型:枚举和message类型

  看message部分:

message getresponse {
    optional bytes value = 1;
    optional double minute_qps = 2;
}

  • 每个字段末尾有一个tag,这个tag要求不重复,如这里的1、2;

  • 每个字段有一个类型,如这里的bytes、double;

  • 每个字段开头的optional含义为:

    • required: 必须赋值,不能为空

    • optional:可以赋值,也可以不赋值

    • repeated: 该字段可以重复任意次数,包括0次

  现在我们可以看懂这个message的名字是getresponse,有2个可选字段value和minute_qps,两个字段的类型分别为bytes和double,2个字段都是optional的。

  protobuf也提供了包的定义,只要在文件开头定义package关键字即可,所以这里的package groupcachepb;这行也好理解;第一行syntax = "proto2";明显是声明版本的,除了proto2外还有proto3版本,类似与py2后有了py3。

  到这里就剩下最后几行有点疑惑了:

service groupcache {

    rpc get(getrequest) returns (getresponse) {

    };

}

  这里可以看到打头的是service,中间的字段是一个rpc相关的类似函数的东西,参数和返回值都是上面定义的message:getrequestgetresponse,明显这里和rpc要有关系了,细节我们先不讲,到后面调用到的地方咱再结合业务代码来理解这里的细节。

 

二、lru

  查一下百度百科,可以得到lru的解释如下:

  内存管理的一种页面置换算法,对于在内存中但又不用的数据块(内存块)叫做lru,操作系统会根据哪些数据属于lru而将其移出内存而腾出空间来加载另外的数据。

什么是lru算法? lru是least recently used的缩写,即最近最少使用,常用于页面置换算法,是为虚拟页式存储管理服务的。

  所以这里的lru包也就是用来实现lru算法的,详细的解释我放在注释中:$gopath\src\github.com\golang\groupcache\lru\lru.go:

  1// package lru implements an lru cache.
2//【lru包用于实现lru cache】
3package lru
4
5import "container/list"
6
7// cache is an lru cache. it is not safe for concurrent access.
8//【cache结构用于实现lru cache算法;并发访问不安全】
9type cache struct {
10    // maxentries is the maximum number of cache entries before
11    // an item is evicted. zero means no limit.
12    //【最大入口数,也就是缓存中最多存几条数据,超过了就触发数据淘汰;0表示没有限制】
13    maxentries int
14
15    // onevicted optionally specificies a callback function to be
16    // executed when an entry is purged from the cache.
17    //【销毁前回调】
18    onevicted func(key key, value interface{})
19
20    //【链表】
21    ll *list.list
22    //【key为任意类型,值为指向链表一个结点的指针】
23    cache map[interface{}]*list.element
24}
25
26// a key may be any value that is comparable. 
27// see http://golang.org/ref/spec#comparison_operators
28//【任意可比较类型】
29type key interface{}
30
31//【访问入口结构,包装键值】
32type entry struct {
33    key   key
34    value interface{}
35}
36
37// new creates a new cache.
38// if maxentries is zero, the cache has no limit and it's assumed
39// that eviction is done by the caller.
40//【初始化一个cache类型实例】
41func new(maxentries int) *cache {
42    return &cache{
43        maxentries: maxentries,
44        ll:         list.new(),
45        cache:      make(map[interface{}]*list.element),
46    }
47}
48
49// add adds a value to the cache.
50//【往缓存中增加一个值】
51func (c *cache) add(key key, value interface{}) {
52    //【如果cache还没有初始化,先初始化,创建cache和l1】
53    if c.cache == nil {
54        c.cache = make(map[interface{}]*list.element)
55        c.ll = list.new()
56    }
57    //【如果key已经存在,则将记录前移到头部,然后设置value】
58    if ee, ok := c.cache[key]; ok {
59        c.ll.movetofront(ee)
60        ee.value.(*entry).value = value
61        return
62    }
63    //【key不存在时,创建一条记录,插入链表头部,ele是这个element的指针】
64    //【这里的element是一个*entry类型,ele是*list.element类型】
65    ele := c.ll.pushfront(&entry{key, value})
66    //cache这个map设置key为key类型的key,value为*list.element类型的ele
67    c.cache[key] = ele
68    //【链表长度超过最大入口值,触发清理操作】
69    if c.maxentries != 0 && c.ll.len() > c.maxentries {
70        c.removeoldest()
71    }
72}
73
74// get looks up a key's value from the cache.
75//【根据key查找value】
76func (c *cache) get(key key) (value interface{}, ok bool) {
77    if c.cache == nil {
78        return
79    }
80    //【如果存在】
81    if ele, hit := c.cache[key]; hit {
82        //【将这个element移动到链表头部】
83        c.ll.movetofront(ele)
84        //【返回entry的值】
85        return ele.value.(*entry).value, true
86    }
87    return
88}
89
90// remove removes the provided key from the cache.
91//【如果key存在,调用removeelement删除链表and缓存中的元素】
92func (c *cache) remove(key key) {
93    if c.cache == nil {
94        return
95    }
96    if ele, hit := c.cache[key]; hit {
97        c.removeelement(ele)
98    }
99}
100
101// removeoldest removes the oldest item from the cache.
102//【删除最旧的元素】
103func (c *cache) removeoldest() {
104    if c.cache == nil {
105        return
106    }
107    //【ele为*list.element类型,指向链表的尾结点】
108    ele := c.ll.back()
109    if ele != nil {
110        c.removeelement(ele)
111    }
112}
113
114func (c *cache) removeelement(e *list.element) {
115    //【链表中删除一个element】
116    c.ll.remove(e)
117    //【e.value本质是*entry类型,entry结构体就包含了key和value2个属性】
118    //【value本身是interface{}类型,通过类型断言转成*entry类型】
119    kv := e.value.(*entry)
120    //【删除cache这个map中key为kv.key这个元素;也就是链表中删了之后缓存中也得删】
121    delete(c.cache, kv.key)
122    if c.onevicted != nil {
123        c.onevicted(kv.key, kv.value)
124    }
125}
126
127// len returns the number of items in the cache.
128//【返回缓存中的item数,通过链表的len()方法获取】
129func (c *cache) len() int {
130    if c.cache == nil {
131        return 0
132    }
133    return c.ll.len()
134}
135
136// clear purges all stored items from the cache.
137//【删除缓存中所有条目,如果有回调函数onevicted(),则先调用所有回调函数,然后置空】
138func (c *cache) clear() {
139    if c.onevicted != nil {
140        for _, e := range c.cache {
141            kv := e.value.(*entry)
142            c.onevicted(kv.key, kv.value)
143        }
144    }
145    c.ll = nil
146    c.cache = nil
147}

三、singleflight

  这个package主要实现了这样一个功能:抑制同一个函数调用重复执行。举个例子:给一个常规程序输入一个函数调用a()需要10s返回结果,这时候有10个客户端都调用了这个a(),可能就需要100s才能完成所有的计算结果,但是这个计算是重复的,结果也是一样的。所以可以想个办法,判断是同一个计算过程的情况,不需要重复执行,直接等待上一次计算完成,然后一下子返回结果就行了。下面看一下groupcache中是如何实现这个算法的吧:

 1// package singleflight provides a duplicate function call suppression
2// mechanism.
3//【“单航班”提供重复调用函数的抑制机制】
4package singleflight
5
6import "sync"
7
8// call is an in-flight or completed do call
9//【在执行的或者已经完成的do过程】
10type call struct {
11    wg  sync.waitgroup
12    val interface{}
13    err error
14}
15
16// group represents a class of work and forms a namespace in which
17// units of work can be executed with duplicate suppression.
18//【表示一类工作,组成一个命名空间的概念,一个group的调用会有“重复抑制”】
19type group struct {
20    mu sync.mutex // protects m
21    //【懒惰地初始化;这个map的value是*call,call是上面那个struct】
22    m map[string]*call // lazily initialized
23}
24
25// do executes and returns the results of the given function, making
26// sure that only one execution is in-flight for a given key at a
27// time. if a duplicate comes in, the duplicate caller waits for the
28// original to complete and receives the same results.
29
30//【do接收一个函数,执行并返回结果,
31// 这个过程中确保同一个key在同一时间只有一个执行过程;
32// 重复的调用会等待最原始的调用过程完成,然后接收到相同的结果】
33func (g *group) do(key string, fn func() (interface{}, error)) (interface{}, error) {
34    g.mu.lock()
35    if g.m == nil {
36        g.m = make(map[string]*call)
37    }
38    //【如果这个call存在同名过程,等待初始调用完成,然后返回val和err】
39    if c, ok := g.m[key]; ok {
40        g.mu.unlock()
41        c.wg.wait()
42        //【当所有goroutine执行完毕,call中就存储了执行结果val和err,然后这里返回】
43        return c.val, c.err
44    }
45    //【拿到call结构体类型的指针】
46    c := new(call)
47    //【一个goroutine开始,add(1),这里最多只会执行到一次,也就是不会并发调用下面的fn()】
48    c.wg.add(1)
49    //【类似设置一个函数调用的名字“key”对应调用过程c】
50    g.m[key] = c
51    g.mu.unlock()
52
53    //【函数调用过程】
54    c.val, c.err = fn()
55    //【这里的done对应上面if里面的wait】
56    c.wg.done()
57
58    g.mu.lock()
59    //【执行完成,删除这个key】
60    delete(g.m, key)
61    g.mu.unlock()
62
63    return c.val, c.err
64}

  今天讲的可能有点多,其中设计到的list之类的没有细讲,希望大家通过互联网掌握这类我没有仔细提到的小知识点,彻底吃透这几个package中的源码。

  回过头看一下项目结果,除了testpb包外其他包我们都讲完了,testpb是groupcachepb对应的测试程序,下一讲我们就可以把这几个包外的所有程序分析完,包括对protobuf部分的调用逻辑。

protobuf、LRU、sigleflight

 

  今天就到这里,groupcache源码解析还剩最后一讲!

protobuf、LRU、sigleflight