Flink操作Hive数据库
// Wire the pipeline: consume strings from Kafka, then hand each record to the Hive sink.
DataStream<String> text = streamExecutionEnvironment.addSource(flinkKafkaConsume);
text.addSink(new SinkHive()); // fixed typo: was "SinkHIve", which does not match the class name below
public class SinkHive extends RichSinkFunction<String> implements SinkFunction<String> {
private static String driverName = "org.apache.hive.jdbc.HiveDriver"; //驱动名称
private static String url = "jdbc:hive2://10.10.82.137:10000/xxx";
private static String user ="";
private static String password ="";
private Connection connection;
private PreparedStatement statement;
// 1,初始化
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName(driverName);
connection = DriverManager.getConnection(url, user, password);
}
// 2,执行
@Override
public void invoke(String value, Context context) throws Exception {
//####invoke######{"name":"gaojs0","id":"0010"}
System.out.println("##########invoke#############" + value);
Map<String,Object> map = JSONObject.parseObject(value);
String tableName = "test";
String sql = "INSERT INTO " + tableName + "( ";
String sql2 = " VALUES( ";
for (Map.Entry entry : map.entrySet()) {
sql += entry.getKey() + ", ";
sql2 += cover(entry.getValue()) + ", ";
}
System.out.println("######sql#######" + sql);
System.out.println("######sql2#######" + sql2);
String s1 = sql.substring(0, sql.length() - 2);
String s2 = sql2.substring(0, sql2.length() - 2);
String s3 = s1 + ") " + s2 + ")";
System.out.println("######s3#######" + s3);
if (value != null && value != "") {
statement = connection.prepareStatement(s3);
statement.execute();
}
}
// 3,关闭
@Override
public void close() throws Exception {
super.close();
if (statement != null)
statement.close();
if (connection != null)
connection.close();
}
public static String cover(Object value) {
if (value instanceof String || value instanceof Character) {
return "'" + value + "'";
} else {
return value + "";
}
}
注意:此种逐条 INSERT 的方式会让 Hive 为每条记录生成一个小文件,产生大量小文件问题;可通过攒批写入(批量 INSERT)或定期合并小文件(如 ALTER TABLE ... CONCATENATE)来解决。
上一篇: Method.getParameterAnnotations()研究
下一篇: 远程监控JVM性能