欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

spark+phoenix

程序员文章站 2022-03-08 22:29:58
phoenix作为查询引擎,为了提高查询效率,为phoenix表创建了二级索引,而数据是sparkstreaming通过hbase api直接向hbase插数据。那么问题来了,对于phoenix的二级索引,直接插入底层hbase的源表,不会引起二级索引的更新,从而导致phoenix索引数据和hbas ......

 

phoenix作为查询引擎,为了提高查询效率,为phoenix表创建了二级索引,而数据是sparkstreaming通过hbase api直接向hbase插数据。那么问题来了,对于phoenix的二级索引,直接插入底层hbase的源表,不会引起二级索引的更新,从而导致phoenix索引数据和hbase源表数据不一致。而对于spark+phoenix的写入方式,官方有文档说明,但是有版本限制,以下是官方原文:

    • to ensure that all requisite phoenix / hbase platform dependencies are available on the classpath for the spark executors and drivers, set both ‘spark.executor.extraclasspath’ and ‘spark.driver.extraclasspath’ in spark-defaults.conf to include the ‘phoenix-<version>-client.jar’
    • note that for phoenix versions 4.7 and 4.8 you must use the ‘phoenix-<version>-client-spark.jar’. as of phoenix 4.10, the ‘phoenix-<version>-client.jar’ is compiled against spark 2.x. if compability with spark 1.x if needed, you must compile phoenix with the spark16 maven profile.

所以只能考虑用jdbc的方式做。

我使用的版本信息:

  • spark:2.2.1
  • phoenix:4.13.2

jar包引入:

  •  <dependency>
                <groupid>org.apache.phoenix</groupid>
                <artifactid>phoenix-core</artifactid>
                <version>4.13.1-hbase-1.2</version>
            </dependency>
            <dependency>
                <groupid>org.apache.phoenix</groupid>
                <artifactid>phoenix-spark</artifactid>
                <version>4.13.1-hbase-1.2</version>
            </dependency>

     

phoenixutil类:

  • public class phoenixutil {
    
        private static linkedlist<connection> connectionqueue;
    
        static {
            try {
                class.forname("org.apache.phoenix.jdbc.phoenixdriver");
            } catch (classnotfoundexception e) {
                e.printstacktrace();
            }
        }
    
        public synchronized static connection getconnection() throws sqlexception {
            try {
                if (connectionqueue == null){
                    connectionqueue = new linkedlist<connection>();
                    for (int i = 0;i < 3;i++){
                        connection conn = drivermanager.getconnection("jdbc:phoenix:hostname:2181");
    
                        connectionqueue.push(conn);
                    }
                }
            }catch (exception e1){
                e1.printstacktrace();
            }
            return connectionqueue.poll();
        }
    
        public static void returnconnection(connection conn){
            connectionqueue.push(conn);
        }

     

在sparkstreaming中引入phoenixutil类(由于业务关系,这里使用的是statement):

savelines.foreachrdd(rdd -> { rdd.foreachpartition(p -> { connection conn = phoenixutil.getconnection(); statement stmt = conn.createstatement(); conn.setautocommit(false); //业务逻辑 //sql } stmt.addbatch(sql); } stmt.executebatch(); conn.commit(); stmt.close(); phoenixutil.returnconnection(conn); zkkafkautil.updateoffset(offsetranges, group_id, topic); }); });

最后,如果大家有更好的方式处理这个问题,欢迎指教。