Reading Data from Hive and Writing It to HBase
Method 1: MapReduce (bulk load)
1. First export the Hive data to text files with a fixed delimiter, then place them on HDFS.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class LngLatOrder {
public static void main(String[] args) {
String table = args[0];
String date = PersonUtils.getDate();
String path = "/test/"+date+"/lnglat_order";
String warehouseLocation = "/user/hive/warehouse";
SparkSession spark = SparkSession
.builder()
.appName("LongitudeAndLatitude")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
String split = "'\004\001'";
//source table columns: addr lng lat name mobile buyerid platform orderid
String sql = " select concat(name,"+split+",mobile,"+split+",buyerid,"+split
+",platform,"+split+",addr,"+split+",lng,"+split
+",lat ) from "+table+" ";
Dataset<Row> ds = spark.sql(sql);
ds.write().text(path);
spark.close();
}
}
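For reference, each exported line joins the selected fields with the two control characters \004\001 (the quoted literal built into the SQL above), so whatever reads the files back must split on exactly the same separator. A minimal, self-contained sketch of that round trip (the sample values are made up):
public class SplitExportedLine {
    public static void main(String[] args) {
        // \u0004\u0001 are the same control characters as the '\004\001' literal in the Hive concat above
        String line = "Tom\u0004\u000113800000000\u0004\u0001b001\u0004\u0001app\u0004\u0001Beijing\u0004\u0001116.40\u0004\u000139.90";
        String[] fields = line.split("\u0004\u0001");
        // expected order: name, mobile, buyerid, platform, addr, lng, lat
        System.out.println(fields.length); // 7
    }
}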
2. Read the text files and generate HFiles.
public class test_demo{
static Logger logger = LoggerFactory.getLogger(test_demo.class);
public static class BulkLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
public void map(LongWritable key, Text value,
Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {
String column = context.getConfiguration().get("column");
String familys = context.getConfiguration().get("columnFamily");
String rowkeyColumn = context.getConfiguration().get("rowkeyColumn");
String rowkeyColumnSplit = context.getConfiguration()
.get("rowkeyColumnSplit");
String splitSymbol = context.getConfiguration().get("splitSymbol");
if (BulkLoadXMl.isEmpty(rowkeyColumnSplit)) {
rowkeyColumnSplit = "|";
}
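//note: callers prepend "\\" before splitting because "|" is a regex metacharacter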
String[] f = familys.split("\\" + rowkeyColumnSplit);
String[] columF = column.split("\\" + rowkeyColumnSplit);
if (f.length != columF.length) {
System.err.println("The number of column families does not match the number of column groups defined in the config");
}
String[] line = value.toString().split(splitSymbol);
boolean checkColumnNumber = BulkLoadXMl.checkColumnNumber(columF,
line, ",");
if (checkColumnNumber) {
HashMap<String, String> mapF = new HashMap<String, String>(16);
for (int i = 0; i < f.length; i++) {
mapF.put(f[i], columF[i]);
}
Object[] colums = BulkLoadXMl.columnNames(column.split("\\" +
rowkeyColumnSplit));
Map<String, String> map = new HashMap<String, String>(16);
for (int i = 0; i < colums.length; i++) {
String str = (String) colums[i];
map.put(str, line[i]);
}
Set<Map.Entry<String, String>> columFamilyAndCloums = mapF.entrySet();
for (Map.Entry<String, String> cfc : columFamilyAndCloums) {
String family = (String) cfc.getKey();
String[] cols = ((String) cfc.getValue()).split(",");
Map<String, String> rowOneColumns = new HashMap<String, String>(16);
for (int i = 0; i < cols.length; i++) {
rowOneColumns.put(cols[i],
(String) map.get(cols[i]));
}
String crc32 = OtherUtils.getEducateCrc32(map);
String hkey = OtherUtils.getRowKeyByReversalMobile(rowkeyColumn, map);
BulkLoadXMl.contextWrite(rowOneColumns, hkey+crc32, family,context);
}
} else {
System.out.println("The number of columns does not match the number of data fields");
}
}
}
public static void bulkLoadApp(BulkLoads b) throws Exception {
Configuration conf = HBaseConfiguration.create();
String inputPath = b.getInPath();
String outputPath = b.getOutPath();
conf = setConf(conf,b);
org.apache.hadoop.hbase.client.Connection connection=ConnectionFactory.createConnection(conf);
HTable hTable = (HTable) connection.getTable(TableName.valueOf(b.getTable()));
HbaseUtil.createTable(b.getTable(), b.getColumnFamily()
.split("\\"+b.getRowkeyColumnSplit()), Integer.parseInt(b.getNumRegions()));
try {
Job job = Job.getInstance(conf, "education label");
job.setJarByClass(test_demo.class);
job.setMapperClass(BulkLoadMap.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
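//speculative execution is disabled so that duplicate task attempts do not produce conflicting HFile output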
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
//hTable = new HTable(conf, b.getTable());
hTable.setWriteBufferSize(6291456L);
hTable.setAutoFlushTo(false);
RegionLocator regionLocator=new HRegionLocator(TableName.valueOf(b.getTable()), (ClusterConnection) connection);
HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator);
if (job.waitForCompletion(true)) {
System.out.println("================BulkLoad starting================");
FsShell shell = new FsShell(conf);
try {
shell.run(new String[] {
"-chmod", "-R", "777", b.getOutPath()
});
} catch (Exception e) {
logger.error("Couldn't change the file permissions", e);
throw new IOException(e);
}
} else {
logger.error("loading failed.");
System.exit(1);
}
} catch (IllegalArgumentException e) {
e.printStackTrace();
} finally {
if (hTable != null) {
hTable.close();
}
}
}
public static void main(String[] args) throws Exception {
ArrayList<BulkLoads> bi = BulkLoadXMl.getBulkLoadInfo();
for (BulkLoads b : bi) {
bulkLoadApp(b);
}
}
}
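The job above only generates HFiles under outPath (plus the chmod); the step that actually moves them into the HBase table is not shown. A minimal sketch of that final step, assuming the same HBase 1.x client API used in the code above (it is the programmatic equivalent of running the LoadIncrementalHFiles tool from the command line):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class CompleteBulkLoad {
    public static void main(String[] args) throws Exception {
        String hfileDir = args[0];   // the outPath the MR job wrote HFiles to
        String tableName = args[1];  // the table created by bulkLoadApp
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf)) {
            HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
            try {
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                // moves the generated HFiles into the table's regions
                loader.doBulkLoad(new Path(hfileDir), hTable);
            } finally {
                hTable.close();
            }
        }
    }
}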
Reading the configuration file:
public class BulkLoadXMl {
public static ArrayList<BulkLoads> getBulkLoadInfo() {
try {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder db = factory.newDocumentBuilder();
Document doc = db.parse("bulkloads.xml");
NodeList nodes = doc.getChildNodes();
Node root = nodes.item(0);
NodeList bulkloads = root.getChildNodes();
ArrayList<BulkLoads> list = new ArrayList<BulkLoads>();
for(int i=0;i<bulkloads.getLength();i++){
Node bulkload = bulkloads.item(i);
//skip whitespace text nodes between <bulkload> elements
if (bulkload.getNodeType() != Node.ELEMENT_NODE) {
continue;
}
//build one BulkLoads object per <bulkload> element
BulkLoads b = new BulkLoads();
NodeList texts = bulkload.getChildNodes();
for (int j = 0; j < texts.getLength(); j++) {
Node text = texts.item(j);
if (text.getNodeName().equals("splitSymbol")) {
b.setSplitSymbol(text.getTextContent());
} else if (text.getNodeName().equals("inPath")) {
b.setInPath(text.getTextContent());
} else if (text.getNodeName().equals("outPath")) {
b.setOutPath(text.getTextContent());
} else if (text.getNodeName().equals("table")) {
b.setTable(text.getTextContent());
} else if (text.getNodeName().equals("numRegions")) {
b.setNumRegions(text.getTextContent());
} else if (text.getNodeName().equals("columnFamily")) {
b.setColumnFamily(text.getTextContent());
} else if (text.getNodeName().equals("rowkeyColumn")) {
b.setRowkeyColumn(text.getTextContent());
}else if (text.getNodeName().equals("rowkeyColumnSplit")) {
b.setRowkeyColumnSplit(text.getTextContent());
}else if (text.getNodeName().equals("column")) {
b.setColumn(text.getTextContent());
}
}
list.add(b);
}
return list;
} catch (ParserConfigurationException e) {
e.printStackTrace();
} catch (SAXException se) {
se.printStackTrace();
} catch (IOException ie) {
ie.printStackTrace();
}
return null;
}
/**
* Returns true if the string is null or empty.
*/
public static boolean isEmpty(String s){
return s == null || s.equals("");
}
/**
* Collects all column names from the comma-separated column groups.
*/
public static Object[] columnNames(String[] array){
ArrayList<String> list = new ArrayList<String>();
for(int i=0;i<array.length;i++){
String[] co = array[i].split(",");
for (String s : co) {
list.add(s);
}
}
return list.toArray();
}
/**
* First parameter: the comma-separated rowkey columns to split.
* Second parameter: the separator to join the values with.
* Third parameter: the map the column values are read from.
*/
public static String getRowKey(String rowkeyColumn,String rowkeyColumnSplit,Map<String,String> map){
String[] split = rowkeyColumn.split(",");
StringBuilder sb = new StringBuilder ();
for (String str : split) {
sb.append(map.get(str));
sb.append(rowkeyColumnSplit);
}
String string = sb.toString();
String hkey = string.substring(0, string.lastIndexOf(rowkeyColumnSplit));
return hkey;
}
/**
* First parameter: the column -> value map.
* Second parameter: the rowkey.
* Third parameter: the column family.
* Fourth parameter: the mapper context.
*/
@SuppressWarnings("unchecked")
public static void contextWrite(Map<String,String> map,String hkey,String family,org.apache.hadoop.mapreduce.Mapper.Context context){
try {
Set<Entry<String,String>> entrySet = map.entrySet();
for (Entry<String, String> entry : entrySet) {
String col = entry.getKey();
String hvalue = entry.getValue();
final byte[] rowKey = Bytes.toBytes(hkey);
final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);
Put HPut = new Put(rowKey);
//Disabling the WAL is not recommended, although it does improve performance: HBase persists every
//mutation to the WAL before writing it, so that data not yet flushed can be recovered after a failure.
//HPut.setWriteToWAL(false);
byte[] cell = Bytes.toBytes(hvalue);
HPut.add(Bytes.toBytes(family), Bytes.toBytes(col), cell);
context.write(HKey, HPut);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
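//note: isEq below returns false when the two strings ARE equal and true when they differ;
//callers such as saveSameMobileAndDistrinct rely on this inverted meaning.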
public static boolean isEq(String name1,String name2){
if(name1.equals(name2)){
return false;
}else{
return true;
}
}
public static String split(String[] linesArray){
StringBuffer sb = new StringBuffer();
for (String str : linesArray) {
sb.append(str);
sb.append("\t");
}
return sb.toString()+"\r\n";
}
/**
* Accumulates lines that share the same first field (mobile).
* @throws InterruptedException
* @throws IOException
*/
private static String name = null;
private static StringBuffer buffer = new StringBuffer();
private static StringBuffer outBuffer = new StringBuffer();
private static boolean flag = true;
public static HashMap<String, String> saveSameMobileAndDistrinct(Text value,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException{
HashMap<String, String> map = new HashMap<String, String>();
String[] linesArray = value.toString().split("\t");
if(name != null){
if(!isEq(name,linesArray[0])){
outBufferAppend(value);
}else{
flag = false;
judgeMoblie(outBuffer.toString(),context);
clear(); flag= true;
append(linesArray[0],value);
}
}else{
//first record for this key
append(linesArray[0],value);
}
map.put("flag", flag+"");
map.put("value", outBuffer.toString());
return map;
}
public static void append(String name,Text value){
bufferAppend(name);
outBufferAppend(value);
}
public static void bufferAppend(String ba){
buffer.append(ba);
name=ba;
}
public static void clear(){
buffer.delete(0, buffer.length());
outBuffer.delete(0, outBuffer.length());
}
public static void outBufferAppend(Text value){
outBuffer.append(value);
outBuffer.append("\r\n");
}
/**
* Saves a single contact.
* @throws InterruptedException
* @throws IOException
*/
public static void savePerson(String person,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException{
String column = context.getConfiguration().get("column");
String familys = context.getConfiguration().get("columnFamily");
String tableName = context.getConfiguration().get("table");
String[] values = person.split("\t");
String[] colums = column.split(",");
String rz = HbaseUtil.repairZero("0");
String hkey = colums[0]+"_"+rz;
HashMap<String, String> map = new HashMap<String,String>();
for(int i=0;i<colums.length;i++){
if(colums.length != values.length){
continue;
}
map.put(colums[i]+"_"+rz, values[i]);
}
Set<Entry<String,String>> es = map.entrySet();
for (Entry<String, String> entry : es) {
String col = entry.getKey();
String hvalue = entry.getValue();
final byte[] rowKey = Bytes.toBytes(map.get(hkey));
final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);
Put HPut = new Put(rowKey);
byte[] cell = Bytes.toBytes(hvalue);
HPut.add(Bytes.toBytes(familys), Bytes.toBytes(col), cell);
context.write(HKey, HPut);
}
HbaseUtil.insert(tableName, map.get(colums[0]+"_"+rz), familys, "total", 0+1+"");
}
/**
* Saves multiple associated contacts.
* @throws InterruptedException
* @throws IOException
*/
public static void saveAssociatePersons(String persons,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException{
String column = context.getConfiguration().get("column");
String familys = context.getConfiguration().get("columnFamily");
String tableName = context.getConfiguration().get("table");
String[] person = persons.split("\r\n");
int j;
for(j=0;j<person.length;j++){
String rz = HbaseUtil.repairZero(j+"");
String[] values = person[j].split("\t");
String[] colums = column.split(",");
String hkey = values[0];
HashMap<String, String> map = new HashMap<String,String>();
for(int i=0;i<colums.length;i++){
if(colums.length != values.length){
continue;
}
map.put(colums[i]+"_"+rz, values[i]);
}
putHbase(map,hkey,familys,context);
HbaseUtil.insert(tableName, map.get(colums[0]+"_"+rz), familys, "total", j+1+"");
}
}
/**
* Writes the map's data into HBase.
*/
public static void putHbase(Map<String, String> map,String hkey ,String familys,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException{
Set<Entry<String,String>> es = map.entrySet();
for (Entry<String, String> entry : es) {
String col = entry.getKey();
String hvalue = entry.getValue();
final byte[] rowKey = Bytes.toBytes(hkey);
final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);
Put HPut = new Put(rowKey);
byte[] cell = Bytes.toBytes(hvalue);
HPut.add(Bytes.toBytes(familys), Bytes.toBytes(col), cell);
context.write(HKey, HPut);
}
}
/**
* Decides whether a mobile-number group contains one contact or several and saves it accordingly.
* @throws InterruptedException
* @throws IOException
*/
public static void judgeMoblie(String mobile,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException{
try {
int length = mobile.toString().split("\r\n").length;
if(length > 1){
//multiple contacts
saveAssociatePersons(mobile.toString(),context);
}else{
//only one contact
savePerson(mobile.toString(),context);
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//process the last mobile-number group
public static void lastDate(String lastData,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException{
if(lastData != null && !lastData.isEmpty()){
judgeMoblie(lastData,context);
}
}
//reverse a string
public static String reverse(String str){
return new StringBuilder(str).reverse().toString();
}
public static boolean checkColumnNumber(String[] XmlColumnFamilys, String[] line, String splitSymbol){
int xmlColumNumber = 0;
for (String XmlColumnFamily : XmlColumnFamilys){
String[] xmlColums = XmlColumnFamily.split(splitSymbol);
xmlColumNumber += xmlColums.length;
}
if (xmlColumNumber == line.length){
return true;
}else {
// System.out.println("xmlColumNumber :" +xmlColumNumber+" ; line.length:"+line.length+" ");
return false;
}
}
//rowkey = reversed mobile + "|" + crc32
public static String rowkey(String hkey,Map<String,String> map){
return reverse(hkey)+"|" + Person2HbaseUtil.getCrc32(map);
}
}
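For reference, getBulkLoadInfo() above expects a bulkloads.xml in the working directory shaped roughly like this; the element names come from the parser, while every value is illustrative only (and setConf is assumed to copy them into the job configuration under the same names):
<?xml version="1.0" encoding="UTF-8"?>
<bulkloads>
  <bulkload>
    <!-- all values below are illustrative -->
    <inPath>/test/20220714/lnglat_order</inPath>
    <outPath>/test/20220714/hfiles</outPath>
    <table>lnglat_order</table>
    <numRegions>10</numRegions>
    <!-- String.split() treats this as a regex, so \u0004\u0001 matches the two
         control characters used as the field separator in the Hive export -->
    <splitSymbol>\u0004\u0001</splitSymbol>
    <columnFamily>info</columnFamily>
    <column>name,mobile,buyerid,platform,addr,lng,lat</column>
    <rowkeyColumn>mobile</rowkeyColumn>
    <rowkeyColumnSplit>|</rowkeyColumnSplit>
  </bulkload>
</bulkloads>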
Method 2: There are many ways for a program to move data from Hive (or other OLTP sources) into HBase. For small volumes of data you can simply read the rows from Hive and Put them into HBase directly.
package com.hdshu.spark.npm.hbase.gongan;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import com.hdshu.spark.utils.HBaseJobUtils;
import com.hdshu.spark.utils.SparkJobUtils;
import com.hdshu.spark.utils.property.PropertyUtil;
import scala.Tuple2;
public class Test_Demo_Spark {
public static void main(String[] args) throws IOException {
SparkSession spark = SparkSession
.builder()
.appName("Test_Demo_Spark ")
.config("spark.sql.warehouse.dir", SparkJobUtils.warehouseLocation)
.config("spark.sql.shuffle.partitions", SparkJobUtils.npmod_spark_sql_shuffle_partitions)
.config("spark.storage.memoryFraction","0.3")
.config("spark.shuffle.memoryFraction","0.6")
.config("spark.default.parallelism","200")
.enableHiveSupport()
.getOrCreate();
String npmod_sql1 = " select reverse(cast(mobile as string)) as rowkey,cast(v as string) as orders from test.test " ;
Dataset<Row> npmod_DF1 = spark.sql(npmod_sql1);
npmod_DF1.show();
JavaRDD<Row> javaRDD = npmod_DF1.javaRDD();
JavaPairRDD<ImmutableBytesWritable, Put> mapToPair2 = javaRDD.mapToPair(new PairFunction<Row, ImmutableBytesWritable, Put>() {
private static final long serialVersionUID = -6175819829004359177L;
@Override
public Tuple2<ImmutableBytesWritable, Put> call(Row row)
throws Exception {
String rowKey = row.getAs("rowkey");
String orders = row.getAs("orders");
Put put = new Put(Bytes.toBytes(rowKey));
// build the cell: column family "orders", qualifier "orders"
put.addColumn(Bytes.toBytes("orders"), Bytes.toBytes("orders"), Bytes.toBytes(orders));
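// TableOutputFormat ignores the output key, so an empty ImmutableBytesWritable is sufficient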
return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
}
});
String hbase_zookeeper_quorum = HBaseJobUtils.hbase_BD_zookeeper_quorum;
String output_table =PropertyUtil.getString("inc.hbase.PDS.order","inc.properties");
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", hbase_zookeeper_quorum );
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set(TableOutputFormat.OUTPUT_TABLE, "half_year_orders");
Job job = Job.getInstance(conf);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Result.class);
job.setOutputFormatClass(TableOutputFormat.class);
mapToPair2.saveAsNewAPIHadoopDataset(job.getConfiguration());
spark.close();
}
}
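The Spark job above assumes the target table and its orders column family already exist. A minimal sketch of creating them with the HBase 1.x admin API (table and family names taken from the code above; pre-splitting is omitted):
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateOrdersTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = conn.getAdmin()) {
            TableName tn = TableName.valueOf("half_year_orders"); // table name used in the Spark job above
            if (!admin.tableExists(tn)) {
                HTableDescriptor desc = new HTableDescriptor(tn);
                desc.addFamily(new HColumnDescriptor("orders")); // column family written by the Spark job
                admin.createTable(desc);
            }
        }
    }
}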
Finally, we can write the query interface.
Initializing the connection pool:
private static Connection hTablePool;
private static final String ZOOKEEPER = "hbase.zookeeper.quorum";
public synchronized static Connection gethTablePool() {
if (hTablePool == null) {
conf = new Configuration();
conf.set(ZOOKEEPER, host);
conf.set("hbase.client.retries.number", "3");
conf.set("hbase.rpc.timeout", "600000");
conf.set("hbase.client.operation.timeout", "1200000");
conf.set("hbase.client.scanner.timeout.period", "600000");
conf.set("hbase.zookeeper.property.clientPort", port);
try {
hTablePool = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
return hTablePool;
}
public String queryTestDemo(String tableName,String mobile) throws Exception{
mobile=StringUtils.reverse(mobile);
String orders = getMobile(HbaseApi.getOneRow(tableName, mobile));
return orders;
}
public static Result getOneRow(String tableName, String rowKey) throws IOException {
Result rsResult = null;
Table table = null;
try {
table = gethTablePool().getTable(TableName.valueOf(tableName));
Get get = new Get(rowKey.getBytes()) ;
rsResult = table.get(get) ;
} catch (Exception e) {
e.printStackTrace() ;
}
finally {
close(table);
}
return rsResult;
}
public static String getMobile(Result r){
Cell[] rc = r.rawCells();
for (int i=0;i<rc.length;i++) {
Cell cell = rc[i];
if(i==0) return new String(CellUtil.cloneValue(cell));
}
return null;
}
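Putting the query pieces together, the read path can be exercised as follows. This is only a sketch: it assumes the static helpers above (gethTablePool, getOneRow, getMobile) all live on the HbaseApi class referenced in queryTestDemo, and the mobile number is made up:
import org.apache.hadoop.hbase.client.Result;

public class QueryExample {
    public static void main(String[] args) throws Exception {
        String mobile = "13800000000";                                   // illustrative number
        String rowKey = new StringBuilder(mobile).reverse().toString();  // rowkey is the reversed mobile
        HbaseApi.gethTablePool();                                        // make sure the shared Connection exists
        Result r = HbaseApi.getOneRow("half_year_orders", rowKey);       // table written by Test_Demo_Spark above
        System.out.println(HbaseApi.getMobile(r));                       // first cell value, or null
    }
}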
Writing with Spark
When using Spark we often need to land data in HBase. Writing through the ordinary Java client API is slow; fortunately Spark exposes an interface for bulk writes. What advantages does bulk writing have over ordinary writes?
- BulkLoad does not write to the WAL, and it does not trigger flushes or splits.
- Inserting data through heavy use of the Put API can cause a lot of GC, which hurts performance and in severe cases can even affect the stability of the HBase nodes; bulk loading avoids this concern.
- There is no large number of per-record RPC calls eating into performance during the load.
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.conf.Configuration

/**
 * Created by shaonian
 */
object HBaseBulk {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Bulk")
    val sc = new SparkContext(sparkConf)
    val conf = new Configuration()
    conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableOutputFormat.OUTPUT_TABLE, "bulktest")
    val job = Job.getInstance(conf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val init = sc.makeRDD(Array("1,james,32", "2,lebron,30", "3,harden,28"))
    val rdd = init.map(_.split(",")).map(arr => {
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      (new ImmutableBytesWritable, put)
    })
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}