欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink操作Hive数据库

程序员文章站 2022-03-24 20:11:45
...

Flink操作Hive数据库

DataStream<String> text = streamExecutionEnvironment.addSource(flinkKafkaConsume);

text .addSink(new SinkHIve());

 

public class SinkHive extends RichSinkFunction<String> implements SinkFunction<String> {

    private static String driverName = "org.apache.hive.jdbc.HiveDriver";  //驱动名称

    private static String url = "jdbc:hive2://10.10.82.137:10000/xxx";

    private static String user ="";

    private static String password ="";

 

    private Connection connection;

    private PreparedStatement statement;

 

    // 1,初始化

    @Override

    public void open(Configuration parameters) throws Exception {

        super.open(parameters);

        Class.forName(driverName);

        connection = DriverManager.getConnection(url, user, password);

    }

 

    // 2,执行

    @Override

    public void invoke(String value, Context context) throws Exception {

    //####invoke######{"name":"gaojs0","id":"0010"}

        System.out.println("##########invoke#############" + value);

        

        Map<String,Object> map = JSONObject.parseObject(value);

        String tableName = "test";

        String sql = "INSERT INTO " + tableName + "( ";

        String sql2 = " VALUES( ";

        for (Map.Entry entry : map.entrySet()) {

            sql += entry.getKey() + ", ";

            sql2 += cover(entry.getValue()) + ", ";

        }

        

        System.out.println("######sql#######" + sql);

        System.out.println("######sql2#######" + sql2);

        

        String s1 = sql.substring(0, sql.length() - 2);

        String s2 = sql2.substring(0, sql2.length() - 2);

        String s3 = s1 + ") " + s2 + ")";

        System.out.println("######s3#######" + s3);

 

        

        if (value != null && value != "") {

        statement = connection.prepareStatement(s3);

            statement.execute();

        }

    }

 

    // 3,关闭

    @Override

    public void close() throws Exception {

        super.close();

        if (statement != null)

            statement.close();

        if (connection != null)

            connection.close();

    }

    

    public static String cover(Object value) {

        if (value instanceof String || value instanceof Character) {

            return "'" + value + "'";

        } else {

            return value + "";

        }

    }

 

此种方式操作的数据库,Hive会产生很多小文件,需要解决