spark-sql应用
一.序言
这里介绍一下我们对spark-sql 的一些简单应用。
二.业务描述
我们需要对大量数据进行分析,包含历史的数据,传统数据库即使用分库分表的中间件,也只能满足基本查询,对于多表关连的分析是不太友好的,因此我们将数据放到hadoop集群,但是并不是所有的JAVA 程序员都能对hadoop 或者hive 进行开发,为了屏蔽这种差异,因此我们在几个分析软件上做了对比,这里就不介绍了,最后选择了spark。
三.开发流程
1.服务器上我们启动了hive 的metastore 信息,让spark 使用。
2.开发了简单的jar 进行使用,大概有以下类:
public class SparkContextWrapper { /** * sc 对象 */ private JavaSparkContext sparkContext; /** * hive sql 查询 */ private HiveContext hiveContext; /** * 配置信息 */ private SparkConf conf; // 初始化spark private synchronized void initJavaSparkContext(){...} // 初始化hive private synchronized HiveContext initHiveContext(){...} // 其他环境检查 关闭 重启 等方法 }
2.2 类似于sqlTemplate 模板
@Component public class SqlContextWrapper { // 获取对象 @Autowired SparkContextWrapper sparkContextWrapper; // 执行sql public DataFrame sql(String sql){} // 缓存表 public DataFrame cacheTable(String sql,String tableName){} // 清空表,返回结果集 等等方法 }
2.3 事务控制,支持关闭 和长连接的方式。
/** * spark 事务管理器 * 提供初始化和stop 方法 * Created by qqr on 15/12/18. */ @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface SparkHiveSqlTransactional { /** * 默认延迟关闭时间 * @return */ int delayTime() default 5; /** * 是否延迟关闭 * @return */ boolean isDelayStop() default true; }
2.4 事务拦截器
/** * spring 事务管理器 * Created by qqr on 15/12/18. */ @Component @Aspect @Log4j public class SparkTransactionManager { @Autowired private SparkContextWrapper sparkContextWrapper; // 监控 @Autowired private MonitorUtil monitorUtil; @Around("@annotation(com.xxx.SparkHiveSqlTransactional)") @Transactional() private Object sparkTx(ProceedingJoinPoint pj) throws Throwable { ... }
2.5 还包含监控,异常,任务管理等等就不贴了。
三.使用配置
3.1 用spring 注册环境变量
<!-- spring 需要的参数,居然参数解释 看文档 --> <bean id="evnInit" class="com.xxx.EnvInit"> <property name="master" value="${spark.master}"/> <property name="appName" value="${spark.app.name}"/> <property name="sparkConfPrams"> <map> <entry key="spark.executor.memory" value="1G" /> <entry key="spark.cores.max" value="4" /> <entry key="spark.worker.cleanup.enabled" value="true" /> <entry key="spark.worker.cleanup.interval" value="120" /> <entry key="spark.akka.frameSize" value="256" /> <entry key="spark.scheduler.mode" value="FAIR" /> </map> </property> <property name="sparkMonitor" ref="sparkMonitor" /> <property name="debugModel" value="${debug.model}"/> </bean>
3.2 使用
@Scheduled(cron = "0 15 07 01 * ?") @SparkHiveSqlTransactional public void autoRun() { // 执行任务 runTask(); }
private void runTask(){ StringBuilder sql = new StringBuilder("select * from test limit 1"); // 继承一个 baseDao List<Row> rowList = sql(sql.toString()); // 返回值想干嘛干嘛 } }
四.小结
1.关于spark 的配置已经很多了,这里不介绍。
2.这样能让java 程序员直接对大数据进行操作,还是比较方便的,当然需要对他们的SQL 进行一些监控,毕竟效率需要考虑的,监控可以按需求开发
3.任务和直接接入任务调度。
4.问题:
4.1 开始为了节约资源,每次跑完了,一定时间内没有其他任务 就把sc 关闭了,这样会导致perm 区很多,注意内存。
4.2 使用长连接可以解决上面问题,但是有些浪费资源。服务器的最好是8G,我们是4G,也够了,spark 的自己看着办吧。
4.3 最后是长连接和断开的方式混用的模式,spark挂了,重启可以重新跑
4.4 sparkUI 是很占perm 区,特别是短链接的时候,可以改小的,默认是1000
4.5 因为有RDD 和内存表的概念,比hive 快很多的
最后:这是分享的一简单经验,不好的地方指出,非常感谢