欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ETCD数据监听

程序员文章站 2022-07-13 22:42:23
...

ETCD

更新/删除/重新设置键的ttl 都会触发watcher , 但是如果在body中增加refresh=true , 更新ttl(必须存在) , 将不会出发watcher事件。

?wait=true        监听当前节点
?recursive=true    递归监听当前节点和子目录
?waitIndex=xxx   监听过去已经发生的。过去值的查询或监听, 必选与wait一起使用。

watch 一个ttl自删除的key时,收到如下 “expire” action。

{
     "action": "expire",
     "node": {
         "createdIndex": 2223,
         "key": "/message",
         "modifiedIndex": 2224
     },
     "prevNode": {
         "createdIndex": 2223,
         "expiration": "2018-11-12T09:25:00.028597482Z",
         "key": "/message",
         "modifiedIndex": 2223,
         "value": ""
     }
 }

GET 对过去的键值操作进行查询:类似上面提到的监控,在其基础上指定过去某次修改的索引编号,就可以查询历史操作。默认可查询的历史记录为1000条。
?waitIndex=xxx   监听过去已经发生的。 这个在确保在watch命令中,没有丢失事件非常有用。例如:我们反复watch 我们得到节点的 modifiedIndex+1。

因为 node的modifiedIndex的值是不连续,如果waitIndex的值没有相应modifiedIndex,返回比它大的最近的modifedIndex的节点信息。 如果大于节点中所有的modifiedIndex,等待,直到节点的modifiedIndex值大于等于waitIndex的值。

即使删除key后,也可以查询历史数据。

store中有一个全局的currentIndex,每次变更,index会加1.然后每个event都会关联到currentIndex.

当客户端调用watch接口(参数中增加 wait参数)时,如果监听的waitIndex 不存在与key对应的EventHistroy 中(currentIndex >= waitIndex),并且key对应的modifyIndex > waitIndex , 则会查找到第一个大于waitIndex 的数据进行展示。
如果历史表中没有或者请求没有带 waitIndex,则放入WatchHub中,每个key会关联一个watcher列表。 当有变更操作时,变更生成的event会放入EventHistroy表中,同时通知和该key相关的watcher。

注意:
1. 必须与 wait 一起使用;
2. curl 中url需要使用引号。
3. etcd 仅仅保留系统中所有key最近的1000条event,建议将获取到的response发送到另一个线程处理,而不是处理response而阻塞watch。
4. 如果watch超出了etcd保存的最近1000条,建议get后使用response header中的 X-Etcd-Index + 1进行重新watch,而不是使用node中的modifiedIndex+1. 因为  X-Etcd-Index  永远大于等于modifiedIndex, 使用modifiedIndex可能会返回401错误码,同样超出。
5. long polling可能会被服务器关闭,如超时或服务器关闭。导致仅仅收到header 200OK,body为空,此时应重新watch。

etcd4j 测试案例:

import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.promises.EtcdResponsePromise;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;
import java.util.List;

/**
 * @Description
 * @auther bozhu
 * @create 11\ 12\ 2018
 */
public class EtcdClientAddListenerTest {
    EtcdClient client = null;

    @Before
    public void executeBefore() {
        client = new EtcdClient(URI.create("http://172.16.85.20:2379"));
    }

    private String getConfig(String configFile , EtcdKeysResponse dataTree) {
        List<EtcdKeysResponse.EtcdNode> nodes = null;
        if(null != dataTree ) {
            return dataTree.getNode().getValue();
        }
        System.out.println("Etcd configFile"+ configFile+"is not exist,Please Check");
        return null;
    }
    @Test
    public void testStask() throws Exception {
        for (int i = 1; i < 5000; i++) {
            EtcdKeysResponse etcdKeysResponse = client.put("/xdriver/test/value", "" + i).send().get();
            System.out.println(etcdKeysResponse);
        }
    }
    @Test
    public void testListener() throws Exception{
        this.startListenerThread(client , "/xdriver/test/value");
        Thread.sleep(1000000L);
    }
    public void startListenerThread(EtcdClient client , String dir) throws Exception{
        EtcdKeysResponse etcdKeysResponse = client.get(dir).send().get();
        System.out.println(etcdKeysResponse.getNode().getValue());
        new Thread(()->{
            startListener(client,dir,etcdKeysResponse.getNode().getModifiedIndex() + 1);
        }).start();
    }
    public void startListener(final EtcdClient client , final String dir , long waitIndex)  {
        EtcdResponsePromise<EtcdKeysResponse> promise = null;
        try {
            // 如果监听的waitIndex 不存在与key对应的EventHistroy 中(currentIndex >= waitIndex) ,
            // 并且key对应的modifyIndex > waitIndex , 则会查找到第一个大于waitIndex 的数据进行展示
            promise = client.get(dir).waitForChange(waitIndex).consistent().send();
        } catch (IOException e) {
            e.printStackTrace();
        }
        promise.addListener(promisea -> {
            try {
                EtcdKeysResponse etcdKeysResponse = promisea.get();
                new Thread(() -> {startListener(client , dir , etcdKeysResponse.getNode().getModifiedIndex() + 1);}).start();
                String config = getConfig(dir, etcdKeysResponse);//加载配置项
                System.out.println(config);
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("listen etcd 's config change cause exception:{}"+e.getMessage());
            }
        });
    }
}

 

 

 

转载于:https://my.oschina.net/LucasZhu/blog/2875221