
HBase: Refactoring and Optimizing HTablePool

程序员文章站 2024-02-28 22:36:22

HBase Version: hbase-0.90.3-cdh3u1

org.apache.hadoop.hbase.client.HTablePool

is not very convenient to use, so I rewrote parts of HTablePool to better fit my own business logic. Discussion is welcome.

The changes address the following four points in the original source:

1.  Different tables should be able to have different pool sizes. Currently HTablePool uses the same maxSize for every table.

private final int maxSize;

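2.  Calling getTable on HTablePool initializes only a single HTable, and the queue that holds the HTables for that table is itself created only at that moment.

    This is not ideal, and it is what prompted me to modify the source.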

  /**
   * Get a reference to the specified table from the pool.<p>
   *
   * Create a new one if one is not available.
   * @param tableName table name
   * @return a reference to the specified table
   * @throws RuntimeException if there is a problem instantiating the HTable
   */
  public HTableInterface getTable(String tableName) {
    LinkedList<HTableInterface> queue = tables.get(tableName);
    if(queue == null) {
      queue = new LinkedList<HTableInterface>();
      tables.putIfAbsent(tableName, queue);
      return createHTable(tableName);
    }
    HTableInterface table;
    synchronized(queue) {
      table = queue.poll();
    }
    if(table == null) {
      return createHTable(tableName);
    }
    return table;
  }

 3.   There should be a createTablePool method so that users can create the pool for a table themselves. It pairs with closeTablePool: create the pool first, close it later.

  public void closeTablePool(final String tableName)  {
    Queue<HTableInterface> queue = tables.get(tableName);
    synchronized (queue) {
      HTableInterface table = queue.poll();
      while (table != null) {
        this.tableFactory.releaseHTableInterface(table);
        table = queue.poll();
      }
    }
    HConnectionManager.deleteConnection(this.config, true);
  }

 4.  HTable Puts can be optimized by committing several Puts together with flushCommits(): loop over the HTables in the pool and call flushCommits() on each.
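The flush loop in point 4 could look roughly like the sketch below. It is a minimal, self-contained illustration and not the author's implementation: `BufferedTable` is a hypothetical stand-in for an HTable with auto-flush turned off, `FakeTable` is an in-memory fake, and `FlushAllSketch` / `flushAll` are names invented for the example. In real code the loop body would call `flushCommits()` on each pooled HTable.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class FlushAllSketch {

    /** Hypothetical stand-in for an HTable whose auto-flush is turned off. */
    interface BufferedTable {
        void put(String row);
        void flushCommits();
    }

    /** In-memory fake: rows stay in the buffer until flushCommits() is called. */
    static class FakeTable implements BufferedTable {
        final List<String> buffer = new ArrayList<String>();
        final List<String> committed = new ArrayList<String>();

        public void put(String row) {
            buffer.add(row);
        }

        public void flushCommits() {
            committed.addAll(buffer);
            buffer.clear();
        }
    }

    /** Same shape as the pool's internal map: table name -> idle instances. */
    static final ConcurrentMap<String, LinkedList<BufferedTable>> tables =
            new ConcurrentHashMap<String, LinkedList<BufferedTable>>();

    /** Flushes every idle table in every queue; returns how many were flushed. */
    static int flushAll() {
        int flushed = 0;
        for (LinkedList<BufferedTable> queue : tables.values()) {
            synchronized (queue) { // same per-queue lock the pool uses for poll/add
                for (BufferedTable table : queue) {
                    table.flushCommits();
                    flushed++;
                }
            }
        }
        return flushed;
    }

    public static void main(String[] args) {
        FakeTable table = new FakeTable();
        table.put("row-1");
        table.put("row-2");
        LinkedList<BufferedTable> queue = new LinkedList<BufferedTable>();
        queue.add(table);
        tables.put("demo", queue);
        int flushed = flushAll();
        System.out.println("flushed tables: " + flushed
                + ", committed rows: " + table.committed.size());
    }
}
```

A scheduler such as `java.util.concurrent.ScheduledExecutorService` could call `flushAll()` at a fixed interval. Note that a loop like this only sees tables that are sitting idle in the queue; a table that is currently checked out would have to be flushed by its borrower before it is returned.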

 

The modified source code:

package org.apache.hadoop.hbase.client;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * A rewritten pool of HTable instances.<p>
 *
 * Each MyHTablePool acts as a pool for all tables. To use it, instantiate a
 * MyHTablePool, create a pool for each table with
 * {@link #createHTablePool(String, int, boolean)}, and use {@link #getHTable(String)}
 * to get an HTable from the pool. Once you are done with it, return it to the
 * pool with {@link #putHTableBack(HTableInterface)}.
 *
 * <p>Each table's pool is created with its own <i>maxSize</i>, which is the number
 * of HTable instances created up front for that table.
 *
 * <p>The pool manages its own connections to the cluster. See {@link HConnectionManager}.
 *
 * @author greatwqs
 * @update 2012-08-25
 */
public class MyHTablePool {
	
	public final static int DEFAULT_POOL_SIZE   = 4;
	
	/**
	 * Maps each table name to the queue of pooled HTableInterface instances
	 * for that table. Each queue can be created with a different size.
	 */
    private final ConcurrentMap<String, LinkedList<HTableInterface>> tables 
                    = new ConcurrentHashMap<String, LinkedList<HTableInterface>>();
    /***
     * Configuration for hbase-site.xml
     */
	private final Configuration config;
	/**
	 * HTableInterfaceFactory that createHTableInterface and releaseHTableInterface
	 */
	private final HTableInterfaceFactory tableFactory;

	/**
	 * Default Constructor. 
	 */
	public MyHTablePool() {
		this(HBaseConfiguration.create());
	}

	/**
	 * Constructor that uses the specified configuration.
	 * @param config configuration
	 */
	public MyHTablePool(final Configuration config) {
		this(config, null);
	}

	public MyHTablePool(final Configuration config, final HTableInterfaceFactory tableFactory) {
		// Make a new configuration instance so I can safely cleanup when
		// done with the pool.
		this.config = config == null ? new Configuration() : new Configuration(
				config);
		this.tableFactory = tableFactory == null ? new HTableFactory()
				: tableFactory;
	}
	
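	/**
	 * Creates the HTable instances for the given table's pool.
	 * <p>
	 * Note: this is the 'create' counterpart of {@link #closeHTablePool(String)}.
	 * @param tableName table name
	 * @param maxSize number of HTable instances the pool should hold
	 * @param isAutoFlush auto-flush setting applied to every created HTable
	 */
	public void createHTablePool(final String tableName, final int maxSize, boolean isAutoFlush) {
		LinkedList<HTableInterface> queue = tables.get(tableName);
		if (queue == null) {
			queue = new LinkedList<HTableInterface>();
			// putIfAbsent returns the queue another thread registered first, if any;
			// use that one so every caller fills and locks the same queue.
			LinkedList<HTableInterface> existing = tables.putIfAbsent(tableName, queue);
			if (existing != null) {
				queue = existing;
			}
		}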
		synchronized (queue) {
			int addHTableSize = maxSize - queue.size();
			if(addHTableSize <= 0){
				return;
			}
			for(int i=0; i<addHTableSize; i++){
				HTable table = (HTable)createHTable(tableName);
				if(table != null){
					table.setAutoFlush(isAutoFlush);
					queue.add(table);
				}
			}
		}
	}
	
	/**
	 * Creates the HTable instances for each of the given tables.
	 * <p>
	 * Note: this is a 'create' of the given table pools.
	 * @param tableNameArray table names
	 * @param maxSize pool size used for every table in the array
	 * @param isAutoFlush auto-flush setting, default false
	 * usage example:
	 * false: for {@link Put}. Puts are buffered; call flushCommits periodically,
	 *        e.g. from a thread that runs every 3 ms, loops over all pooled
	 *        tables and calls flushCommits. This performs well.
	 * true: for {@link Scan} and {@link Delete}.
	 */
	public void createHTablePool(final String[] tableNameArray, final int maxSize, boolean isAutoFlush) {
		for(String tableName : tableNameArray){
			createHTablePool(tableName,maxSize,isAutoFlush);
		}
	}
	
	/**
	 * Creates the HTable instances for each of the given tables, with auto-flush off.
	 * <p>
	 * Note: this is a 'create' of the given table pools.
	 * @param tableNameArray table names
	 * @param maxSize pool size used for every table in the array
	 */
	public void createHTablePool(final String[] tableNameArray, final int maxSize) {
		createHTablePool(tableNameArray,maxSize,false);
	}

	/**
	 * Get a reference to the specified table from the pool.<p>
	 *
	 * @param tableName table name
	 * @return a reference to the specified table, or null if no pooled instance is currently available
	 * @throws RuntimeException if no pool has been created for the table
	 */
	public HTableInterface getHTable(String tableName) {
		LinkedList<HTableInterface> queue = tables.get(tableName);
		if (queue == null) {
			throw new RuntimeException("There is no pool for the HTable");
		}
		HTableInterface table;
		synchronized (queue) {
			table = queue.poll();
		}
		return table;
	}

	/**
	 * Get a reference to the specified table from the pool.<p>
	 *
	 * @param tableName table name
	 * @return a reference to the specified table, or null if no pooled instance is currently available
	 * @throws RuntimeException if no pool has been created for the table
	 */
	public HTableInterface getHTable(byte[] tableName) {
		return getHTable(Bytes.toString(tableName));
	}
	
	/**
	 * Puts the specified HTable back into the pool.
	 * <p>
	 *
	 * Do not use this method for an HTable that did not come from this pool.
	 *
	 * @param table table
	 * @throws RuntimeException if no pool has been created for the table
	 */
	public void putHTableBack(HTableInterface table) {
		LinkedList<HTableInterface> queue = tables.get(Bytes.toString(table.getTableName()));
		if (queue == null) {
			throw new RuntimeException("There is no pool for the HTable");
		}
		synchronized (queue) {
			queue.add(table);
		}
	}

	protected HTableInterface createHTable(String tableName) {
		return this.tableFactory.createHTableInterface(config, Bytes
				.toBytes(tableName));
	}

	/**
	 * Closes all the HTable instances belonging to the given table in the table pool.
	 * <p>
	 * Note: this is a 'shutdown' of the given table pool and different from
	 * {@link #putHTableBack(HTableInterface)}, which is used to return the table
	 * instance to the pool for future re-use.
	 *
	 * @param tableName
	 */
	public void closeHTablePool(final String tableName) {
		Queue<HTableInterface> queue = tables.get(tableName);
		if (queue == null) {
			return; // no pool was created for this table
		}
		synchronized (queue) {
			HTableInterface table = queue.poll();
			while (table != null) {
				this.tableFactory.releaseHTableInterface(table);
				table = queue.poll();
			}
		}
		HConnectionManager.deleteConnection(this.config, true);
	}

	/**
	 * See {@link #closeHTablePool(String)}.
	 *
	 * @param tableName
	 */
	public void closeHTablePool(final byte[] tableName) {
		closeHTablePool(Bytes.toString(tableName));
	}
	
	/**
	 * Closes the pools of all tables. See {@link #closeHTablePool(String)}.
	 */
	public void closeHTablePool() {
		for(String tabName:tables.keySet()){
			closeHTablePool(tabName);
		}
	}

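	/**
	 * Returns the number of HTable instances currently idle in the given table's pool.
	 * @param tableName table name
	 * @return the idle count, or 0 if no pool has been created for the table
	 */
	public int getCurrentPoolSize(String tableName) {
		Queue<HTableInterface> queue = tables.get(tableName);
		if (queue == null) {
			return 0;
		}
		synchronized (queue) {
			return queue.size();
		}
	}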
	
} 
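`createHTablePool` relies on `ConcurrentMap.putIfAbsent` to register the queue for a table. As a small illustration of how that call behaves when the same table name is registered more than once, here is a self-contained sketch. `PutIfAbsentSketch`, `getOrCreateQueue` and the `String` element type are placeholders invented for the example and are not part of the pool above.

```java
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentSketch {

    /** Table name -> queue, with String standing in for the pooled type. */
    static final ConcurrentMap<String, LinkedList<String>> tables =
            new ConcurrentHashMap<String, LinkedList<String>>();

    /** Returns the one queue registered for tableName, creating it if needed. */
    static LinkedList<String> getOrCreateQueue(String tableName) {
        LinkedList<String> queue = tables.get(tableName);
        if (queue == null) {
            LinkedList<String> fresh = new LinkedList<String>();
            // putIfAbsent returns the value already in the map,
            // or null if our fresh queue was the one stored.
            LinkedList<String> existing = tables.putIfAbsent(tableName, fresh);
            queue = (existing != null) ? existing : fresh;
        }
        return queue;
    }

    public static void main(String[] args) {
        LinkedList<String> first = getOrCreateQueue("demo");
        LinkedList<String> second = getOrCreateQueue("demo");
        // Both calls see the same queue instance.
        System.out.println(first == second); // prints true
    }
}
```

The point of checking the return value is that every caller ends up synchronizing on, and adding to, the queue that is actually stored in the map.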

 

Test example: org.apache.hadoop.hbase.client.MyHTablePoolTest

package org.apache.hadoop.hbase.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;

/**
 * @author greatwqs
 * @update 2012-08-25
 */
public class MyHTablePoolTest {
	
	/**
	 * test method
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception{
		// 1. my config file 
		String configFile = "conf/hbase-site.xml";
		Configuration config = new Configuration();
		config.addResource(new Path(configFile));
		// 2. init HTablePool
		MyHTablePool myPool = new MyHTablePool(config);
		// 3. create HTablePool for a table
		myPool.createHTablePool("DCP_DataCenter_Base", MyHTablePool.DEFAULT_POOL_SIZE, false);
		// 4. get already exist HTable from HTablePool
		HTable table = (HTable) myPool.getHTable("DCP_DataCenter_Base");
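		if(table == null){
			// every pooled HTable is checked out; nothing to scan with
			System.out.println("no HTable available in HTablePool!");
			myPool.closeHTablePool();
			return;
		}
		System.out.println("get HTable from HTablePool Success!");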
		// 5. get all data from HTable, and print to console.
		Scan scan = new Scan();
		ResultScanner rs = table.getScanner(scan);
		try {
			for (Result result : rs) {
				KeyValue[] kv = result.raw();
				byte[] key = kv[0].getRow();
				System.out.println("RowKey: " + new String(key));
				for (int i = 0; i < kv.length; i++) {
					System.out.println("ColumnFamily: "	+ new String(kv[i].getFamily()));
					System.out.println("Qualifier: "+ new String(kv[i].getQualifier()));
					System.out.println("Timestamp: "+ String.valueOf(kv[i].getTimestamp()));
					System.out.println("Value: " + new String(kv[i].getValue()));
				}
				System.out.println();
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			rs.close();
		}
		// 6. after use HTable end, then put the HTable back to HTablePool.
		myPool.putHTableBack(table);
		// 7. close HTablePool
		myPool.closeHTablePool();
	}
}
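Because `getHTable` returns `null` once every pooled instance is checked out, and a table that is never put back shrinks the pool, callers may want a null check combined with `try`/`finally`. The sketch below shows that pattern against a tiny stand-in pool. `TinyPool`, `borrow`, `giveBack` and `withTable` are hypothetical names for illustration only, and nothing here touches HBase.

```java
import java.util.LinkedList;

public class BorrowReturnSketch {

    /** Hypothetical fixed-size pool mirroring getHTable / putHTableBack. */
    static class TinyPool {
        private final LinkedList<String> idle = new LinkedList<String>();

        TinyPool(int size) {
            for (int i = 0; i < size; i++) {
                idle.add("table-" + i);
            }
        }

        /** Returns an idle instance, or null when the pool is exhausted. */
        synchronized String borrow() {
            return idle.poll();
        }

        synchronized void giveBack(String table) {
            idle.add(table);
        }

        synchronized int idleCount() {
            return idle.size();
        }
    }

    /** Runs work with a borrowed instance and always returns it to the pool. */
    static boolean withTable(TinyPool pool, Runnable work) {
        String table = pool.borrow();
        if (table == null) {
            return false; // pool exhausted: the caller decides whether to retry
        }
        try {
            work.run();
            return true;
        } finally {
            pool.giveBack(table); // runs even if work throws
        }
    }

    public static void main(String[] args) {
        TinyPool pool = new TinyPool(1);
        boolean ran = withTable(pool, new Runnable() {
            public void run() {
                System.out.println("working with a pooled table");
            }
        });
        System.out.println("ran: " + ran + ", idle afterwards: " + pool.idleCount());
    }
}
```

With the real pool, the same shape would wrap the scan in the test above, so that `putHTableBack` is reached even when the scan fails.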