今天咱一次讲3个吧,赶一下进度,好早点开始聊kubernetes!git
从groupcache的项目目录结构看,咱们今天要学习groupcachepb、lru、singleflight这3个package:github
1、protobuf
这个目录咋一看有2个文件:go和proto后缀的。proto后缀的文件和protocol buffers有关,因此先看看protocol buffers是什么吧。golang
在github上能够看到这个项目:https://github.com/google/protobuf算法
google的,是否是瞬间来了兴趣?c#
官方介绍是:Protocol Buffers (a.k.a., protobuf) are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data.简单说就是跨语言跨平台的可拓展的结构数据序列化用的。翻译着有点别扭,仍是直接看英文好理解。。。行,如今你们知道这个是用来作数据序列化的了,你们是否记得Golang自带的一个数据结构序列化编码/解码工具gob?以前咱们有专门介绍过:《golang - gob与rpc》。缓存
ok,看过gob这篇文章,你们就知道protobuf须要解决的基本问题了,下面咱们结合源码来看protobuf的知识点。安全
$GOPATH\src\github.com\golang\groupcache\groupcachepb\groupcache.proto内容以下:数据结构
1syntax = "proto2";
2
3package groupcachepb;
4
5message GetRequest {
6 required string group = 1;
7 required string key = 2; // not actually required/guaranteed to be UTF-8
8}
9
10message GetResponse {
11 optional bytes value = 1;
12 optional double minute_qps = 2;
13}
14
15service GroupCache {
16 rpc Get(GetRequest) returns (GetResponse) {
17 };
18}
能够看到这是某种语法的数据定义格式,咱们先介绍一下这里涉及的概念:并发
protobuf中主要数据类型有:app
-
标准数据类型:整型,浮点,字符串等
-
复合数据类型:枚举和message类型
看message部分:
message GetResponse {
optional bytes value = 1;
optional double minute_qps = 2;
}
-
每一个字段末尾有一个tag,这个tag要求不重复,如这里的一、2;
-
每一个字段有一个类型,如这里的bytes、double;
-
每一个字段开头的optional含义为:
-
required: 必须赋值,不能为空
-
optional:能够赋值,也能够不赋值
-
repeated: 该字段能够重复任意次数,包括0次
如今咱们能够看懂这个message的名字是GetResponse,有2个可选字段value和minute_qps,两个字段的类型分别为bytes和double,2个字段都是optional的。
protobuf也提供了包的定义,只要在文件开头定义package关键字便可,因此这里的package groupcachepb;这行也好理解;第一行syntax = "proto2";明显是声明版本的,除了proto2外还有proto3版本,相似与py2后有了py3。
到这里就剩下最后几行有点疑惑了:
service GroupCache {
rpc Get(GetRequest) returns (GetResponse) {
};
}
这里能够看到打头的是service,中间的字段是一个rpc相关的相似函数的东西,参数和返回值都是上面定义的message:GetRequest和GetResponse,明显这里和rpc要有关系了,细节咱们先不讲,到后面调用到的地方咱再结合业务代码来理解这里的细节。
2、LRU
查一下百度百科,能够获得LRU的解释以下:
内存管理的一种页面置换算法,对于在内存中但又不用的数据块(内存块)叫作LRU,操做系统会根据哪些数据属于LRU而将其移出内存而腾出空间来加载另外的数据。
什么是LRU算法? LRU是Least Recently Used的缩写,即最近最少使用,经常使用于页面置换算法,是为虚拟页式存储管理服务的。
因此这里的lru包也就是用来实现lru算法的,详细的解释我放在注释中:$GOPATH\src\github.com\golang\groupcache\lru\lru.go:
1// Package lru implements an LRU cache.
2//【lru包用于实现LRU cache】
3package lru
4
5import "container/list"
6
7// Cache is an LRU cache. It is not safe for concurrent access.
8//【Cache结构用于实现LRU cache算法;并发访问不安全】
9type Cache struct {
10 // MaxEntries is the maximum number of cache entries before
11 // an item is evicted. Zero means no limit.
12 //【最大入口数,也就是缓存中最多存几条数据,超过了就触发数据淘汰;0表示没有限制】
13 MaxEntries int
14
15 // OnEvicted optionally specificies a callback function to be
16 // executed when an entry is purged from the cache.
17 //【销毁前回调】
18 OnEvicted func(key Key, value interface{})
19
20 //【链表】
21 ll *list.List
22 //【key为任意类型,值为指向链表一个结点的指针】
23 cache map[interface{}]*list.Element
24}
25
26// A Key may be any value that is comparable.
27// See http://golang.org/ref/spec#Comparison_operators
28//【任意可比较类型】
29type Key interface{}
30
31//【访问入口结构,包装键值】
32type entry struct {
33 key Key
34 value interface{}
35}
36
37// New creates a new Cache.
38// If maxEntries is zero, the cache has no limit and it's assumed
39// that eviction is done by the caller.
40//【初始化一个Cache类型实例】
41func New(maxEntries int) *Cache {
42 return &Cache{
43 MaxEntries: maxEntries,
44 ll: list.New(),
45 cache: make(map[interface{}]*list.Element),
46 }
47}
48
49// Add adds a value to the cache.
50//【往缓存中增长一个值】
51func (c *Cache) Add(key Key, value interface{}) {
52 //【若是Cache尚未初始化,先初始化,建立cache和l1】
53 if c.cache == nil {
54 c.cache = make(map[interface{}]*list.Element)
55 c.ll = list.New()
56 }
57 //【若是key已经存在,则将记录前移到头部,而后设置value】
58 if ee, ok := c.cache[key]; ok {
59 c.ll.MoveToFront(ee)
60 ee.Value.(*entry).value = value
61 return
62 }
63 //【key不存在时,建立一条记录,插入链表头部,ele是这个Element的指针】
64 //【这里的Element是一个*entry类型,ele是*list.Element类型】
65 ele := c.ll.PushFront(&entry{key, value})
66 //cache这个map设置key为Key类型的key,value为*list.Element类型的ele
67 c.cache[key] = ele
68 //【链表长度超过最大入口值,触发清理操做】
69 if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
70 c.RemoveOldest()
71 }
72}
73
74// Get looks up a key's value from the cache.
75//【根据key查找value】
76func (c *Cache) Get(key Key) (value interface{}, ok bool) {
77 if c.cache == nil {
78 return
79 }
80 //【若是存在】
81 if ele, hit := c.cache[key]; hit {
82 //【将这个Element移动到链表头部】
83 c.ll.MoveToFront(ele)
84 //【返回entry的值】
85 return ele.Value.(*entry).value, true
86 }
87 return
88}
89
90// Remove removes the provided key from the cache.
91//【若是key存在,调用removeElement删除链表and缓存中的元素】
92func (c *Cache) Remove(key Key) {
93 if c.cache == nil {
94 return
95 }
96 if ele, hit := c.cache[key]; hit {
97 c.removeElement(ele)
98 }
99}
100
101// RemoveOldest removes the oldest item from the cache.
102//【删除最旧的元素】
103func (c *Cache) RemoveOldest() {
104 if c.cache == nil {
105 return
106 }
107 //【ele为*list.Element类型,指向链表的尾结点】
108 ele := c.ll.Back()
109 if ele != nil {
110 c.removeElement(ele)
111 }
112}
113
114func (c *Cache) removeElement(e *list.Element) {
115 //【链表中删除一个element】
116 c.ll.Remove(e)
117 //【e.Value本质是*entry类型,entry结构体就包含了key和value2个属性】
118 //【Value自己是interface{}类型,经过类型断言转成*entry类型】
119 kv := e.Value.(*entry)
120 //【删除cache这个map中key为kv.key这个元素;也就是链表中删了以后缓存中也得删】
121 delete(c.cache, kv.key)
122 if c.OnEvicted != nil {
123 c.OnEvicted(kv.key, kv.value)
124 }
125}
126
127// Len returns the number of items in the cache.
128//【返回缓存中的item数,经过链表的Len()方法获取】
129func (c *Cache) Len() int {
130 if c.cache == nil {
131 return 0
132 }
133 return c.ll.Len()
134}
135
136// Clear purges all stored items from the cache.
137//【删除缓存中全部条目,若是有回调函数OnEvicted(),则先调用全部回调函数,而后置空】
138func (c *Cache) Clear() {
139 if c.OnEvicted != nil {
140 for _, e := range c.cache {
141 kv := e.Value.(*entry)
142 c.OnEvicted(kv.key, kv.value)
143 }
144 }
145 c.ll = nil
146 c.cache = nil
147}
3、singleflight
这个package主要实现了这样一个功能:抑制同一个函数调用重复执行。举个例子:给一个常规程序输入一个函数调用A()须要10s返回结果,这时候有10个客户端都调用了这个A(),可能就须要100s才能完成全部的计算结果,可是这个计算是重复的,结果也是同样的。因此能够想个办法,判断是同一个计算过程的状况,不须要重复执行,直接等待上一次计算完成,而后一会儿返回结果就好了。下面看一下groupcache中是如何实现这个算法的吧:
1// Package singleflight provides a duplicate function call suppression
2// mechanism.
3//【“单航班”提供重复调用函数的抑制机制】
4package singleflight
5
6import "sync"
7
8// call is an in-flight or completed Do call
9//【在执行的或者已经完成的Do过程】
10type call struct {
11 wg sync.WaitGroup
12 val interface{}
13 err error
14}
15
16// Group represents a class of work and forms a namespace in which
17// units of work can be executed with duplicate suppression.
18//【表示一类工做,组成一个命名空间的概念,一个group的调用会有“重复抑制”】
19type Group struct {
20 mu sync.Mutex // protects m
21 //【懒惰地初始化;这个map的value是*call,call是上面那个struct】
22 m map[string]*call // lazily initialized
23}
24
25// Do executes and returns the results of the given function, making
26// sure that only one execution is in-flight for a given key at a
27// time. If a duplicate comes in, the duplicate caller waits for the
28// original to complete and receives the same results.
29
30//【Do接收一个函数,执行并返回结果,
31// 这个过程当中确保同一个key在同一时间只有一个执行过程;
32// 重复的调用会等待最原始的调用过程完成,而后接收到相同的结果】
33func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
34 g.mu.Lock()
35 if g.m == nil {
36 g.m = make(map[string]*call)
37 }
38 //【若是这个call存在同名过程,等待初始调用完成,而后返回val和err】
39 if c, ok := g.m[key]; ok {
40 g.mu.Unlock()
41 c.wg.Wait()
42 //【当全部goroutine执行完毕,call中就存储了执行结果val和err,而后这里返回】
43 return c.val, c.err
44 }
45 //【拿到call结构体类型的指针】
46 c := new(call)
47 //【一个goroutine开始,Add(1),这里最多只会执行到一次,也就是不会并发调用下面的fn()】
48 c.wg.Add(1)
49 //【相似设置一个函数调用的名字“key”对应调用过程c】
50 g.m[key] = c
51 g.mu.Unlock()
52
53 //【函数调用过程】
54 c.val, c.err = fn()
55 //【这里的Done对应上面if里面的Wait】
56 c.wg.Done()
57
58 g.mu.Lock()
59 //【执行完成,删除这个key】
60 delete(g.m, key)
61 g.mu.Unlock()
62
63 return c.val, c.err
64}
今天讲的可能有点多,其中设计到的List之类的没有细讲,但愿你们经过互联网掌握这类我没有仔细提到的小知识点,完全吃透这几个package中的源码。
回过头看一下项目结果,除了testpb包外其余包咱们都讲完了,testpb是groupcachepb对应的测试程序,下一讲咱们就能够把这几个包外的全部程序分析完,包括对protobuf部分的调用逻辑。
今天就到这里,groupcache源码解析还剩最后一讲!