欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

etcd事件监听

程序员文章站 2022-07-03 22:26:04
...

    etcd是CoreOS开发的一个高可用的键值存储系统,主要用于共享配置和服务发现。它使用Go语言编写,并通过Raft一致性算法处理日志复制以保证强一致性。etcd目前的最新版本是2.2.0。

    和zookeeper的二进制接口不同,它提供了HTTP/JSON的rest api接口,所以对使用它的客户端来说是很友好的,几乎每种编程语言都有较成熟的http client开发包,基于这些开发包很容易编写etcd客户端。在java中已经有个比较好的客户端etcd4j(https://github.com/jurmous/etcd4j),类似zookeeper有好用的curator库。

   

   etcd事件

    etcd中和数据变化(包括目录&值key、value变化)相关的事件类型有:set, delete, update, create, compareAndSwap、compareAndDelete。在etcd http response (json格式)action属性就是事件类型。

     etcd的事件watch监听接口也是使用http访问,它提供两种监听模式,一种是一次性监听,类似zookeeper的事件watch,监听到一次事件后,需要客户端重新发起监听,比较繁琐。另一种是持久监听(stream),当有事件时,会连续触发,不需要客户端重新发起监听。

     和zookeeper一样也存在客户端丢失监听事件的可能,它不保证每个事件客户端都能监听到,不过我们在实际使用过程中,通常是在应用启动时,在开始监听之后或之前先查询etcd,将需要的数据全量加载到内存中,后续才是根据监听事件来增量更新内存中的数据或全量刷新数据。另外目前版本的etcd最多只保存1千条事件历史,满1千条后,如果有新的事件产生,事件历史库中最老的事件将被丟弃,这个可以理解为etcd服务端事件丢失。由于有1千条的限制,在有事件浪涌时(在单位时间内例如1秒内产生2千条事件),事件监听处理较慢的时候或未监听时也会发生客户端事件丢失。   

 

 

    一次性watch监听

   可用curl发送http请求来进行演示,假设在本机安装有etcd,客户端访问端口为2379(旧版本监听端口为4001),监听key为/application。一次性监听命令如下:

    curl "http://127.0.0.1:2379/v2/keys/application?wait=true"

    查询字符串中的wait=true表示监听,还可加查询参数recursive=true表示附加监听/application的子目录和后辈目录,例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true" 。

    启动监听后etcd如有变化事件发生,则返回json格式的事件,结果类似如下:

    HTTP/1.1 200 OK

    Content-Type: application/json

    X-Etcd-Cluster-Id: e88d54f6225f06ad

    X-Etcd-Index: 271

    X-Raft-Index: 872202

    X-Raft-Term: 5

    Date: Sat, 26 Sep 2015 08:43:17 GMT

    Transfer-Encoding: chunked

 

    {"action":"set","node":{"key":"/application/store","value":"        {...}","modifiedIndex":271,"createdIndex":271},"prevNode":{...}}

   上面的curl返回结果中显示了http头,是加了curl的-i选项。

   从结果可看出action属性为set事件,modifiedIndex是该事件的index,为271。X-Etcd-Index头表示启动监听时etcd的当前index。etcd的index是单调递增的正整数,该整数取值空间是etcd中的所有key共享的。

    由于是一次性监听,所以curl会退出,继续监听要重新运行curl。

    和zookeeper事件监听一样,上面的http请求只能监听到命令发出之后产生的事件(更精确的说是在etcd服务器收到监听请求之后),之前发生的事件是监听不到的。另外在前一个监听得到触发开始(中间包括事件处理耗时)到启动下一次监听之间(并且下一次监听是etcd服务器收到监听请求才真正开始生效,这中间还有网络延)肯定存在时间间隔,如果在这个时间间隔(区间)内有事件发生,客户端是监听不到这些事件的,这个也和zookeeper类似。也就是如果两个事件并行发生或在发生时间上相距很近(例如update一个key后立即delete),后一事件的发生刚好落在这个间隔内,那这个事件在采用一次性监听模式的客户端中会丢失,这些事件对客户端监听不可见。

    
etcd事件监听
            
    
    博客分类: 开发  
 

    那是不是就一定不能获取先前的事件了?这个要看情况,只要事件还在etcd的事件历史中,并且知道事件的index或起始index,还是可以取到单个事件的。在查询字符串中加waitIndex参数,其值是该事件的index或起始index。例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true&waitIndex=269" ,表示查询/application目录或后辈目录中index等于或大于269的事件,如果有则返回一个最接近269的事件,否则会挂起curl,直到有满足条件的事件发生。这个其实是查询事件,已经不是正规的事件监听了。所以如果知道起始index,可以每次index加1来遍历查询事件,遍历查询时可以根据返回事件中的index来调整后续查询条件中使用的index。 

    如果waitIndex对应的事件已被etcd丢弃(见前面提到的etcd事件历史库),此时如果获取该事件,etcd将返回http状态码400( Bad Request),http body类似如下:

    {"errorCode":401,"message":"The event in requested index is outdated and cleared","cause":"the requested history has been cleared [3305/1]","index":4304} 

    另外可以监听根目录(根key),例如curl "http://127.0.0.1:2379/v2/keys?wait=true&recursive=true" 。

 

    持久监听(流式watch)

    要在查询字符串中加stream=true,例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true&stream=true"

    发出上述命令后,会和etcd服务器之间建立一个http长连接(此处的长连接不是指http keep alive,虽然也需要keep alive),其应答http头中的Transfer-Encoding为chunked,类似server push和comet中的multipart/x-mixed-replace,其http response body不结束,后续的每个事件作为http body的一部分,在该长连接中以一个或多个body chunk块的形式推送给客户端。可以看到在流式持久监听中curl即使收到事件也不会退出,它一直在等待后续将要发生的事件。一次性监听http头中也是chunked,但收到一个事件后,etcd会发送一个结束chunk(大小为0,表示http response结束),因此收到该chunk后curl会退出。

     相比一次性监听,除了更简便,持久监听也比一次性监听可靠性高,不会出现上面提到的在时间间隔内监听不到事件的情况。

      持久监听也是监听从命令发出之后发生的事件,先前的事件是监听不到的,不像JMS的持久订阅可以收到启动之前的jms消息。

      如果在持久监听中加waitIndex参数,分两种情况:一种是waitIndex的值小于或等于启动监听时etcd的当前index(这个值在X-Etcd-Index http头中可看到),则curl接收到满足条件的事件后挂起(不退出),后续再也收不到其他事件,即最多只能收到一个事件。第二种是waitIndex的值大于启动监听时etcd的当前index,则waitIndex参数无效,持久监听的行为同没有waitIndex参数一样。

 

 

   使用etcd4j进行事件监听

   在使用etcd4j进行事件监听时,有个注意事项:如果在同一个jvm虚拟机中既有修改查询etcd数据的操作,也有监听etcd事件,监听事件使用的EtcdClient实例和修改&查询操作使用的EtcdClient实例应分开,不能共用同一个实例,否则目前的etcd4j(2.7.0版本)会出现问题。多个key可以共用一个EtcdClient实例来进行监听,也可每个key使用一个单独的实例来进行监听。

  • 一次性监听

  //创建etcd客户端实例

  EtcdClient   etcdClient = new EtcdClient (new URI[]{new URI("127.0.0.1:2379")});

  /*setRetryPolicy设置重试策略

  * recursive()监听/application目录和其后辈目录中的事件,设置查询字符串recursive=true.

  *  waitForChange()监听,设置查询字符串wait=true.

 * -1表示永不超时,send()发送http请求进行监听

  */

  EtcdResponsePromise<EtcdKeysResponse> promise = etcd.get("/application").setRetryPolicy(

                        new RetryNTimes(300,Integer.MAX_VALUE)).recursive().waitForChange().

                        timeout(-1, TimeUnit.SECONDS).send();

    //增加listener

    promise.addListener(new IsSimplePromiseResponseHandler<EtcdKeysResponse>()

    {

         @Override

         public void onResponse(ResponsePromise<EtcdKeysResponse> promise) {

                   //从promise中取响应,检查是否有异常

                   EtcdKeysResponse response = promise.getNow();

                   Throwable  exception = promise.getException();

                    ... 处理事件

                    //再次发起http请求进行监听

                    ...

         }   

     }

   );

 

     还可使用waitForChange(long waitIndex)来监听指定的事件。

 

  • 持久监听

     etcd4j的当前版本2.7.0不支持持久监听,可以使用

Java异步async httpclient(https://github.com/AsyncHttpClient/async-http-client)来实现持久监听,最挫最原始的持久监听实验代码如下:

     AsyncHttpClient asyncHttpClient = ...;

      //发送http get异步请求,不超时,且提供一个匿名的异步请求完成handler

      asyncHttpClient.prepareGet("http://127.0.0.1:2379/v2/keys/application?   wait=true&recursive=true&stream=true").setRequestTimeout(-1).execute(

       new AsyncHandler<String>() {

           //这个异常回调方法可类比为zookeeper的会话过期

           @Override

           public void onThrowable(Throwable t) {

                  // TODO Auto-generated method stub

                  //todo:异常,重新重试发起监听

            }

             

             //在会话期,有事件时会回调onBodyPartReceived方法

           @Override

           public State  onBodyPartReceived(HttpResponseBodyPart  bodyPart)

throws Exception {

                 // TODO Auto-generated method stub

                // bodyPart对应一个http chunk块,一个事件的数据可能由多个chunk块组成

                System.out.println("event="+new String(bodyPart.getBodyByteBuffer().array(),"utf-8"));

                //todo:将chunk中的数据加入缓冲区,待收到完整的事件数据后将json格式

                //的数据解析成java对象

               ...

               return State.CONTINUE;

         }

  

        @Override

        public  State onStatusReceived(HttpResponseStatus  responseStatus)

throws Exception {

               // TODO Auto-generated method stub

               return State.CONTINUE;

         }

            

            //一个会话期只调用一次,这个回调方法可类比为zookeeper会话建立

         @Override

         public  State onHeadersReceived(HttpResponseHeaders headers)

throws Exception {

               // TODO Auto-generated method stub

               return State.CONTINUE;

          }

 

         @Override

          public String onCompleted() throws Exception {

                    // TODO Auto-generated method stub

                   return "endWatch";

          }

   

    });

   

     

    

 

    

    

    

   

    

 

 

    

  • etcd事件监听
            
    
    博客分类: 开发  
  • 大小: 23.6 KB