etcd事件监听
etcd是CoreOS开发的一个高可用的键值存储系统,主要用于共享配置和服务发现。它使用Go语言编写,并通过Raft一致性算法处理日志复制以保证强一致性。etcd目前的最新版本是2.2.0。
和zookeeper的二进制接口不同,它提供了HTTP/JSON的rest api接口,所以对使用它的客户端来说是很友好的,几乎每种编程语言都有较成熟的http client开发包,基于这些开发包很容易编写etcd客户端。在java中已经有个比较好的客户端etcd4j(https://github.com/jurmous/etcd4j),类似zookeeper有好用的curator库。
etcd事件
etcd中和数据变化(包括目录&值key、value变化)相关的事件类型有:set, delete, update, create, compareAndSwap、compareAndDelete。在etcd http response (json格式)action属性就是事件类型。
etcd的事件watch监听接口也是使用http访问,它提供两种监听模式,一种是一次性监听,类似zookeeper的事件watch,监听到一次事件后,需要客户端重新发起监听,比较繁琐。另一种是持久监听(stream),当有事件时,会连续触发,不需要客户端重新发起监听。
和zookeeper一样也存在客户端丢失监听事件的可能,它不保证每个事件客户端都能监听到,不过我们在实际使用过程中,通常是在应用启动时,在开始监听之后或之前先查询etcd,将需要的数据全量加载到内存中,后续才是根据监听事件来增量更新内存中的数据或全量刷新数据。另外目前版本的etcd最多只保存1千条事件历史,满1千条后,如果有新的事件产生,事件历史库中最老的事件将被丟弃,这个可以理解为etcd服务端事件丢失。由于有1千条的限制,在有事件浪涌时(在单位时间内例如1秒内产生2千条事件),事件监听处理较慢的时候或未监听时也会发生客户端事件丢失。
一次性watch监听
可用curl发送http请求来进行演示,假设在本机安装有etcd,客户端访问端口为2379(旧版本监听端口为4001),监听key为/application。一次性监听命令如下:
curl "http://127.0.0.1:2379/v2/keys/application?wait=true"
查询字符串中的wait=true表示监听,还可加查询参数recursive=true表示附加监听/application的子目录和后辈目录,例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true" 。
启动监听后etcd如有变化事件发生,则返回json格式的事件,结果类似如下:
HTTP/1.1 200 OK
Content-Type: application/json
X-Etcd-Cluster-Id: e88d54f6225f06ad
X-Etcd-Index: 271
X-Raft-Index: 872202
X-Raft-Term: 5
Date: Sat, 26 Sep 2015 08:43:17 GMT
Transfer-Encoding: chunked
{"action":"set","node":{"key":"/application/store","value":" {...}","modifiedIndex":271,"createdIndex":271},"prevNode":{...}}
上面的curl返回结果中显示了http头,是加了curl的-i选项。
从结果可看出action属性为set事件,modifiedIndex是该事件的index,为271。X-Etcd-Index头表示启动监听时etcd的当前index。etcd的index是单调递增的正整数,该整数取值空间是etcd中的所有key共享的。
由于是一次性监听,所以curl会退出,继续监听要重新运行curl。
和zookeeper事件监听一样,上面的http请求只能监听到命令发出之后产生的事件(更精确的说是在etcd服务器收到监听请求之后),之前发生的事件是监听不到的。另外在前一个监听得到触发开始(中间包括事件处理耗时)到启动下一次监听之间(并且下一次监听是etcd服务器收到监听请求才真正开始生效,这中间还有网络时延)肯定存在时间间隔,如果在这个时间间隔(区间)内有事件发生,客户端是监听不到这些事件的,这个也和zookeeper类似。也就是如果两个事件并行发生或在发生时间上相距很近(例如update一个key后立即delete),后一事件的发生刚好落在这个间隔内,那这个事件在采用一次性监听模式的客户端中会丢失,这些事件对客户端监听不可见。
那是不是就一定不能获取先前的事件了?这个要看情况,只要事件还在etcd的事件历史中,并且知道事件的index或起始index,还是可以取到单个事件的。在查询字符串中加waitIndex参数,其值是该事件的index或起始index。例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true&waitIndex=269" ,表示查询/application目录或后辈目录中index等于或大于269的事件,如果有则返回一个最接近269的事件,否则会挂起curl,直到有满足条件的事件发生。这个其实是查询事件,已经不是正规的事件监听了。所以如果知道起始index,可以每次index加1来遍历查询事件,遍历查询时可以根据返回事件中的index来调整后续查询条件中使用的index。
如果waitIndex对应的事件已被etcd丢弃(见前面提到的etcd事件历史库),此时如果获取该事件,etcd将返回http状态码400( Bad Request),http body类似如下:
{"errorCode":401,"message":"The event in requested index is outdated and cleared","cause":"the requested history has been cleared [3305/1]","index":4304}
另外可以监听根目录(根key),例如curl "http://127.0.0.1:2379/v2/keys?wait=true&recursive=true" 。
持久监听(流式watch)
要在查询字符串中加stream=true,例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true&stream=true"
发出上述命令后,会和etcd服务器之间建立一个http长连接(此处的长连接不是指http keep alive,虽然也需要keep alive),其应答http头中的Transfer-Encoding为chunked,类似server push和comet中的multipart/x-mixed-replace,其http response body不结束,后续的每个事件作为http body的一部分,在该长连接中以一个或多个body chunk块的形式推送给客户端。可以看到在流式持久监听中curl即使收到事件也不会退出,它一直在等待后续将要发生的事件。一次性监听http头中也是chunked,但收到一个事件后,etcd会发送一个结束chunk(大小为0,表示http response结束),因此收到该chunk后curl会退出。
相比一次性监听,除了更简便,持久监听也比一次性监听可靠性高,不会出现上面提到的在时间间隔内监听不到事件的情况。
持久监听也是监听从命令发出之后发生的事件,先前的事件是监听不到的,不像JMS的持久订阅可以收到启动之前的jms消息。
如果在持久监听中加waitIndex参数,分两种情况:一种是waitIndex的值小于或等于启动监听时etcd的当前index(这个值在X-Etcd-Index http头中可看到),则curl接收到满足条件的事件后挂起(不退出),后续再也收不到其他事件,即最多只能收到一个事件。第二种是waitIndex的值大于启动监听时etcd的当前index,则waitIndex参数无效,持久监听的行为同没有waitIndex参数一样。
使用etcd4j进行事件监听
在使用etcd4j进行事件监听时,有个注意事项:如果在同一个jvm虚拟机中既有修改查询etcd数据的操作,也有监听etcd事件,监听事件使用的EtcdClient实例和修改&查询操作使用的EtcdClient实例应分开,不能共用同一个实例,否则目前的etcd4j(2.7.0版本)会出现问题。多个key可以共用一个EtcdClient实例来进行监听,也可每个key使用一个单独的实例来进行监听。
- 一次性监听
//创建etcd客户端实例
EtcdClient etcdClient = new EtcdClient (new URI[]{new URI("127.0.0.1:2379")});
/*setRetryPolicy设置重试策略
* recursive()监听/application目录和其后辈目录中的事件,设置查询字符串recursive=true.
* waitForChange()监听,设置查询字符串wait=true.
* -1表示永不超时,send()发送http请求进行监听
*/
EtcdResponsePromise<EtcdKeysResponse> promise = etcd.get("/application").setRetryPolicy(
new RetryNTimes(300,Integer.MAX_VALUE)).recursive().waitForChange().
timeout(-1, TimeUnit.SECONDS).send();
//增加listener
promise.addListener(new IsSimplePromiseResponseHandler<EtcdKeysResponse>()
{
@Override
public void onResponse(ResponsePromise<EtcdKeysResponse> promise) {
//从promise中取响应,检查是否有异常
EtcdKeysResponse response = promise.getNow();
Throwable exception = promise.getException();
... 处理事件
//再次发起http请求进行监听
...
}
}
);
还可使用waitForChange(long waitIndex)来监听指定的事件。
- 持久监听
etcd4j的当前版本2.7.0不支持持久监听,可以使用
Java异步async httpclient(https://github.com/AsyncHttpClient/async-http-client)来实现持久监听,最挫最原始的持久监听实验代码如下:
AsyncHttpClient asyncHttpClient = ...;
//发送http get异步请求,不超时,且提供一个匿名的异步请求完成handler
asyncHttpClient.prepareGet("http://127.0.0.1:2379/v2/keys/application? wait=true&recursive=true&stream=true").setRequestTimeout(-1).execute(
new AsyncHandler<String>() {
//这个异常回调方法可类比为zookeeper的会话过期
@Override
public void onThrowable(Throwable t) {
// TODO Auto-generated method stub
//todo:异常,重新重试发起监听
}
//在会话期,有事件时会回调onBodyPartReceived方法
@Override
public State onBodyPartReceived(HttpResponseBodyPart bodyPart)
throws Exception {
// TODO Auto-generated method stub
// bodyPart对应一个http chunk块,一个事件的数据可能由多个chunk块组成
System.out.println("event="+new String(bodyPart.getBodyByteBuffer().array(),"utf-8"));
//todo:将chunk中的数据加入缓冲区,待收到完整的事件数据后将json格式
//的数据解析成java对象
...
return State.CONTINUE;
}
@Override
public State onStatusReceived(HttpResponseStatus responseStatus)
throws Exception {
// TODO Auto-generated method stub
return State.CONTINUE;
}
//一个会话期只调用一次,这个回调方法可类比为zookeeper会话建立
@Override
public State onHeadersReceived(HttpResponseHeaders headers)
throws Exception {
// TODO Auto-generated method stub
return State.CONTINUE;
}
@Override
public String onCompleted() throws Exception {
// TODO Auto-generated method stub
return "endWatch";
}
});