欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark数据不平衡特定领域的两个解决方案。

程序员文章站 2022-04-15 13:37:13
...

版权归本人及公司所有,转载请注明出处。
https://blog.csdn.net/aicodex/article/details/90297609

背景

  最近在做推荐系统,其中要对数据做召回,需要快速为每个待推荐的“用户”召回100个左右的“电影”。此时一个简单的想法就是计算每个“用户”和“电影”的“相关度”,取top100。假设“用户”和“电影”都有标签。例如“科幻”,“动作”。那么在计算top100,可以只计算同key的相关度。
  实现的时候很简单,假设你把用户数据读取好了,电影数据也读取好了

val user = sc.parallelize(Array(("科幻","小明"),("科幻","小红"),("剧情","小王"),("剧情","小李"),("剧情","小张")))
val movie = sc.parallelize(Array(("科幻","复联4"),("剧情","功夫"),("剧情","弱点")))

  很自然想到的方法就是把他们按key求交。分别计算“用户”和“电影”的相关度。

val relevant = user.join(movie).map(e=>computeScore(e._2._1,e._2._2))

  但是在这时你忽然发现喜欢剧情类电影的用户如此之多、剧情类的电影也如此多。最后导致了严重的数据倾斜。就像上述数据,join之后得到了。

val joinResult = user.join(movie).collect
joinResult: = Array((剧情,(小李,功夫)), (剧情,(小李,弱点)), (剧情,(小王,功夫)), (剧情,(小王,弱点)), (剧情,(小张,功夫)), (剧情,(小张,弱点)), (科幻,(小明,复联4)), (科幻,(小红,复联4)))

  没错,join操作其实就是“按key做笛卡尔积”。看一下spark的join的实现就明白了,join操作本质上是做了两步cogroup和flatMap:

  //spark的join源码
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
  }

  cogroup和groupByKey类似,就是cogroup的两个rdd按key做group操作,得到一个(key,valueArr)的元祖。这个flatMapValues就是按value铺平。key复制原来的。此时假如科幻电影是1000部,喜欢科幻的用户1000个。那么你会得到100w个pair。倘若剧情电影1w部,喜欢剧情电影的人1w个。那么你会得到1亿个pair。想想一下这些pair会出现在一个节点上(cogroup的时候同key一定会被hash到一个节点)。而且hash到哪个节点完全是随机的!!假设你的节点还挺强,1亿个还撑不爆你的节点,随机hash两个1亿全跑一个节点去了。就直接无法执行了,即使可以,也会因为结果太大spill到硬盘巨慢无比。

解决思路一

  逻辑源于生活,高于生活。倘若给你一堆石头,有大的,有小的。让你尽可能平均的放到背包里。假如有两个石头非常大,你是否就应该把它尽可能别放到一个背包里,防止分配严重不均。这个问题是一个NP-Hard问题。一般称为K-Partition问题。自然而然想到的方法和维基上这个贪心算法很接近。我们把石头按重到轻依次放入目前最轻的背包。尽可能让他平均来解决数据倾斜。(绝对平均的代价太高了,NP-Hard问题意味着你不可能在多项式时间求解,只能暴力**)
  我们这个实际问题比这个稍微复杂,因为在最终计算的过程中,得到的元素个数是“用户”数 x “电影”数。因此我们需要定义一个称重函数,使用者最了解自己的数据如何“称重”让他自己去称,我们要做的是按照“重量”,尽可能均分到每个节点上。话不多说直接上代码:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
/*
 * 自定义解决数据不平衡的分区器。通常数据不平衡经常出现在两个RDD求join、或一个RDD groupByKey的过程中。
 * 而HashPartitioner是根据Key的hashCode对分区数取余,可以看做是随机分区。最后数据是按key均匀分布的。
 * 但是每个key对应的value不一定均衡。尤其是join的时候,如果两个key对应的数据最后还要求笛卡尔积铺平的话,
 * key对应的元素个数可能就是rdd1的key对应元素个数*rdd2的key对应的元素个数。也可能计算量是指数级甚至乘方级(例如求两两相似度,迭代梯度下降)。
 * 因此需要根据"计算量"去尽可能平均划分分区,才是最优策略。当然在这个基础上,还是需要保证同key的分区是一个。
 * rdd1是你输入的第一个rdd,valueWeightFunc1是对他key对应的值进行评估的函数(这个评估函数必须很快,否则还不如不用这个分区方式)
 * rdd2是你输入的第二个rdd,valueWeightFunc2是对他key对应的值进行评估的函数
 * mergeWeightFunc,是输入rdd1的key对应的weight和rdd2的key对应的weight。输出最终weight的函数。
 * 该分区器会根据key对应的最终weight去分区,让每个分区的weight尽可能平均。
 * 平均分配weight的任务像是一个经典问题:Partition problem.
 * @see <a href="https://en.wikipedia.org/wiki/Partition_problem">https://en.wikipedia.org/wiki/Partition_problem</a>
 * Partition problem是一个NP-Hard的问题,本代码的实现类似于该*的近似算法中的贪心算法。使用了一个小顶堆来实现快速查找最小总和的子集。
 * 这里主要是考虑两个key相互影响的情况。rdd1,rdd2 key不相互重叠的部分,会采用HashPartitioner的方法。
 * 
 */
class KPartitioner[K, V, W](rdd1: RDD[(K, V)], rdd2: RDD[(K, W)], valueWeightFunc1: V => Int, valueWeightFunc2: W => Int, mergeWeightFunc: (Int, Int) => Int, partitions: Int)(implicit kt: ClassTag[K], vt: ClassTag[V], wt: ClassTag[W]) extends Partitioner {
  val keyPartMap = {
    val rdd1KeyWeight = rdd1.mapValues(valueWeightFunc1).reduceByKey(_ + _)
    val rdd2KeyWeight = rdd2.mapValues(valueWeightFunc2).reduceByKey(_ + _)
    val mergeKeyWeight = rdd1KeyWeight.join(rdd2KeyWeight).mapValues(e => mergeWeightFunc(e._1, e._2))
    val keyCountArr = mergeKeyWeight.sortBy(_._2, false).collect()
    val minHeap = new MinHeap(keyCountArr.take(partitions).map(e => (List(e._1), e._2.toLong)))
    keyCountArr.slice(partitions, keyCountArr.length).foreach(e => minHeap.addKey(e._1, e._2))
    minHeap.data.map(_._1).zipWithIndex.flatMap(e => e._1.map(f => (f, e._2))).toMap
  }
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = keyPartMap.getOrElse(key.asInstanceOf[K], nonNegativeMod(key.hashCode, numPartitions))
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }
  class MinHeap(var data: Array[(List[K], Long)]) {
    buildHeap()
    private def buildHeap(): Unit = {
      var i = data.length / 2 - 1
      while (i >= 0) {
        heapify(i)
        i -= 1
      }
    }
    private def heapify(i: Int): Unit = {
      val l = left(i)
      val r = right(i)
      var smallest = i
      if (l < data.length && data(l)._2 < data(i)._2) smallest = l
      if (r < data.length && data(r)._2 < data(i)._2) smallest = r
      if (i == smallest) return
      swap(i, smallest)
      heapify(smallest)
    }
    private def right(i: Int) = (i + 1) << 1
    private def left(i: Int) = ((i + 1) << 1) - 1
    private def swap(i: Int, j: Int): Unit = {
      val tmp = data(i)
      data(i) = data(j)
      data(j) = tmp
    }
    def getRoot = data(0)
    def setRoot(root: (List[K], Long)): Unit = {
      data(0) = root
      heapify(0)
    }
    def addKey(key: K, size: Long): Unit = {
      val oldRoot = getRoot
      setRoot((oldRoot._1 :+ key, oldRoot._2 + size))
    }
  }
}
object KPartitioner {
  def apply[K, V, W](rdd1: RDD[(K, V)], rdd2: RDD[(K, W)], valueWeightFunc1: V => Int, valueWeightFunc2: W => Int, mergeWeightFunc: (Int, Int) => Int, partitions: Int)(implicit kt: ClassTag[K], vt: ClassTag[V], wt: ClassTag[W]) = new KPartitioner[K, V, W](rdd1, rdd2, valueWeightFunc1, valueWeightFunc2, mergeWeightFunc, partitions)

  def apply[K, V](rdd: RDD[(K, V)], valueWeightFunc: V => Int, partitions: Int)(implicit kt: ClassTag[K], vt: ClassTag[V]) = new KPartitioner[K, V, V](rdd, rdd, valueWeightFunc, f => 0, (x, y) => x + y, partitions)
}
//测试代码
import xxx.KPartitioner
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test
@Test
class KPartitionerTest {
  @Test
  def getPartitionTest(): Unit = {
    val conf = new SparkConf().setAppName("mySpark")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val inputList = List(("a", 4), ("b", 1), ("c", 3), ("d", 2), ("e", 7), ("f", 10), ("g", 20), ("h", 4), ("i", 5))
    val inputMap = inputList.toMap
    val testRDD = sc.parallelize(inputList)
    val part = KPartitioner[String, Int](testRDD, f => f, 4)
    //看每个分区分配的元素,以及合理性。期待20单独一个分区、10 2一个、7 4 1一个、5 4 3一个。
    part.keyPartMap.toList.map(_.swap).groupBy(_._1).mapValues(_.map(e => (e._2, inputMap.getOrElse(e._2, -1)))).foreach(println)
  }
}

  代码中的小顶堆只是为了快速找到“最轻背包”的方法。代码中也附了简单的测试代码。只需要和hashPartitioner类似使用就行。join/cogroup/groupByKey的时候都可以用。最终效果是各个分区元素都尽可能平均了,但是计算这个较优分区,多付出了“称重”的时间代价。这个方法有一个缺陷就是,倘若有一个石头还是太大,你哪怕把它单独放在一个背包,他也过分倾斜,那么是没法用的。

解决思路二

  还是我遇到的这个问题,我们注意到这样一件事:join是由cogroup+类似flatMap实现的。假如1w个喜欢剧情电影的人和1w部剧情电影。cogroup到一起也不过区区2w的数据量,这么点量很难导致数据不平衡。导致数据不平衡的根本原因是我们需要对他进行两两相似度计算。所以flatMap这一步操作又是几乎是必须的(即使你不flat也得计算1亿次相关度)。但是反过来,1亿条数据对于spark来说真不多,多的原因是把它分配到一个节点上去了。所以只要不让他在一个节点上就行。
  说的轻巧,不在一个节点,我怎么保证一个key的数据还在一起呢?如果我在节点1上5000个人5000个电影,我如何让他知道节点2上的5000人和5000部电影呢。他们被隔断了,广播代价太高。其实思路说难也难,说简单也简单。举个简单的例子:

     * [a,b,c,d] · [1,2] -> [(a,1)(a,2)(b,1)(b,2)(c,1)(c,2)(d,1)(e,2)]
     * 直接铺平有8个字母,8个数字。而先按照左侧铺平,和右侧铺平分别为:
     * (a,[1,2])      ([a,b,c,d],1)
     * (c,[1,2])      ([a,b,c,d],2)
     * (d,[1,2])
     * (e,[1,2])
     * 先铺平左侧。那么字母还是4个。数字变成8个。先铺平右侧字母变成8个数字是2个。

  所以先铺平一侧,再把它利用repatition随机分发到各个节点上,就可以大大减小每个节点的数据量。当然,如果铺平一侧还是过大,我可以把铺平的一侧进行拆分再多铺平一次。话不多说直接上代码:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
object RDDFunctionExtend {
  implicit class PairRDDCovertor[K, V](val rdd: RDD[(K, V)]) extends AnyVal {
    def cartesianByKeyWith[W](other: RDD[(K, W)], splitNumLeft: Int)(implicit kt: ClassTag[K], vt: ClassTag[V], wt: ClassTag[W]) = {
      rdd.cogroup(other)
        .flatMapValues(pair => pair._1.grouped(splitNumLeft).map(f => (f, pair._2)))
        .repartition(rdd.sparkContext.defaultParallelism)
        .flatMapValues(pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w))
    }
    def cartesianByKeyWith[W](other: RDD[(K, W)], splitNumLeft: Int, splitNumRight: Int)(implicit kt: ClassTag[K], vt: ClassTag[V], wt: ClassTag[W]) = {
      rdd.cogroup(other)
        .flatMapValues(pair => for (v <- pair._1.grouped(splitNumLeft); w <- pair._2.grouped(splitNumRight)) yield (v, w))
        .repartition(rdd.sparkContext.defaultParallelism)
        .flatMapValues(pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w))
    }
    def cartesianByKeyWith[W](other: RDD[(K, W)])(implicit kt: ClassTag[K], vt: ClassTag[V], wt: ClassTag[W]) = {
      rdd.cogroup(other)
        .flatMapValues(e => e._1.map(f => (f, e._2)))
        .repartition(rdd.sparkContext.defaultParallelism)
        .flatMapValues(e => e._2.map(f => (e._1, f)))
    }
  }
}

  我定义了这么一个“隐式转换”能吧pairRDD转化成我定义的类,用的时候只需要import这个隐式类即可。这样给pairRDD拓展了一个cartesianByKeyWith接口,可以让他铺平一侧,或者是拆分了再铺平。来替代join。如果需要leftOuterJoin或者rightOuterJoin或是fullOuterJoin的,可以自己照葫芦画瓢实现一个。