欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

带你入门etcd

程序员文章站 2022-04-16 09:31:56
etcd是什么? etcd是一个用Go语言写的,用于分布式系统下高性能的键值(K V)存储、服务发现、负载均衡、、分布式锁、配置管理等场景地应用,类似于Java的zookeeper。基于Raft协议,能保证数据的一致性。 官方地址 [etcd.io] https://etcd.io [github. ......

etcd是什么?

etcd是一个用go语言写的,用于分布式系统下高性能的键值(k-v)存储、服务发现、负载均衡、、分布式锁、配置管理等场景地应用,类似于java的zookeeper。基于raft协议,能保证数据的一致性。

官方地址

[etcd.io]

[github.com]

etcd的安装

有两种方式安装,可以通过编译好的二进制或源码安装,我这里用的是二进制安装的,另一个源码安装的我没有试过。

另外,这里的安装也不是集群方式安装的,用的单结点的,只是一些基本的操作。

一、二进制安装

  1. 点击下面的地址,根椐自已的系统,选择不同的压缩包下载,如 macos下载etcd-v3.4.4-darwin-amd64.zip

[]

  1. 将下载的压缩包,解压出来,见如下内容
 $ ls ~/applications/etcd
documentation       readme-etcdctl.md   readme.md           readmev2-etcdctl.md etcd     etcdctl
  1. 上面的文件中,etcd是服务端; etcdctl是客户端,里面集成了常用一些api。

二、源码安装

因为是go写的,可直接使用 go get https://github.com/etcd-io/etcd 下载就行了,不过这个etcd包有点大,如果网络不好,还不一定能下载下来,凡正我是没整下来。

所以,如果下载不下来,可以尝试用github的zip打包下载试试,如果也还是不行,那么可用国内的下载加速。

我在网上找到一个 [https://www.newbe.pro/mirrors/mirrors-etcd/] https://www.newbe.pro/mirrors/mirrors-etcd/,从这里面直接把压缩包下载下来,放到你的gopath下就行了

接下来进入到etcd的目录下面,执行 ./build构建就行了。

etcd启动

不管用那种方式,要启动etcd,先找到etcd这个文件,然后执行它.默认在你本机的2379端口,如果你要在公网开启,指定下参数就行了.

$ ~/applications/etcd/etcd  --advertise-client-urls="https://123.33.44.34:2379"  --listen-client-urls="https://123.33.44.34:2379"

本地如下方式启动

$ ~/applications/etcd/etcd
[warning] deprecated '--logger=capnslog' flag is set; use '--logger=zap' flag instead
2020-03-11 12:55:38.289362 i | etcdmain: etcd version: 3.4.4
2020-03-11 12:55:38.289461 i | etcdmain: git sha: c65a9e2dd
2020-03-11 12:55:38.289468 i | etcdmain: go version: go1.12.12
2020-03-11 12:55:38.289476 i | etcdmain: go os/arch: darwin/amd64
2020-03-11 12:55:38.289482 i | etcdmain: setting maximum number of cpus to 4, total number of available cpus is 4
2020-03-11 12:55:38.289492 n | etcdmain: failed to detect default host (default host not supported on darwin_amd64)
2020-03-11 12:55:38.289500 w | etcdmain: no data-dir provided, using default data-dir ./default.etcd
[warning] deprecated '--logger=capnslog' flag is set; use '--logger=zap' flag instead
2020-03-11 12:55:38.292027 i | embed: name = default
2020-03-11 12:55:38.292038 i | embed: data dir = default.etcd
2020-03-11 12:55:38.292045 i | embed: member dir = default.etcd/member
2020-03-11 12:55:38.292050 i | embed: heartbeat = 100ms
2020-03-11 12:55:38.292054 i | embed: election = 1000ms
2020-03-11 12:55:38.292058 i | embed: snapshot count = 100000
2020-03-11 12:55:38.292079 i | embed: advertise client urls = http://localhost:2379
2020-03-11 12:55:38.407474 i | etcdserver: starting member 8e9e05c52164694d in cluster cdf818194e3a8c32

好了,看到上面的输出,服务端就启动好了,如果要测试,可以用etcdctl测试下。

etcdctl put  "task/task1" "task任务"
ok

etcdctl get  "task/task1"
task/task1
task任务

代码操作

这里用go对etcd进行一些简单的操作,go的客户端可以在官网上去找[]

我这里用的是clientv3这个包, 这个包在上面源码安装时,下载的包里面就有,那个包是服务端和客户端代码都有的。

get操作

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "time"
)

func main(){
    var (
        config clientv3.config
        client *clientv3.client
        kv clientv3.kv
        resp *clientv3.getresponse
        err error
    )
    config = clientv3.config{
        endpoints:[]string{"localhost:2379"},
        dialtimeout:5 * time.second,
    }

    if client,err = clientv3.new(config); err != nil {
        fmt.println(err)
        return
    }

    kv = clientv3.newkv(client)

    if resp,  err = kv.get(context.todo(),"cron/jobs/",clientv3.withprefix()); err != nil {
        fmt.println(err)
        return
    }
    for k,v := range resp.kvs {
        fmt.println(k,v,string(v.key), string(v.value))
    }
}

续约操作

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "time"
)

func main(){
    var (
        config clientv3.config
        client *clientv3.client
        lease clientv3.lease
        leasegrantresp *clientv3.leasegrantresponse
        leaseid clientv3.leaseid
        leasekeepresp *clientv3.leasekeepaliveresponse
        leasekeeprespchan <-chan *clientv3.leasekeepaliveresponse
        getresp *clientv3.getresponse
        putresp *clientv3.putresponse

        kv clientv3.kv
        err error
    )
    config = clientv3.config{
        endpoints:[]string{"localhost:2379"},
        dialtimeout:5 * time.second,
    }

    if client,err = clientv3.new(config); err != nil {
        fmt.println(err)
        return
    }

    lease = clientv3.newlease(client)

    if leasegrantresp, err = lease.grant(context.todo(), 10); err != nil {
        fmt.println(err)
    }

    leaseid = leasegrantresp.id

    ctx, _ := context.withtimeout(context.todo(), 10 * time.second) //10秒停止续约

    if leasekeeprespchan, err = lease.keepalive(ctx,leaseid); err != nil {
        fmt.println(err)
    }

    //启动协程,进行续约
    go func(){
        for {
            select {
               case leasekeepresp  = <-leasekeeprespchan:
                if leasekeepresp == nil{
                    fmt.println("租约失效了")
                    goto end
                } else {
                    fmt.println("收到续约", leasekeepresp)
                }
            }
        }
        end:
    }()

    kv = clientv3.newkv(client)

    if putresp ,err = kv.put(context.todo(),"cron/jobs/job3","job3",clientv3.withlease(leaseid)); err != nil {
        fmt.println(err)
    } else {
        fmt.println("写入成功", putresp.header.revision)
    }

    for {
        if getresp, err = kv.get(context.todo(),"cron/jobs/job3"); err != nil {
            fmt.println(err)
        }

        if getresp.count != 0 {
            fmt.println(getresp.kvs)
        } else {
            fmt.println(" 过期了")
            break
        }

        time.sleep(2 * time.second)
    }


}

op操作

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "time"
)

func main(){
    var (
        config clientv3.config
        client *clientv3.client
        opput clientv3.op
        opget clientv3.op
        opresp  clientv3.opresponse
        kv clientv3.kv

        err error
    )
    config = clientv3.config{
        endpoints:[]string{"localhost:2379"},
        dialtimeout:5 * time.second,
    }

    if client,err = clientv3.new(config); err != nil {
        fmt.println(err)
        return
    }

    kv = clientv3.newkv(client)

    opput = clientv3.opput("cron/jobs/job3","job3333")

    if opresp, err  = kv.do(context.todo(),opput); err != nil {
        fmt.println(err)
        return
    }

    fmt.println("获取revision", opresp.put().header.revision)


    opget = clientv3.opget("cron/jobs/job3")
    if opresp ,err = kv.do(context.todo(),opget); err != nil {
        fmt.println(err)
        return
    }

    if len(opresp.get().kvs) != 0 {
        fmt.println("获取值:", string(opresp.get().kvs[0].value))
    }

}

简单分布式锁

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "time"
)

//简单分布式锁

func main(){
    var (
        config clientv3.config
        client *clientv3.client
        /*opput clientv3.op
        opget clientv3.op
        opresp  clientv3.opresponse*/
        kv clientv3.kv
        txn clientv3.txn
        txnresp *clientv3.txnresponse

        lease clientv3.lease
        leasegrantresp *clientv3.leasegrantresponse
        leaseid clientv3.leaseid
        leasekeepresp *clientv3.leasekeepaliveresponse
        leasekeeprespchan <-chan *clientv3.leasekeepaliveresponse

        ctx context.context
        cancelfunc context.cancelfunc
        err error
    )
    config = clientv3.config{
        endpoints:[]string{"localhost:2379"},
        dialtimeout:5 * time.second,
    }

    if client,err = clientv3.new(config); err != nil {
        fmt.println(err)
        return
    }

    //租约操作
    lease = clientv3.newlease(client)

    if leasegrantresp, err = lease.grant(context.todo(), 5); err != nil {
        fmt.println(err)
    }

    leaseid = leasegrantresp.id

    ctx, cancelfunc = context.withcancel(context.todo())

    defer cancelfunc()
    defer lease.revoke(context.todo(),leaseid)

    if leasekeeprespchan, err = lease.keepalive(ctx,leaseid); err != nil {
        fmt.println(err)
    }

    //启动协程,进行续约
    go func(){
        for {
            select {
            case leasekeepresp  = <-leasekeeprespchan:
                if leasekeepresp == nil{
                    fmt.println("租约失效了")
                    goto end
                } else {
                    fmt.println("收到续约", leasekeepresp)
                }
            }
        }
    end:
    }()



    kv = clientv3.newkv(client)

    //开启事务
    txn = kv.txn(context.todo())

    txn.if(clientv3.compare(clientv3.createrevision("cron/lock/job3"),"=", 0)).
        then(clientv3.opput("cron/lock/job3","123",clientv3.withlease(leaseid))).
        else(clientv3.opget("cron/lock/job3"))

    if txnresp,err  = txn.commit(); err != nil {
        fmt.println(err)
        return
    }

    if !txnresp.succeeded {
        fmt.println("ooh,锁被占用了:", string(txnresp.responses[0].getresponserange().kvs[0].value))
        return
    } else {
        fmt.println("抢到了")
    }

    fmt.println("处理业务")
    time.sleep( 10 * time.second)



}

以上就是记录我在学习etcd的基本入门和一些常用的操作,希望大家能喜欢,期待和大家的一些进步。