带你入门etcd
etcd是什么?
etcd是一个用go语言写的,用于分布式系统下高性能的键值(k-v)存储、服务发现、负载均衡、、分布式锁、配置管理等场景地应用,类似于java的zookeeper。基于raft协议,能保证数据的一致性。
官方地址
[etcd.io]
[github.com]
etcd的安装
有两种方式安装,可以通过编译好的二进制或源码安装,我这里用的是二进制安装的,另一个源码安装的我没有试过。
另外,这里的安装也不是集群方式安装的,用的单结点的,只是一些基本的操作。
一、二进制安装
- 点击下面的地址,根椐自已的系统,选择不同的压缩包下载,如 macos下载etcd-v3.4.4-darwin-amd64.zip
[]
- 将下载的压缩包,解压出来,见如下内容
$ ls ~/applications/etcd documentation readme-etcdctl.md readme.md readmev2-etcdctl.md etcd etcdctl
- 上面的文件中,etcd是服务端; etcdctl是客户端,里面集成了常用一些api。
二、源码安装
因为是go写的,可直接使用 go get https://github.com/etcd-io/etcd
下载就行了,不过这个etcd包有点大,如果网络不好,还不一定能下载下来,凡正我是没整下来。
所以,如果下载不下来,可以尝试用github的zip打包下载试试,如果也还是不行,那么可用国内的下载加速。
我在网上找到一个 [https://www.newbe.pro/mirrors/mirrors-etcd/] https://www.newbe.pro/mirrors/mirrors-etcd/,从这里面直接把压缩包下载下来,放到你的gopath下就行了。
接下来进入到etcd的目录下面,执行 ./build
构建就行了。
etcd启动
不管用那种方式,要启动etcd,先找到etcd这个文件,然后执行它.默认在你本机的2379端口,如果你要在公网开启,指定下参数就行了.
$ ~/applications/etcd/etcd --advertise-client-urls="https://123.33.44.34:2379" --listen-client-urls="https://123.33.44.34:2379"
本地如下方式启动
$ ~/applications/etcd/etcd [warning] deprecated '--logger=capnslog' flag is set; use '--logger=zap' flag instead 2020-03-11 12:55:38.289362 i | etcdmain: etcd version: 3.4.4 2020-03-11 12:55:38.289461 i | etcdmain: git sha: c65a9e2dd 2020-03-11 12:55:38.289468 i | etcdmain: go version: go1.12.12 2020-03-11 12:55:38.289476 i | etcdmain: go os/arch: darwin/amd64 2020-03-11 12:55:38.289482 i | etcdmain: setting maximum number of cpus to 4, total number of available cpus is 4 2020-03-11 12:55:38.289492 n | etcdmain: failed to detect default host (default host not supported on darwin_amd64) 2020-03-11 12:55:38.289500 w | etcdmain: no data-dir provided, using default data-dir ./default.etcd [warning] deprecated '--logger=capnslog' flag is set; use '--logger=zap' flag instead 2020-03-11 12:55:38.292027 i | embed: name = default 2020-03-11 12:55:38.292038 i | embed: data dir = default.etcd 2020-03-11 12:55:38.292045 i | embed: member dir = default.etcd/member 2020-03-11 12:55:38.292050 i | embed: heartbeat = 100ms 2020-03-11 12:55:38.292054 i | embed: election = 1000ms 2020-03-11 12:55:38.292058 i | embed: snapshot count = 100000 2020-03-11 12:55:38.292079 i | embed: advertise client urls = http://localhost:2379 2020-03-11 12:55:38.407474 i | etcdserver: starting member 8e9e05c52164694d in cluster cdf818194e3a8c32
好了,看到上面的输出,服务端就启动好了,如果要测试,可以用etcdctl测试下。
etcdctl put "task/task1" "task任务" ok etcdctl get "task/task1" task/task1 task任务
代码操作
这里用go对etcd进行一些简单的操作,go的客户端可以在官网上去找[]
我这里用的是clientv3这个包, 这个包在上面源码安装时,下载的包里面就有,那个包是服务端和客户端代码都有的。
get操作
package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main(){ var ( config clientv3.config client *clientv3.client kv clientv3.kv resp *clientv3.getresponse err error ) config = clientv3.config{ endpoints:[]string{"localhost:2379"}, dialtimeout:5 * time.second, } if client,err = clientv3.new(config); err != nil { fmt.println(err) return } kv = clientv3.newkv(client) if resp, err = kv.get(context.todo(),"cron/jobs/",clientv3.withprefix()); err != nil { fmt.println(err) return } for k,v := range resp.kvs { fmt.println(k,v,string(v.key), string(v.value)) } }
续约操作
package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main(){ var ( config clientv3.config client *clientv3.client lease clientv3.lease leasegrantresp *clientv3.leasegrantresponse leaseid clientv3.leaseid leasekeepresp *clientv3.leasekeepaliveresponse leasekeeprespchan <-chan *clientv3.leasekeepaliveresponse getresp *clientv3.getresponse putresp *clientv3.putresponse kv clientv3.kv err error ) config = clientv3.config{ endpoints:[]string{"localhost:2379"}, dialtimeout:5 * time.second, } if client,err = clientv3.new(config); err != nil { fmt.println(err) return } lease = clientv3.newlease(client) if leasegrantresp, err = lease.grant(context.todo(), 10); err != nil { fmt.println(err) } leaseid = leasegrantresp.id ctx, _ := context.withtimeout(context.todo(), 10 * time.second) //10秒停止续约 if leasekeeprespchan, err = lease.keepalive(ctx,leaseid); err != nil { fmt.println(err) } //启动协程,进行续约 go func(){ for { select { case leasekeepresp = <-leasekeeprespchan: if leasekeepresp == nil{ fmt.println("租约失效了") goto end } else { fmt.println("收到续约", leasekeepresp) } } } end: }() kv = clientv3.newkv(client) if putresp ,err = kv.put(context.todo(),"cron/jobs/job3","job3",clientv3.withlease(leaseid)); err != nil { fmt.println(err) } else { fmt.println("写入成功", putresp.header.revision) } for { if getresp, err = kv.get(context.todo(),"cron/jobs/job3"); err != nil { fmt.println(err) } if getresp.count != 0 { fmt.println(getresp.kvs) } else { fmt.println(" 过期了") break } time.sleep(2 * time.second) } }
op操作
package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main(){ var ( config clientv3.config client *clientv3.client opput clientv3.op opget clientv3.op opresp clientv3.opresponse kv clientv3.kv err error ) config = clientv3.config{ endpoints:[]string{"localhost:2379"}, dialtimeout:5 * time.second, } if client,err = clientv3.new(config); err != nil { fmt.println(err) return } kv = clientv3.newkv(client) opput = clientv3.opput("cron/jobs/job3","job3333") if opresp, err = kv.do(context.todo(),opput); err != nil { fmt.println(err) return } fmt.println("获取revision", opresp.put().header.revision) opget = clientv3.opget("cron/jobs/job3") if opresp ,err = kv.do(context.todo(),opget); err != nil { fmt.println(err) return } if len(opresp.get().kvs) != 0 { fmt.println("获取值:", string(opresp.get().kvs[0].value)) } }
简单分布式锁
package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) //简单分布式锁 func main(){ var ( config clientv3.config client *clientv3.client /*opput clientv3.op opget clientv3.op opresp clientv3.opresponse*/ kv clientv3.kv txn clientv3.txn txnresp *clientv3.txnresponse lease clientv3.lease leasegrantresp *clientv3.leasegrantresponse leaseid clientv3.leaseid leasekeepresp *clientv3.leasekeepaliveresponse leasekeeprespchan <-chan *clientv3.leasekeepaliveresponse ctx context.context cancelfunc context.cancelfunc err error ) config = clientv3.config{ endpoints:[]string{"localhost:2379"}, dialtimeout:5 * time.second, } if client,err = clientv3.new(config); err != nil { fmt.println(err) return } //租约操作 lease = clientv3.newlease(client) if leasegrantresp, err = lease.grant(context.todo(), 5); err != nil { fmt.println(err) } leaseid = leasegrantresp.id ctx, cancelfunc = context.withcancel(context.todo()) defer cancelfunc() defer lease.revoke(context.todo(),leaseid) if leasekeeprespchan, err = lease.keepalive(ctx,leaseid); err != nil { fmt.println(err) } //启动协程,进行续约 go func(){ for { select { case leasekeepresp = <-leasekeeprespchan: if leasekeepresp == nil{ fmt.println("租约失效了") goto end } else { fmt.println("收到续约", leasekeepresp) } } } end: }() kv = clientv3.newkv(client) //开启事务 txn = kv.txn(context.todo()) txn.if(clientv3.compare(clientv3.createrevision("cron/lock/job3"),"=", 0)). then(clientv3.opput("cron/lock/job3","123",clientv3.withlease(leaseid))). else(clientv3.opget("cron/lock/job3")) if txnresp,err = txn.commit(); err != nil { fmt.println(err) return } if !txnresp.succeeded { fmt.println("ooh,锁被占用了:", string(txnresp.responses[0].getresponserange().kvs[0].value)) return } else { fmt.println("抢到了") } fmt.println("处理业务") time.sleep( 10 * time.second) }
以上就是记录我在学习etcd的基本入门和一些常用的操作,希望大家能喜欢,期待和大家的一些进步。