欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

避免HBase PageFilter踩坑,这几点你必须要清楚

程序员文章站 2022-04-09 15:33:55
有这样一个场景,在HBase中需要分页查询,同时根据某一列的值进行过滤。 不同于RDBMS天然支持分页查询,HBase要进行分页必须由自己实现。据我了解的,目前有两种方案, 一是《HBase权威指南》中提到的用PageFilter加循环动态设置startRow实现,详细见这里。但这种方法效率比较低, ......

有这样一个场景,在hbase中需要分页查询,同时根据某一列的值进行过滤。

不同于rdbms天然支持分页查询,hbase要进行分页必须由自己实现。据我了解的,目前有两种方案, 一是《hbase权威指南》中提到的用pagefilter加循环动态设置startrow实现,详细见这里。但这种方法效率比较低,且有冗余查询。因此京东研发了一种用额外的一张表来保存行序号的方案。 该种方案效率较高,但实现麻烦些,需要维护一张额外的表。

不管是方案也好,人也好,没有最好的,只有最适合的。
在我司的使用场景中,对于性能的要求并不高,所以采取了第一种方案。本来使用的美滋滋,但有一天需要在分页查询的同时根据某一列的值进行过滤。根据列值过滤,自然是用singlecolumnvaluefilter(下文简称scvfilter)。代码大致如下,只列出了本文主题相关的逻辑,

scan scan = initscan(xxx);
filterlist filterlist=new filterlist();
scan.setfilter(filterlist);
filterlist.addfilter(new pagefilter(1));
filterlist.addfilter(new singlecolumnvaluefilter(family,isdeleted, comparefilter.compareop.equal, bytes.tobytes(false)));

 

数据如下

row1                 column=f:content, timestamp=1513953705613, value=content1
 row1                 column=f:isdel, timestamp=1513953705613, value=1
 row1                 column=f:name, timestamp=1513953725029, value=name1
 row2                 column=f:content, timestamp=1513953705613, value=content2
 row2                 column=f:isdel, timestamp=1513953744613, value=0
 row2                 column=f:name, timestamp=1513953730348, value=name2
 row3                 column=f:content, timestamp=1513953705613, value=content3
 row3                 column=f:isdel, timestamp=1513953751332, value=0
 row3                 column=f:name, timestamp=1513953734698, value=name3

 

在上面的代码中。向scan添加了两个filter:首先添加了pagefilter,限制这次查询数量为1,然后添加了一个scvfilter,限制了只返回isdeleted=false的行。

上面的代码,看上去无懈可击,但在运行时却没有查询到数据!

刚好最近在看hbase的代码,就在本地debug了下hbase服务端filter相关的查询流程。

filter流程

首先看下hbase filter的流程,见图:

避免HBase PageFilter踩坑,这几点你必须要清楚

 

然后再看pagefilter的实现逻辑。

public class pagefilter extends filterbase {
  private long pagesize = long.max_value;
  private int rowsaccepted = 0;

  /**
   * constructor that takes a maximum page size.
   *
   * @param pagesize maximum result size.
   */
  public pagefilter(final long pagesize) {
    preconditions.checkargument(pagesize >= 0, "must be positive %s", pagesize);
    this.pagesize = pagesize;
  }

  public long getpagesize() {
    return pagesize;
  }

  @override
  public returncode filterkeyvalue(cell ignored) throws ioexception {
    return returncode.include;
  }
 
  public boolean filterallremaining() {
    return this.rowsaccepted >= this.pagesize;
  }

  public boolean filterrow() {
    this.rowsaccepted++;
    return this.rowsaccepted > this.pagesize;
  }
  
}

 

其实很简单,内部有一个计数器,每次调用filterrow的时候,计数器都会+1,如果计数器值大于pagesize,filterrow就会返回true,那之后的行就会被过滤掉。

再看scvfilter的实现逻辑。

public class singlecolumnvaluefilter extends filterbase {
  private static final log log = logfactory.getlog(singlecolumnvaluefilter.class);

  protected byte [] columnfamily;
  protected byte [] columnqualifier;
  protected compareop compareop;
  protected bytearraycomparable comparator;
  protected boolean foundcolumn = false;
  protected boolean matchedcolumn = false;
  protected boolean filterifmissing = false;
  protected boolean latestversiononly = true;

 

  /**
   * constructor for binary compare of the value of a single column.  if the
   * column is found and the condition passes, all columns of the row will be
   * emitted.  if the condition fails, the row will not be emitted.
   * <p>
   * use the filterifcolumnmissing flag to set whether the rest of the columns
   * in a row will be emitted if the specified column to check is not found in
   * the row.
   *
   * @param family name of column family
   * @param qualifier name of column qualifier
   * @param compareop operator
   * @param comparator comparator to use.
   */
  public singlecolumnvaluefilter(final byte [] family, final byte [] qualifier,
      final compareop compareop, final bytearraycomparable comparator) {
    this.columnfamily = family;
    this.columnqualifier = qualifier;
    this.compareop = compareop;
    this.comparator = comparator;
  }

 
   
  @override
  public returncode filterkeyvalue(cell c) {
    if (this.matchedcolumn) {
      // we already found and matched the single column, all keys now pass
      return returncode.include;
    } else if (this.latestversiononly && this.foundcolumn) {
      // we found but did not match the single column, skip to next row
      return returncode.next_row;
    }
    if (!cellutil.matchingcolumn(c, this.columnfamily, this.columnqualifier)) {
      return returncode.include;
    }
    foundcolumn = true;
    if (filtercolumnvalue(c.getvaluearray(), c.getvalueoffset(), c.getvaluelength())) {
      return this.latestversiononly? returncode.next_row: returncode.include;
    }
    this.matchedcolumn = true;
    return returncode.include;
  }

 
  
  private boolean filtercolumnvalue(final byte [] data, final int offset,
      final int length) {
    int compareresult = this.comparator.compareto(data, offset, length);
    switch (this.compareop) {
    case less:
      return compareresult <= 0;
    case less_or_equal:
      return compareresult < 0;
    case equal:
      return compareresult != 0;
    case not_equal:
      return compareresult == 0;
    case greater_or_equal:
      return compareresult > 0;
    case greater:
      return compareresult >= 0;
    default:
      throw new runtimeexception("unknown compare op " + compareop.name());
    }
  }

  public boolean filterrow() {
    // if column was found, return false if it was matched, true if it was not
    // if column not found, return true if we filter if missing, false if not
    return this.foundcolumn? !this.matchedcolumn: this.filterifmissing;
  }
   
 
}

 

在hbase中,对于每一行的每一列都会调用到filterkeyvalue,scvfilter的该方法处理逻辑如下:

1. 如果已经匹配过对应的列并且对应列的值符合要求,则直接返回inclue,表示这一行的这一列要被加入到结果集
2. 否则如latestversiononly为true(latestversiononly代表是否只查询最新的数据,一般为true),并且已经匹配过对应的列(但是对应的列的值不满足要求),则返回exclude,代表丢弃该行
3. 如果当前列不是要匹配的列。则返回include,否则将matchedcolumn置为true,代表以及找到了目标列
4. 如果当前列的值不满足要求,在latestversiononly为true时,返回next_row,代表忽略当前行还剩下的列,直接跳到下一行
5. 如果当前列的值满足要求,将matchedcolumn置为true,代表已经找到了对应的列,并且对应的列值满足要求。这样,该行下一列再进入这个方法时,到第1步就会直接返回,提高匹配效率

再看filterrow方法,该方法调用时机在filterkeyvalue之后,对每一行只会调用一次。
scvfilter中该方法逻辑很简单:

1. 如果找到了对应的列,如其值满足要求,则返回false,代表将该行加入到结果集,如其值不满足要求,则返回true,代表过滤该行
2. 如果没找到对应的列,返回filterifmissing的值。

猜想:

是不是因为将pagefilter添加到scvfilter的前面,当判断第一行的时候,调用pagefilter的filterrow,导致pagefilter的计数器+1,但是进行到scvfilter的filterrow的时候,该行又被过滤掉了,在检验下一行时,因为pagefilter计数器已经达到了我们设定的pagesize,所以接下来的行都会被过滤掉,返回结果没有数据。

验证:

在filterlist中,先加入scvfilter,再加入pagefilter

scan scan = initscan(xxx);
filterlist filterlist=new filterlist();
scan.setfilter(filterlist);
filterlist.addfilter(new singlecolumnvaluefilter(family,isdeleted, comparefilter.compareop.equal,     bytes.tobytes(false)));
filterlist.addfilter(new pagefilter(1));

 

结果是我们期望的第2行的值。

结论

当要将pagefilter和其他filter使用时,最好将pagefilter加入到filterlist的末尾,否则可能会出现结果个数小于你期望的数量。
(其实正常情况pagefilter返回的结果数量可能大于设定的值,因为服务器集群的pagefilter是隔离的。)

彩蛋

其实,在排查问题的过程中,并没有这样顺利,因为问题出在线上,所以我在本地查问题时自己造了一些测试数据,令人惊讶的是,就算我先加入scvfilter,再加入pagefilter,返回的结果也是符合预期的。
测试数据如下:

row1                 column=f:isdel, timestamp=1513953705613, value=1
 row1                 column=f:name, timestamp=1513953725029, value=name1
 row2                 column=f:isdel, timestamp=1513953744613, value=0
 row2                 column=f:name, timestamp=1513953730348, value=name2
 row3                 column=f:isdel, timestamp=1513953751332, value=0
 row3                 column=f:name, timestamp=1513953734698, value=name3

 

当时在本地一直不能复现问题。很是苦恼,最后竟然发现使用scvfilter查询的结果还和数据的列的顺序有关。

在服务端,hbase会对客户端传递过来的filter封装成filterwrapper。

class regionscannerimpl implements regionscanner {

    regionscannerimpl(scan scan, list<keyvaluescanner> additionalscanners, hregion region)
        throws ioexception {
      this.region = region;
      this.maxresultsize = scan.getmaxresultsize();
      if (scan.hasfilter()) {
        this.filter = new filterwrapper(scan.getfilter());
      } else {
        this.filter = null;
      }
    }
   ....
}

 

在查询数据时,在hregion的nextinternal方法中,会调用filterwrapper的filterrowcellswithret方法

filterwrapper相关代码如下:

/**
 * this is a filter wrapper class which is used in the server side. some filter
 * related hooks can be defined in this wrapper. the only way to create a
 * filterwrapper instance is passing a client side filter instance through
 * {@link org.apache.hadoop.hbase.client.scan#getfilter()}.
 * 
 */
 
final public class filterwrapper extends filter {
  filter filter = null;

  public filterwrapper( filter filter ) {
    if (null == filter) {
      // ensure the filter instance is not null
      throw new nullpointerexception("cannot create filterwrapper with null filter");
    }
    this.filter = filter;
  }

 
  public enum filterrowretcode {
    not_called,
    include,     // corresponds to filter.filterrow() returning false
    exclude      // corresponds to filter.filterrow() returning true
  }
  
  public filterrowretcode filterrowcellswithret(list<cell> kvs) throws ioexception {
    this.filter.filterrowcells(kvs);
    if (!kvs.isempty()) {
      if (this.filter.filterrow()) {
        kvs.clear();
        return filterrowretcode.exclude;
      }
      return filterrowretcode.include;
    }
    return filterrowretcode.not_called;
  }

 
}

 

这里的kvs就是一行数据经过filterkeyvalue后没被过滤的列。

可以看到当kvs不为empty时,filterrowcellswithret方法中会调用指定filter的filterrow方法,上面已经说过了,pagefilter的计数器就是在其filterrow方法中增加的。

而当kvs为empty时,pagefilter的计数器就不会增加了。再看我们的测试数据,因为行的第一列就是scvfilter的目标列isdeleted。回顾上面scvfilter的讲解我们知道,当一行的目标列的值不满足要求时,该行剩下的列都会直接被过滤掉!

对于测试数据第一行,走到filterrowcellswithret时kvs是empty的。导致pagefilter的计数器没有+1。还会继续遍历剩下的行。从而使得返回的结果看上去是正常的。

而出问题的数据,因为在列isdeleted之前还有列content,所以当一行的isdeleted不满足要求时,kvs也不会为empty。因为列content的值已经加入到kvs中了(这些数据要调用到scvfilter的filterrow的时间会被过滤掉)。

感想

从实现上来看hbase的filter的实现还是比较粗糙的。效率也比较感人,不考虑网络传输和客户端内存的消耗,基本上和你在客户端过滤差不多。

 

本人免费整理了java高级资料,涵盖了java、redis、mongodb、mysql、zookeeper、spring cloud、dubbo高并发分布式等教程,一共30g,需要自己领取。
传送门:https://mp.weixin.qq.com/s/jzddfh-7ynudmkjt0irl8q