ETCD数据监听
ETCD
更新/删除/重新设置键的ttl 都会触发watcher , 但是如果在body中增加refresh=true , 更新ttl(必须存在) , 将不会出发watcher事件。
?wait=true 监听当前节点
?recursive=true 递归监听当前节点和子目录
?waitIndex=xxx 监听过去已经发生的。过去值的查询或监听, 必选与wait一起使用。
watch 一个ttl自删除的key时,收到如下 “expire” action。
{
"action": "expire",
"node": {
"createdIndex": 2223,
"key": "/message",
"modifiedIndex": 2224
},
"prevNode": {
"createdIndex": 2223,
"expiration": "2018-11-12T09:25:00.028597482Z",
"key": "/message",
"modifiedIndex": 2223,
"value": ""
}
}
GET 对过去的键值操作进行查询:类似上面提到的监控,在其基础上指定过去某次修改的索引编号,就可以查询历史操作。默认可查询的历史记录为1000条。
?waitIndex=xxx 监听过去已经发生的。 这个在确保在watch命令中,没有丢失事件非常有用。例如:我们反复watch 我们得到节点的 modifiedIndex+1。
因为 node的modifiedIndex的值是不连续,如果waitIndex的值没有相应modifiedIndex,返回比它大的最近的modifedIndex的节点信息。 如果大于节点中所有的modifiedIndex,等待,直到节点的modifiedIndex值大于等于waitIndex的值。
即使删除key后,也可以查询历史数据。
store中有一个全局的currentIndex,每次变更,index会加1.然后每个event都会关联到currentIndex.
当客户端调用watch接口(参数中增加 wait参数)时,如果监听的waitIndex 不存在与key对应的EventHistroy 中(currentIndex >= waitIndex),并且key对应的modifyIndex > waitIndex , 则会查找到第一个大于waitIndex 的数据进行展示。
如果历史表中没有或者请求没有带 waitIndex,则放入WatchHub中,每个key会关联一个watcher列表。 当有变更操作时,变更生成的event会放入EventHistroy表中,同时通知和该key相关的watcher。
注意:
1. 必须与 wait 一起使用;
2. curl 中url需要使用引号。
3. etcd 仅仅保留系统中所有key最近的1000条event,建议将获取到的response发送到另一个线程处理,而不是处理response而阻塞watch。
4. 如果watch超出了etcd保存的最近1000条,建议get后使用response header中的 X-Etcd-Index + 1进行重新watch,而不是使用node中的modifiedIndex+1. 因为 X-Etcd-Index 永远大于等于modifiedIndex, 使用modifiedIndex可能会返回401错误码,同样超出。
5. long polling可能会被服务器关闭,如超时或服务器关闭。导致仅仅收到header 200OK,body为空,此时应重新watch。
etcd4j 测试案例:
import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.promises.EtcdResponsePromise;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.util.List;
/**
* @Description
* @auther bozhu
* @create 11\ 12\ 2018
*/
public class EtcdClientAddListenerTest {
EtcdClient client = null;
@Before
public void executeBefore() {
client = new EtcdClient(URI.create("http://172.16.85.20:2379"));
}
private String getConfig(String configFile , EtcdKeysResponse dataTree) {
List<EtcdKeysResponse.EtcdNode> nodes = null;
if(null != dataTree ) {
return dataTree.getNode().getValue();
}
System.out.println("Etcd configFile"+ configFile+"is not exist,Please Check");
return null;
}
@Test
public void testStask() throws Exception {
for (int i = 1; i < 5000; i++) {
EtcdKeysResponse etcdKeysResponse = client.put("/xdriver/test/value", "" + i).send().get();
System.out.println(etcdKeysResponse);
}
}
@Test
public void testListener() throws Exception{
this.startListenerThread(client , "/xdriver/test/value");
Thread.sleep(1000000L);
}
public void startListenerThread(EtcdClient client , String dir) throws Exception{
EtcdKeysResponse etcdKeysResponse = client.get(dir).send().get();
System.out.println(etcdKeysResponse.getNode().getValue());
new Thread(()->{
startListener(client,dir,etcdKeysResponse.getNode().getModifiedIndex() + 1);
}).start();
}
public void startListener(final EtcdClient client , final String dir , long waitIndex) {
EtcdResponsePromise<EtcdKeysResponse> promise = null;
try {
// 如果监听的waitIndex 不存在与key对应的EventHistroy 中(currentIndex >= waitIndex) ,
// 并且key对应的modifyIndex > waitIndex , 则会查找到第一个大于waitIndex 的数据进行展示
promise = client.get(dir).waitForChange(waitIndex).consistent().send();
} catch (IOException e) {
e.printStackTrace();
}
promise.addListener(promisea -> {
try {
EtcdKeysResponse etcdKeysResponse = promisea.get();
new Thread(() -> {startListener(client , dir , etcdKeysResponse.getNode().getModifiedIndex() + 1);}).start();
String config = getConfig(dir, etcdKeysResponse);//加载配置项
System.out.println(config);
} catch (Exception e) {
e.printStackTrace();
System.out.println("listen etcd 's config change cause exception:{}"+e.getMessage());
}
});
}
}
转载于:https://my.oschina.net/LucasZhu/blog/2875221