欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

记一次sparksql读取oracle数据优化过程

程序员文章站 2022-03-06 21:03:23
...

问题描述

Sparksql提供外接关系型数据库的接口如下,
def jdbc(
   		url: String,
    	table: String,
   	 	columnName: String,
   	 	lowerBound: Long,
   	 	upperBound: Long,
   		numPartitions: Int,
   		connectionProperties: Properties): DataFrame 

从上述接口可知,columnName列必须是整形类型,在现实使用场景中,并不一定总是存在整形列,且表中的整形列值如果不是分布均匀的话,很容易就会出现数据倾斜现象。
如果不使用上述接口,而是使用如下

def jdbc(url: String, table: String, properties: Properties): DataFrame

便存在性能问题,即仅有一个task去读取oracle中的数据

解决方法

Oracle存在两个伪列:rownum、rowid。伪劣,即表中不存在的列。Rownum是逻辑存在的、整形列,且是自增的。Rowid是物理存在的,其值是一串随机数,表征该记录的位置。
Rownum在select语句中,会对查询结果集进行编号,利用这个特点,我们可以利用sparksql jdbc接口去实现分布式多任务去读取oracle数据,具体思路如下:

  1. select max(rownum) ,min(rownum) from 表名1,查找最大***、最小***。
  2. 预定义一个task处理多少条记录,然后对上述的max、min区间进行等值划分,得出并行度,即上述接口中的numPartitions变量值。
  3. dataFrame = sqlContext.read.jdbc(url, select 表名1.*, rownum from 表名1, rownum, minValue, maxValue, numPartitions, prop)
  4. 丢弃“逻辑列”drop(rownum)