欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ETCD 源码学习--MVCC KV存储实现(二)

程序员文章站 2022-06-14 18:57:05
...

在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习。

ETCD 的键值存储主要通过 bbolt(键值存储数据库)进行存储,bblot 保证了读事务并发性和写事务的串行性,这里不深究 bbolt 的具体实现。本文只说明 ETCD 如何通过 bbolt 和 Index 实现键值的多版本控制存储。

key

由 revison 组成, 格式为分为 main_sub(t), 如果带 t, 表示元素被删除。

/mvcc/revision.go

//将版本号转换为 bbolt 的键
func revToBytes(rev revision, bytes []byte) {
	binary.BigEndian.PutUint64(bytes, uint64(rev.main))
	bytes[8] = '_'
	binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))
}

value

将key value 封装在KeyValue

mvccpb.KeyValue{
		Key:            key, //原始 key
		Value:          value, //原始 value
		CreateRevision: 	c, //创建时的版本号
		ModRevision:    rev, //最新版本号
		Version:        ver,
		Lease:          int64(leaseID),
}

增/改

/mvcc/kvstore_txn.go

func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
	rev := tw.beginRev + 1 //主版本号+1
	c := rev
...
	ibytes := newRevBytes()
	idxRev := revision{main: rev, sub: int64(len(tw.changes))} //主版本号 rev, 副版本为当前事务的修改次数
	revToBytes(idxRev, ibytes) // 生成 bbolt的 key
	ver = ver + 1
	kv := mvccpb.KeyValue{ //生成 bblot 的 Value
		Key:            key,
		Value:          value,
		CreateRevision: c,
		ModRevision:    rev,
		Version:        ver,
		Lease:          int64(leaseID),
	}
...
	tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) //写入 bbolt
	tw.s.kvindex.Put(key, idxRev)       //写入索引
...
}

/mvcc/kvstore_txn.go

func (tw *storeTxnWrite) delete(key []byte) {
	ibytes := newRevBytes()
	idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
	revToBytes(idxRev, ibytes) //生成 key
	
    //key 追加一个 't' 的标识 Tombstone,表示删除
	ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
	kv := mvccpb.KeyValue{Key: key} //生成 Value
	d, err := kv.Marshal()
	...
	tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) //删除 bbolt
	err = tw.s.kvindex.Tombstone(key, idxRev) //删除 索引
....
}

/mvcc/kvstore_txn.go

func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
	rev := ro.Rev //指定的需要读取的版本号

	//版本号对比,读取的版本号不能超过当前事务 ID
	if rev > curRev {
		return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
	}
	if rev <= 0 { //按当前版本号读取数据
		rev = curRev
	}
	if rev < tr.s.compactMainRev { //最近压缩之后的最小版本号
		return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
	}
	//获取从索引中读取键为 key->end 的 版本号。
	revpairs := tr.s.kvindex.Revisions(key, end, rev)
....
	kvs := make([]mvccpb.KeyValue, limit)
	revBytes := newRevBytes()
	for i, revpair := range revpairs[:len(kvs)] { //遍历查询到的 revision 信息
		revToBytes(revpair, revBytes) //生成 key
		_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0) //从bbolt 获取指定 key 的值 
		....
        //反序列化为 Value 值
		if err := kvs[i].Unmarshal(vs[0]); err != nil {
			...
		}
	}
	...
}

func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
	if end == nil {
		rev, _, _, err := ti.Get(key, atRev)
		if err != nil {
			return nil
		}
		return []revision{rev}
	}
	ti.visit(key, end, func(ki *keyIndex) {
		if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
			revs = append(revs, rev)
		}
	})
	return revs
}

总结

这里主要介绍了 MVCC 上层的接口,这些接口通过调用索引的方法 和 bblot 的方法实现多版本并发控制的存储和查找。

相关标签: etcd mvcc etcd