欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spark-sql应用

程序员文章站 2022-06-07 23:12:17
...

一.序言

     这里介绍一下我们对spark-sql 的一些简单应用。

  

二.业务描述

     我们需要对大量数据进行分析,包含历史的数据,传统数据库即使用分库分表的中间件,也只能满足基本查询,对于多表关连的分析是不太友好的,因此我们将数据放到hadoop集群,但是并不是所有的JAVA 程序员都能对hadoop 或者hive 进行开发,为了屏蔽这种差异,因此我们在几个分析软件上做了对比,这里就不介绍了,最后选择了spark。

 

三.开发流程

     1.服务器上我们启动了hive 的metastore 信息,让spark 使用。

 

      2.开发了简单的jar 进行使用,大概有以下类:

   

public class SparkContextWrapper {
    /**
     * sc 对象
     */
    private  JavaSparkContext sparkContext;
    /**
     * hive sql 查询
     */
    private  HiveContext hiveContext;
    /**
     * 配置信息
     */
    private SparkConf conf;
    // 初始化spark
    private synchronized void initJavaSparkContext(){...}
    // 初始化hive
    private synchronized HiveContext initHiveContext(){...}
   
    // 其他环境检查  关闭 重启 等方法  

}

    

   2.2 类似于sqlTemplate 模板

   

@Component
public class SqlContextWrapper {
    // 获取对象
    @Autowired
    SparkContextWrapper sparkContextWrapper;
   
    // 执行sql
    public DataFrame sql(String sql){}

    //  缓存表
    public DataFrame cacheTable(String sql,String tableName){}
   
    // 清空表,返回结果集 等等方法

}

 

    2.3 事务控制,支持关闭 和长连接的方式。

   

/**
 * spark 事务管理器
 * 提供初始化和stop 方法
 * Created by qqr on 15/12/18.
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface SparkHiveSqlTransactional {
 /**
     * 默认延迟关闭时间
     * @return
     */
    int delayTime() default 5;

    /**
     * 是否延迟关闭
     * @return
     */
    boolean isDelayStop() default true;
}

    

    2.4 事务拦截器

   

/**
 * spring 事务管理器
 * Created by qqr on 15/12/18.
 */
@Component
@Aspect
@Log4j
public class SparkTransactionManager {
    
    @Autowired
    private SparkContextWrapper sparkContextWrapper;

    // 监控
    @Autowired
    private MonitorUtil monitorUtil;

    @Around("@annotation(com.xxx.SparkHiveSqlTransactional)")
    @Transactional()
    private Object sparkTx(ProceedingJoinPoint pj) throws Throwable {
    ...
}
    

 

    2.5 还包含监控,异常,任务管理等等就不贴了。

 

 

三.使用配置

    3.1 用spring 注册环境变量

    

<!-- spring 需要的参数,居然参数解释 看文档 -->
<bean id="evnInit" class="com.xxx.EnvInit">
        <property name="master" value="${spark.master}"/>
        <property name="appName" value="${spark.app.name}"/>
        <property name="sparkConfPrams">
            <map>
                <entry key="spark.executor.memory" value="1G" />
                <entry key="spark.cores.max" value="4" />
                <entry key="spark.worker.cleanup.enabled" value="true" />
                <entry key="spark.worker.cleanup.interval" value="120" />
                <entry key="spark.akka.frameSize" value="256" />
                <entry key="spark.scheduler.mode" value="FAIR" />
            </map>
        </property>
        <property name="sparkMonitor" ref="sparkMonitor" />
        <property name="debugModel" value="${debug.model}"/>
    </bean>

 

    3.2 使用

   

@Scheduled(cron = "0 15 07 01 * ?")
    @SparkHiveSqlTransactional
    public void autoRun() {
        // 执行任务
        runTask();
    }

 

    

  private void runTask(){
        StringBuilder sql = new StringBuilder("select * from test limit 1");
         // 继承一个 baseDao
         List<Row> rowList = sql(sql.toString());
         // 返回值想干嘛干嘛
           
        }
    }

 

 

四.小结

     1.关于spark 的配置已经很多了,这里不介绍。

     2.这样能让java 程序员直接对大数据进行操作,还是比较方便的,当然需要对他们的SQL 进行一些监控,毕竟效率需要考虑的,监控可以按需求开发

     3.任务和直接接入任务调度。

 

     4.问题:

      4.1    开始为了节约资源,每次跑完了,一定时间内没有其他任务 就把sc 关闭了,这样会导致perm 区很多,注意内存。

      4.2   使用长连接可以解决上面问题,但是有些浪费资源。服务器的最好是8G,我们是4G,也够了,spark 的自己看着办吧。

      4.3 最后是长连接和断开的方式混用的模式,spark挂了,重启可以重新跑

      4.4 sparkUI  是很占perm 区,特别是短链接的时候,可以改小的,默认是1000

      4.5 因为有RDD 和内存表的概念,比hive  快很多的

最后:这是分享的一简单经验,不好的地方指出,非常感谢

    

相关标签: saprk spark-sql