欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页  >  php教程

HBase Java编程示例

程序员文章站 2022-04-21 17:04:52
...

HBase Java编程示例

HelloWorld.zip
  1. package elementary;

  2. import java.io.IOException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.ArrayList;
  5. import java.util.Date;
  6. import java.util.List;
  7. import java.util.concurrent.atomic.AtomicInteger;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.TimeUnit;

  11. import org.apache.hadoop.conf.Configuration;
  12. import org.apache.hadoop.hbase.Cell;
  13. import org.apache.hadoop.hbase.HBaseConfiguration;
  14. import org.apache.hadoop.hbase.HColumnDescriptor;
  15. import org.apache.hadoop.hbase.HTableDescriptor;
  16. import org.apache.hadoop.hbase.MasterNotRunningException;
  17. import org.apache.hadoop.hbase.TableName;
  18. import org.apache.hadoop.hbase.ZooKeeperConnectionException;
  19. import org.apache.hadoop.hbase.client.Delete;
  20. import org.apache.hadoop.hbase.client.Get;
  21. import org.apache.hadoop.hbase.client.Admin;
  22. import org.apache.hadoop.hbase.client.BufferedMutator;
  23. import org.apache.hadoop.hbase.client.BufferedMutatorParams;
  24. import org.apache.hadoop.hbase.client.Connection;
  25. import org.apache.hadoop.hbase.client.ConnectionFactory;
  26. import org.apache.hadoop.hbase.client.Table;
  27. import org.apache.hadoop.hbase.client.Put;
  28. import org.apache.hadoop.hbase.client.Result;
  29. import org.apache.hadoop.hbase.client.ResultScanner;
  30. import org.apache.hadoop.hbase.client.Scan;
  31. import org.apache.hadoop.hbase.util.Bytes;
  32. import org.apache.hadoop.util.ThreadUtil;

  33. public class HelloWorld {
  34. private static Configuration conf = null;
  35. private static Connection conn = null;
  36. private static Admin admin = null;
  37. public static AtomicInteger count = new AtomicInteger();

  38. /**
  39. * 初始化配置
  40. */
  41. static {
  42. conf = HBaseConfiguration.create();
  43. //如果沒有配置文件,一定要記得手動宣告

  44. conf.set("hbase.zookeeper.quorum", "10.148.137.143");
  45. conf.set("hbase.zookeeper.property.clientPort", "2181");
  46. }

  47. static {
  48. try {
  49. conn = ConnectionFactory.createConnection();
  50. admin = conn.getAdmin();
  51. } catch (IOException e) {
  52. e.printStackTrace();
  53. }
  54. }

  55. static public class MyThread extends Thread
  56. {
  57. int _start;
  58. String _tablename;
  59. Connection conn;
  60. //BufferedMutator table;
  61. Table table;

  62. public MyThread(int start, String tablename) {
  63. _start = start;
  64. _tablename = tablename;
  65. }

  66. public void run() {
  67. String tablename = _tablename;
  68. Thread current = Thread.currentThread();
  69. long thread_id = current.getId();
  70. System.out.printf("thread[%d] run\n", thread_id);

  71. try {
  72. conn = ConnectionFactory.createConnection();
  73. //BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename));
  74. //params.writeBufferSize(1024 * 4);
  75. //table = conn.getBufferedMutator(params);
  76. table = conn.getTable(TableName.valueOf(tablename));

  77. for (int j=_start; j
  78. for (int i=0; i
  79. // zkb_0_0
  80. String zkb = "zkb_" + String.valueOf(_start) + "_" + String.valueOf(i);
  81. Put put = new Put(Bytes.toBytes(zkb));
  82. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field1"),Bytes.toBytes(String.valueOf(i+0)));
  83. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field2"),Bytes.toBytes(String.valueOf(i+1)));
  84. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field3"),Bytes.toBytes(String.valueOf(i+2)));
  85. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field4"),Bytes.toBytes(String.valueOf(i+3)));
  86. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field5"),Bytes.toBytes(String.valueOf(i+4)));
  87. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field6"),Bytes.toBytes(String.valueOf(i+5)));
  88. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field7"),Bytes.toBytes(String.valueOf(i+6)));
  89. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field8"),Bytes.toBytes(String.valueOf(i+7)));
  90. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field9"),Bytes.toBytes(String.valueOf(i+8)));
  91. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field10"),Bytes.toBytes(String.valueOf(i+9)));
  92. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field11"),Bytes.toBytes(String.valueOf(i+10)));
  93. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field12"),Bytes.toBytes(String.valueOf(i+11)));
  94. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field13"),Bytes.toBytes(String.valueOf(i+12)));
  95. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field14"),Bytes.toBytes(String.valueOf(i+13)));
  96. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field15"),Bytes.toBytes(String.valueOf(i+14)));
  97. //table.mutate(put);
  98. table.put(put);

  99. int m = HelloWorld.count.incrementAndGet();
  100. if (m % 10000 == 0) {
  101. Date dt = new Date();
  102. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss aa");
  103. String now = sdf.format(dt);
  104. System.out.printf("[%s] thread[%d] m=%d, j=%d, i=%d\n", now, thread_id, m, j, i);
  105. }
  106. }
  107. }

  108. System.out.printf("thread[%d] over\n", thread_id);
  109. }
  110. catch (Exception e) {
  111. e.printStackTrace();
  112. }
  113. }
  114. }

  115. /**
  116. * 建立表格
  117. * @param tablename
  118. * @param cfs
  119. */
  120. public static void createTable(String tablename, String[] cfs){
  121. try {
  122. if (admin.tableExists(TableName.valueOf(tablename))) {
  123. System.out.println("table already exists!");
  124. } else {
  125. HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tablename));
  126. for (int i = 0; i
  127. HColumnDescriptor desc = new HColumnDescriptor(cfs[i]);
  128. desc.setMaxVersions(3650);
  129. tableDesc.addFamily(desc);
  130. }

  131. byte[][] splitKeys = new byte[][] {
  132. Bytes.toBytes("zkb_0_0"),
  133. Bytes.toBytes("zkb_10_0"),
  134. Bytes.toBytes("zkb_20_0"),
  135. Bytes.toBytes("zkb_30_0"),
  136. Bytes.toBytes("zkb_40_0"),
  137. Bytes.toBytes("zkb_50_0"),
  138. Bytes.toBytes("zkb_60_0"),
  139. Bytes.toBytes("zkb_70_0"),
  140. Bytes.toBytes("zkb_80_0"),
  141. Bytes.toBytes("zkb_90_0"),
  142. Bytes.toBytes("zkb_100_0")
  143. };
  144. admin.createTable(tableDesc, splitKeys);
  145. admin.close();
  146. System.out.println("create table " + tablename + " ok.");
  147. }
  148. } catch (MasterNotRunningException e) {
  149. e.printStackTrace();
  150. } catch (ZooKeeperConnectionException e) {
  151. e.printStackTrace();
  152. } catch (IOException e) {
  153. e.printStackTrace();
  154. }
  155. }

  156. /**
  157. * 刪除表格
  158. * @param tablename
  159. */
  160. public static void deleteTable(String tablename){
  161. try {
  162. //Connection conn = ConnectionFactory.createConnection();
  163. //Admin admin = conn.getAdmin();
  164. admin.disableTable(TableName.valueOf(tablename));
  165. admin.deleteTable(TableName.valueOf(tablename));
  166. System.out.println("delete table " + tablename + " ok.");
  167. } catch (IOException e) {
  168. e.printStackTrace();
  169. }
  170. }

  171. /**
  172. * 刪除一筆資料
  173. * @param tableName
  174. * @param rowKey
  175. */
  176. public static void delRecord (String tableName, String rowKey){
  177. try {
  178. Table table = conn.getTable(TableName.valueOf(tableName));

  179. List list = new ArrayList();
  180. Delete del = new Delete(rowKey.getBytes());
  181. list.add(del);
  182. table.delete(list);
  183. System.out.println("del recored " + rowKey + " ok.");
  184. } catch (IOException e) {
  185. e.printStackTrace();
  186. }
  187. }

  188. /**
  189. * 取得一筆資料
  190. * @param tableName
  191. * @param rowKey
  192. */
  193. public static void getOneRecord (String tableName, String rowKey){
  194. try {
  195. Table table = conn.getTable(TableName.valueOf(tableName));

  196. Get get = new Get(rowKey.getBytes());
  197. Result rs = table.get(get);
  198. List list = rs.listCells();
  199. for(Cell cell:list){
  200. System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " );
  201. System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" );
  202. System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " );
  203. System.out.print(cell.getTimestamp() + " " );
  204. System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " );
  205. System.out.println("");
  206. }
  207. } catch (IOException e) {
  208. e.printStackTrace();
  209. }
  210. }

  211. /**
  212. * 取得所有資料
  213. * @param tableName
  214. */
  215. public static void getAllRecord (String tableName) {
  216. try{
  217. //Connection conn = ConnectionFactory.createConnection();
  218. Table table = conn.getTable(TableName.valueOf(tableName));

  219. Scan scan = new Scan();
  220. ResultScanner resultscanner = table.getScanner(scan);
  221. for(Result rs:resultscanner){
  222. List list = rs.listCells();
  223. for(Cell cell:list){
  224. System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " );
  225. System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" );
  226. System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " );
  227. System.out.print(cell.getTimestamp() + " " );
  228. System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " );
  229. System.out.println("");
  230. }
  231. }
  232. } catch (IOException e){
  233. e.printStackTrace();
  234. }
  235. }

  236. /**
  237. * 取得Family清單
  238. * @param tableName
  239. * @return
  240. */
  241. public static ArrayList getAllFamilyName(String tableName) {
  242. ArrayList familyname_list = new ArrayList();
  243. try{
  244. //Connection conn = ConnectionFactory.createConnection();
  245. Table table = conn.getTable(TableName.valueOf(tableName));

  246. HTableDescriptor htabledescriptor = table.getTableDescriptor();
  247. HColumnDescriptor[] hdlist = htabledescriptor.getColumnFamilies();
  248. for(int i=0;i
  249. HColumnDescriptor hd = hdlist[i];
  250. familyname_list.add(hd.getNameAsString());
  251. }
  252. } catch (IOException e){
  253. e.printStackTrace();
  254. }
  255. return familyname_list;
  256. }

  257. // java -cp HelloWorld.jar:`ls lib/*.jar|awk '{printf("%s:", $0)}'` elementary.HelloWorld 5
  258. public static void main(String[] args) {
  259. System.out.println("HelloWorldX");
  260. if (args.length > 0)
  261. System.out.println(args[0]);

  262. int start = 0;
  263. if (args.length > 1)
  264. start = Integer.valueOf(args[1]);
  265. if (start
  266. start = 0;

  267. int num_threads = 16;
  268. if (args.length > 2)
  269. num_threads = Integer.valueOf(args[2]);

  270. try {
  271. String tablename = "scores";
  272. String[] familys = {"grade", "course"};
  273. HelloWorld.createTable(tablename, familys);

  274. //ExecutorService thread_pool = Executors.newSingleThreadExecutor();
  275. ExecutorService thread_pool = Executors.newFixedThreadPool(num_threads);
  276. Thread[] pool = new HelloWorld.MyThread[80];
  277. for (int i=0; i
  278. pool[i] = new HelloWorld.MyThread(i, tablename);
  279. thread_pool.execute(pool[i]);
  280. }

  281. thread_pool.shutdown();
  282. System.out.println("over");
  283. }
  284. catch (Exception e) {
  285. e.printStackTrace();
  286. }
  287. }

  288. }
相关标签: android