记一次sparksql读取oracle数据优化过程
程序员文章站
2022-03-06 21:03:23
...
问题描述
Sparksql提供外接关系型数据库的接口如下,
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame
从上述接口可知,columnName列必须是整形类型,在现实使用场景中,并不一定总是存在整形列,且表中的整形列值如果不是分布均匀的话,很容易就会出现数据倾斜现象。
如果不使用上述接口,而是使用如下
def jdbc(url: String, table: String, properties: Properties): DataFrame
便存在性能问题,即仅有一个task去读取oracle中的数据
解决方法
Oracle存在两个伪列:rownum、rowid。伪劣,即表中不存在的列。Rownum是逻辑存在的、整形列,且是自增的。Rowid是物理存在的,其值是一串随机数,表征该记录的位置。
Rownum在select语句中,会对查询结果集进行编号,利用这个特点,我们可以利用sparksql jdbc接口去实现分布式多任务去读取oracle数据,具体思路如下:
- select max(rownum) ,min(rownum) from
表名1
,查找最大***、最小***。 - 预定义一个task处理多少条记录,然后对上述的max、min区间进行等值划分,得出并行度,即上述接口中的numPartitions变量值。
- dataFrame = sqlContext.read.jdbc(url, select
表名1
.*, rownum from表名1
, rownum, minValue, maxValue, numPartitions, prop) - 丢弃“逻辑列”drop(rownum)