Spark数据不平衡特定领域的两个解决方案。
版权归本人及公司所有,转载请注明出处。
https://blog.csdn.net/aicodex/article/details/90297609
背景
最近在做推荐系统,其中要对数据做召回,需要快速为每个待推荐的“用户”召回100个左右的“电影”。此时一个简单的想法就是计算每个“用户”和“电影”的“相关度”,取top100。假设“用户”和“电影”都有标签。例如“科幻”,“动作”。那么在计算top100,可以只计算同key的相关度。
实现的时候很简单,假设你把用户数据读取好了,电影数据也读取好了
val user = sc.parallelize(Array(("科幻","小明"),("科幻","小红"),("剧情","小王"),("剧情","小李"),("剧情","小张")))
val movie = sc.parallelize(Array(("科幻","复联4"),("剧情","功夫"),("剧情","弱点")))
很自然想到的方法就是把他们按key求交。分别计算“用户”和“电影”的相关度。
val relevant = user.join(movie).map(e=>computeScore(e._2._1,e._2._2))
但是在这时你忽然发现喜欢剧情类电影的用户如此之多、剧情类的电影也如此多。最后导致了严重的数据倾斜。就像上述数据,join之后得到了。
val joinResult = user.join(movie).collect
joinResult: = Array((剧情,(小李,功夫)), (剧情,(小李,弱点)), (剧情,(小王,功夫)), (剧情,(小王,弱点)), (剧情,(小张,功夫)), (剧情,(小张,弱点)), (科幻,(小明,复联4)), (科幻,(小红,复联4)))
没错,join操作其实就是“按key做笛卡尔积”。看一下spark的join的实现就明白了,join操作本质上是做了两步cogroup和flatMap:
//spark的join源码
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}
cogroup和groupByKey类似,就是cogroup的两个rdd按key做group操作,得到一个(key,valueArr)的元祖。这个flatMapValues就是按value铺平。key复制原来的。此时假如科幻电影是1000部,喜欢科幻的用户1000个。那么你会得到100w个pair。倘若剧情电影1w部,喜欢剧情电影的人1w个。那么你会得到1亿个pair。想想一下这些pair会出现在一个节点上(cogroup的时候同key一定会被hash到一个节点)。而且hash到哪个节点完全是随机的!!假设你的节点还挺强,1亿个还撑不爆你的节点,随机hash两个1亿全跑一个节点去了。就直接无法执行了,即使可以,也会因为结果太大spill到硬盘巨慢无比。
解决思路一
逻辑源于生活,高于生活。倘若给你一堆石头,有大的,有小的。让你尽可能平均的放到背包里。假如有两个石头非常大,你是否就应该把它尽可能别放到一个背包里,防止分配严重不均。这个问题是一个NP-Hard问题。一般称为K-Partition问题。自然而然想到的方法和维基上这个贪心算法很接近。我们把石头按重到轻依次放入目前最轻的背包。尽可能让他平均来解决数据倾斜。(绝对平均的代价太高了,NP-Hard问题意味着你不可能在多项式时间求解,只能暴力**)
我们这个实际问题比这个稍微复杂,因为在最终计算的过程中,得到的元素个数是“用户”数 x “电影”数。因此我们需要定义一个称重函数,使用者最了解自己的数据如何“称重”让他自己去称,我们要做的是按照“重量”,尽可能均分到每个节点上。话不多说直接上代码:
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
/*
* 自定义解决数据不平衡的分区器。通常数据不平衡经常出现在两个RDD求join、或一个RDD groupByKey的过程中。
* 而HashPartitioner是根据Key的hashCode对分区数取余,可以看做是随机分区。最后数据是按key均匀分布的。
* 但是每个key对应的value不一定均衡。尤其是join的时候,如果两个key对应的数据最后还要求笛卡尔积铺平的话,
* key对应的元素个数可能就是rdd1的key对应元素个数*rdd2的key对应的元素个数。也可能计算量是指数级甚至乘方级(例如求两两相似度,迭代梯度下降)。
* 因此需要根据"计算量"去尽可能平均划分分区,才是最优策略。当然在这个基础上,还是需要保证同key的分区是一个。
* rdd1是你输入的第一个rdd,valueWeightFunc1是对他key对应的值进行评估的函数(这个评估函数必须很快,否则还不如不用这个分区方式)
* rdd2是你输入的第二个rdd,valueWeightFunc2是对他key对应的值进行评估的函数
* mergeWeightFunc,是输入rdd1的key对应的weight和rdd2的key对应的weight。输出最终weight的函数。
* 该分区器会根据key对应的最终weight去分区,让每个分区的weight尽可能平均。
* 平均分配weight的任务像是一个经典问题:Partition problem.
* @see <a href="https://en.wikipedia.org/wiki/Partition_problem">https://en.wikipedia.org/wiki/Partition_problem</a>
* Partition problem是一个NP-Hard的问题,本代码的实现类似于该*的近似算法中的贪心算法。使用了一个小顶堆来实现快速查找最小总和的子集。
* 这里主要是考虑两个key相互影响的情况。rdd1,rdd2 key不相互重叠的部分,会采用HashPartitioner的方法。
*
*/
class KPartitioner[K, V, W](rdd1: RDD[(K, V)], rdd2: RDD[(K, W)], valueWeightFunc1: V => Int, valueWeightFunc2: W => Int, mergeWeightFunc: (Int, Int) => Int, partitions: Int)(implicit kt: ClassTag[K], vt: ClassTag[V], wt: ClassTag[W]) extends Partitioner {
val keyPartMap = {
val rdd1KeyWeight = rdd1.mapValues(valueWeightFunc1).reduceByKey(_ + _)
val rdd2KeyWeight = rdd2.mapValues(valueWeightFunc2).reduceByKey(_ + _)
val mergeKeyWeight = rdd1KeyWeight.join(rdd2KeyWeight).mapValues(e => mergeWeightFunc(e._1, e._2))
val keyCountArr = mergeKeyWeight.sortBy(_._2, false).collect()
val minHeap = new MinHeap(keyCountArr.take(partitions).map(e => (List(e._1), e._2.toLong)))
keyCountArr.slice(partitions, keyCountArr.length).foreach(e => minHeap.addKey(e._1, e._2))
minHeap.data.map(_._1).zipWithIndex.flatMap(e => e._1.map(f => (f, e._2))).toMap
}
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = keyPartMap.getOrElse(key.asInstanceOf[K], nonNegativeMod(key.hashCode, numPartitions))
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
class MinHeap(var data: Array[(List[K], Long)]) {
buildHeap()
private def buildHeap(): Unit = {
var i = data.length / 2 - 1
while (i >= 0) {
heapify(i)
i -= 1
}
}
private def heapify(i: Int): Unit = {
val l = left(i)
val r = right(i)
var smallest = i
if (l < data.length && data(l)._2 < data(i)._2) smallest = l
if (r < data.length && data(r)._2 < data(i)._2) smallest = r
if (i == smallest) return
swap(i, smallest)
heapify(smallest)
}
private def right(i: Int) = (i + 1) << 1
private def left(i: Int) = ((i + 1) << 1) - 1
private def swap(i: Int, j: Int): Unit = {
val tmp = data(i)
data(i) = data(j)
data(j) = tmp
}
def getRoot = data(0)
def setRoot(root: (List[K], Long)): Unit = {
data(0) = root
heapify(0)
}
def addKey(key: K, size: Long): Unit = {
val oldRoot = getRoot
setRoot((oldRoot._1 :+ key, oldRoot._2 + size))
}
}
}
object KPartitioner {
def apply[K, V, W](rdd1: RDD[(K, V)], rdd2: RDD[(K, W)], valueWeightFunc1: V => Int, valueWeightFunc2: W => Int, mergeWeightFunc: (Int, Int) => Int, partitions: Int)(implicit kt: ClassTag[K], vt: ClassTag[V], wt: ClassTag[W]) = new KPartitioner[K, V, W](rdd1, rdd2, valueWeightFunc1, valueWeightFunc2, mergeWeightFunc, partitions)
def apply[K, V](rdd: RDD[(K, V)], valueWeightFunc: V => Int, partitions: Int)(implicit kt: ClassTag[K], vt: ClassTag[V]) = new KPartitioner[K, V, V](rdd, rdd, valueWeightFunc, f => 0, (x, y) => x + y, partitions)
}
//测试代码
import xxx.KPartitioner
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test
@Test
class KPartitionerTest {
@Test
def getPartitionTest(): Unit = {
val conf = new SparkConf().setAppName("mySpark")
conf.setMaster("local")
val sc = new SparkContext(conf)
val inputList = List(("a", 4), ("b", 1), ("c", 3), ("d", 2), ("e", 7), ("f", 10), ("g", 20), ("h", 4), ("i", 5))
val inputMap = inputList.toMap
val testRDD = sc.parallelize(inputList)
val part = KPartitioner[String, Int](testRDD, f => f, 4)
//看每个分区分配的元素,以及合理性。期待20单独一个分区、10 2一个、7 4 1一个、5 4 3一个。
part.keyPartMap.toList.map(_.swap).groupBy(_._1).mapValues(_.map(e => (e._2, inputMap.getOrElse(e._2, -1)))).foreach(println)
}
}
代码中的小顶堆只是为了快速找到“最轻背包”的方法。代码中也附了简单的测试代码。只需要和hashPartitioner类似使用就行。join/cogroup/groupByKey的时候都可以用。最终效果是各个分区元素都尽可能平均了,但是计算这个较优分区,多付出了“称重”的时间代价。这个方法有一个缺陷就是,倘若有一个石头还是太大,你哪怕把它单独放在一个背包,他也过分倾斜,那么是没法用的。
解决思路二
还是我遇到的这个问题,我们注意到这样一件事:join是由cogroup+类似flatMap实现的。假如1w个喜欢剧情电影的人和1w部剧情电影。cogroup到一起也不过区区2w的数据量,这么点量很难导致数据不平衡。导致数据不平衡的根本原因是我们需要对他进行两两相似度计算。所以flatMap这一步操作又是几乎是必须的(即使你不flat也得计算1亿次相关度)。但是反过来,1亿条数据对于spark来说真不多,多的原因是把它分配到一个节点上去了。所以只要不让他在一个节点上就行。
说的轻巧,不在一个节点,我怎么保证一个key的数据还在一起呢?如果我在节点1上5000个人5000个电影,我如何让他知道节点2上的5000人和5000部电影呢。他们被隔断了,广播代价太高。其实思路说难也难,说简单也简单。举个简单的例子:
* [a,b,c,d] · [1,2] -> [(a,1)(a,2)(b,1)(b,2)(c,1)(c,2)(d,1)(e,2)]
* 直接铺平有8个字母,8个数字。而先按照左侧铺平,和右侧铺平分别为:
* (a,[1,2]) ([a,b,c,d],1)
* (c,[1,2]) ([a,b,c,d],2)
* (d,[1,2])
* (e,[1,2])
* 先铺平左侧。那么字母还是4个。数字变成8个。先铺平右侧字母变成8个数字是2个。
所以先铺平一侧,再把它利用repatition随机分发到各个节点上,就可以大大减小每个节点的数据量。当然,如果铺平一侧还是过大,我可以把铺平的一侧进行拆分再多铺平一次。话不多说直接上代码:
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
object RDDFunctionExtend {
implicit class PairRDDCovertor[K, V](val rdd: RDD[(K, V)]) extends AnyVal {
def cartesianByKeyWith[W](other: RDD[(K, W)], splitNumLeft: Int)(implicit kt: ClassTag[K], vt: ClassTag[V], wt: ClassTag[W]) = {
rdd.cogroup(other)
.flatMapValues(pair => pair._1.grouped(splitNumLeft).map(f => (f, pair._2)))
.repartition(rdd.sparkContext.defaultParallelism)
.flatMapValues(pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w))
}
def cartesianByKeyWith[W](other: RDD[(K, W)], splitNumLeft: Int, splitNumRight: Int)(implicit kt: ClassTag[K], vt: ClassTag[V], wt: ClassTag[W]) = {
rdd.cogroup(other)
.flatMapValues(pair => for (v <- pair._1.grouped(splitNumLeft); w <- pair._2.grouped(splitNumRight)) yield (v, w))
.repartition(rdd.sparkContext.defaultParallelism)
.flatMapValues(pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w))
}
def cartesianByKeyWith[W](other: RDD[(K, W)])(implicit kt: ClassTag[K], vt: ClassTag[V], wt: ClassTag[W]) = {
rdd.cogroup(other)
.flatMapValues(e => e._1.map(f => (f, e._2)))
.repartition(rdd.sparkContext.defaultParallelism)
.flatMapValues(e => e._2.map(f => (e._1, f)))
}
}
}
我定义了这么一个“隐式转换”能吧pairRDD转化成我定义的类,用的时候只需要import这个隐式类即可。这样给pairRDD拓展了一个cartesianByKeyWith接口,可以让他铺平一侧,或者是拆分了再铺平。来替代join。如果需要leftOuterJoin或者rightOuterJoin或是fullOuterJoin的,可以自己照葫芦画瓢实现一个。
上一篇: 两个错误的解决方案
下一篇: React hooks实践