欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

闲谈etcd(三)etcd的使用

程序员文章站 2022-07-13 22:20:40
...

连接

client

type Client struct {
    Cluster
    KV
    Lease
    Watcher
    Auth
    Maintenance

    // Username is a user name for authentication.
    Username string
    // Password is a password for authentication.
    Password string
}
  • Cluster:向集群里增加etcd服务端节点之类,属于管理员操作。
  • KV:我们主要使用的功能,即操作K-V。
  • Lease:租约相关操作,比如申请一个TTL=10秒的租约。
  • Watcher:观察订阅,从而监听最新的数据变化。
  • Auth:管理etcd的用户和权限,属于管理员操作。
  • Maintenance:维护etcd,比如主动迁移etcd的leader节点,属于管理员操作。

实例化client

import (
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)

//连接
func main() {
	//客户端配置
	config := clientv3.Config{
		Endpoints:   []string{"192.168.1.109:2379"},
		DialTimeout: 5 * time.Second,
	}

	//建立连接
	if client, err := clientv3.New(config); err != nil {
		fmt.Println(err)
		return
	}
 
   fmt.Println("connect success")
	defer client.Close()
}
  • Endpoints:etcd的多个节点服务地址。
  • DialTimeout:创建client的首次连接超时,这里传了5秒,如果5秒都没有连接成功就会返回err。一旦client创建成功,我们就不用再关心后续底层连接的状态了,client内部会重连。

k-v存取

put

   //控制超时
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
   //增-存值
	_, err = client.Put(ctx, "/demo/demo1_key", "demo1_value")
	//操作完毕,cancel掉
	cancel()
	if err != nil {
		fmt.Println("put failed, err:", err)
		return
	}
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
  • ctx: Context包对象,是用来跟踪上下文的,列如超时控制
  • key: 存储对象的key
  • val: 存储对象的value
  • opts: 可变参数,额外选项

get

ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	resp, err := client.Get(ctx, "/demo/demo1_key")
   // Get查询还可以增加WithPrefix选项,获取某个目录下的所有子元素
   //eg: resp, err := client.Get(ctx, "/demo/", clientv3.WithPrefix())
	cancel()
	if err != nil {
		fmt.Println("get failed err:", err)
		return
	}

	for _, item := range resp.Kvs { //Kvs 返回key的列表
		fmt.Printf("%s : %s \n", item.Key, item.Value)
	}

delete

ctx, _ = context.WithTimeout(context.Background(), time.Second)
   resp, err := client.Delete(ctx, "/demo/demo1_key")
   if err != nil {
       fmt.Println(err)
   }
   fmt.Println(resp.PrevKvs)

watch 监听

Watch 方法返回一个WatchChan 类似的变量, WatchChan是一个channel

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"192.168.1.109:2379"},
		DialTimeout: time.Second,
	})
	if err != nil {
		fmt.Println("connect failed err : ", err)
		return
	}
	defer client.Close()

	client.Put(context.Background(), "/demo/demo2_key", "demo2_value")
	go func() {
		//watch
		watchKey := client.Watch(context.Background(), "/demo/demo2_key")
		for resp := range watchKey {
			for _, item := range resp {
				fmt.Printf("%s %q : %q \n", item.Type, item.Kv.key, item.Kv.Value)
			}
		}
	}
    
    if resp, err := client.Put(context.TODO(), "/demo/demo2_key/", "demo2_watch"); err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(resp)
    }
}

Transaction 事务

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"log"
	"sync"
	"time"
)

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   5 * time.Second,
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		fmt.Println("connect failed err: ", err)
		return
	}
	client.Close()

	var w sync.WaitGroup
	w.Add(10)
	key10 := "setnx"
	for i := 0; i < 10; i++ {
		go func(i int) {
			time.Sleep(5 * time.Millisecond)
			//通过key的Create_Revision 是否为 0 来判断key是否存在。其中If,Then 以及 Else 分支都可以包含多个操作。
			//返回的数据包含一个successed字段,当为 true 时代表 If 为真
			_, err := client.Txn(context.Background()).
				If(clientv3.Compare(clientv3.CreateRevision(key10), "=", 0)).
				Then(clientv3.OpPut(key10, fmt.Sprintf("%d", i))).
				Commit()
			if err != nil {
				fmt.Println(err)
			}

			w.Done()
		}(i)
	}
	w.Wait()

	if resp, err := client.Get(context.TODO(), key10); err != nil {
		log.Fatal(err)
	} else {
		log.Println(resp)
	}
}

lease 租约

  • Grant:分配一个租约。
  • Revoke:释放一个租约。
  • TimeToLive:获取剩余TTL时间。
  • Leases:列举所有etcd中的租约。
  • KeepAlive:自动定时的续约某个租约。
  • KeepAliveOnce:为某个租约续约一次。
  • Close:关闭当前客户端建立的所有租约。
import (
	"time"
)

var (
	dialTimeout    = 2 * time.Second
	requestTimeout = 10 * time.Second
)

func main() {
	ctx, _ := context.WithTimeout(context.Background(), requestTimeout)
	client, err := clientv3.New(clientv3.Config{
		DialTimeout: dialTimeout,
		Endpoints:   []string{"localhost:2379"},
	})

	if err != nil {
		log.Fatal(err)
	}

	kv := clientv3.NewKv(client)

	lease, err := client.Grant(ctx, 3)
	if err != nil {
		log.Fatal(err)
	}

	//Insert key with a lease of 3 second TTL
	kv.Put(ctx, "/demo/demo1_key", "demo1_value", clientv3.WithLease(lease.ID))

	gr, _ = kv.Get(ctx, "/demo/demo1_key")
	if len(gr.Kvs) == 1 {
		fmt.Println("Found key")
	}

	//let the TTL expire
	time.Sleep(3 * time.Second)

	gr, _ = kv.Get(ctx, "/demo/demo1_key")
	if len(gr.Kvs) == 0 {
		fmt.Println("no more key")
	}
}

 

相关标签: etcd 经验分享