Spark: Reading Hive and Writing to HBase with Java
How to read Hive data with Spark was covered in an earlier post and is not repeated here; if you are interested, see the link below.
Blog post on reading Hive with Spark in Java: https://blog.csdn.net/Aaron_ch/article/details/113151157
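
The code below assumes the Hive data has already been loaded into a Dataset<Row> named rowDataset and that zk holds the ZooKeeper quorum of the target HBase cluster. A minimal sketch of that setup (the Hive table name, the selected columns, and the hostnames are illustrative placeholders, not from the linked post):

// Sketch only: load three columns from a Hive table into rowDataset.
// "xxx.src_table" and the hosts in zk are placeholders for your environment.
SparkSession spark = SparkSession.builder()
        .appName("HiveToHBase")
        .enableHiveSupport()   // requires hive-site.xml on the classpath
        .getOrCreate();

Dataset<Row> rowDataset = spark.sql(
        "select colName1, colName2, colName3 from xxx.src_table");

String zk = "zk-host1,zk-host2,zk-host3";

With rowDataset and zk available, each partition is written to HBase through foreachPartition: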
rowDataset.foreachPartition((ForeachPartitionFunction<Row>) t -> {
    String columnFamily = "test";
    int batchSize = 1000;                  // flush threshold; tune to your row size
    List<Put> putList = new ArrayList<>();
    while (t.hasNext()) {
        Row row = t.next();
        // The first column is used as the rowkey.
        Put put = new Put(Bytes.toBytes(row.getString(0)));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("colName1"), Bytes.toBytes(row.getString(0)));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("colName2"), Bytes.toBytes(row.getString(1)));
        // colName3 may be null in Hive; store a null value instead of calling Bytes.toBytes(null).
        if (row.getString(2) == null) {
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("colName3"), null);
        } else {
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("colName3"), Bytes.toBytes(row.getString(2)));
        }
        putList.add(put);
        // Flush in batches rather than per row, so that hBaseWriter (which opens
        // a new HBase connection on every call) is not invoked for each record.
        if (putList.size() >= batchSize) {
            hBaseWriter(putList, zk);
            putList.clear();
        }
    }
    // Flush whatever is left at the end of the partition.
    if (!putList.isEmpty()) {
        hBaseWriter(putList, zk);
    }
});
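
Both code snippets in this post only depend on classes from spark-sql and hbase-client; the imports they use are:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;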
The write method hBaseWriter is as follows:
private static void hBaseWriter(List<Put> putList, String zk) {
    // Debug output: dump each Put before writing.
    for (Put put : putList) {
        System.out.println(put.toString());
    }
    System.out.println("start to write into hbase~");
    String tableName = "xxx:tableName";    // namespace:table
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", zk);
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    Connection connection = null;
    Table table = null;
    try {
        // Creating a Connection is relatively expensive, which is why the caller
        // batches Puts instead of invoking this method once per row.
        connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TableName.valueOf(tableName));
        if (!putList.isEmpty()) {
            table.put(putList);
            System.out.println("write into hbase success!!!");
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (table != null) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
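
To check the result, one option is to read a row back with a Get. This is only a sketch: the rowkey "someRowKey" is a placeholder, and Get and Result come from the same org.apache.hadoop.hbase.client package as the classes above.

// Sketch: read one row back from the target table to confirm the write landed.
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zk);
conf.set("hbase.zookeeper.property.clientPort", "2181");
try (Connection connection = ConnectionFactory.createConnection(conf);
     Table table = connection.getTable(TableName.valueOf("xxx:tableName"))) {
    Result result = table.get(new Get(Bytes.toBytes("someRowKey")));   // placeholder rowkey
    System.out.println("colName2 = " + Bytes.toString(
            result.getValue(Bytes.toBytes("test"), Bytes.toBytes("colName2"))));
}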