欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark GraphX中的pregel 函数(步骤图解)

程序员文章站 2022-07-14 19:33:37
...

spark 系列

Spark 核心原理及运行架构

Spark RDD详解

Spark 常用算子大全

Spark SQL 详解

Spark GraphX 图计算入门基础

Spark PageRank 算法

Spark GraphX中的pregel API




前言

在上一篇博客已经为大家介绍了Spark GraphX图计算中的PageRank 算法。本篇博客将为大家详细介绍了 Spark GraphX中常用的函数pregel实现最短路径或者最小值的实现原理和方式


pregel API

概述

图本质上是一种递归的数据结构,其顶点的属性值依赖于其邻接顶点,而其邻接顶点属性又依赖于其邻接顶点,许多重要的图算法通过迭代计算每个顶点的属性直到到达定点条件,这些迭代的图算法被抽象成一系列图并行操作。

Pregel是Google提出的用于大规模分布式图计算框架,常用来解决一下问题:

  • 图遍历(BFS)
  • 单源最短路径(SSSP)
  • PageRank计算(上一篇已经介绍过)

Pregel的计算由一系列迭代组成,称为supersteps。Pregel迭代过程(实现过程)如下:

  • 每个顶点从上一个superstep接收入站消息
  • 计算顶点新的属性值
  • 在下一个superstep中向相邻的顶点发送消息
  • 当没有剩余消息时,迭代结束

源码参数分析

源码

  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }

相关参数

相关参数 说明
VD 顶点的数据类型
ED 边的数据类型
A Pregel message的类型
graph 输入的图

参数

参数 说明
initialMsg 图初始化的时候,开始模型计算的时候,所有节点都会先收到一个消息
maxIterations 最大迭代次数
activeDirection 规定了发送消息的方向(默认是出边方向:EdgeDirection.Out)
vprog 节点接收该消息并将聚合后的数据和本节点进行属性的合并
sendMsg **态的节点调用该方法发送消息
mergeMsg 如果一个节点接收到多条消息,先用mergeMsg 来将多条消息聚合成为一条消息,如果节点只收到一条消息,则不调用该函数

关联常用GraphX 的API

常用GraphX 的API 说明
mapReduceTriplets() 计算每个节点的相邻的边缘和顶点的值,用户定义的mapFunc函数会在图的每一条边调用,产生0或者多个message发送到这条边两个顶点其中一个当中,reduceFunc函数用来合并map阶段的输出到每个节点

案例

在理解案例之前,首先要清楚关于 顶点 的两点知识:

顶点 的状态有两种:

  1. 钝化态【类似于休眠,不做任何事】
  2. **态【干活】

顶点 能够处于**态需要满足以下任意条件:

  1. 成功收到消息
  2. 成功发送了任何一条消息

案例一:求最短距离

顶点5到其他各顶点的最短距离 实现代码

object Test{
  def main(args: Array[String]): Unit = {
    //1、创建SparkContext
    val sparkConf = new SparkConf().setAppName("GraphxHelloWorld").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    //2、创建顶点
    val vertexArray = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50))
    )
    val vertexRDD: RDD[(VertexId, (String,Int))] = sparkContext.makeRDD(vertexArray)

    //3、创建边,边的属性代表 相邻两个顶点之间的距离
    val edgeArray = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 3),
      Edge(4L, 1L, 1),
      Edge(2L, 5L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3)
    )
    val edgeRDD: RDD[Edge[Int]] = sparkContext.makeRDD(edgeArray)


    //4、创建图(使用aply方式创建)
    val graph1 = Graph(vertexRDD, edgeRDD)

    /* ************************** 使用pregle算法计算 ,顶点5 到 各个顶点的最短距离 ************************** */
    //被计算的图中 起始顶点id
    val srcVertexId = 5L
    val initialGraph = graph1.mapVertices{case (vid,(name,age)) => if(vid==srcVertexId) 0.0 else Double.PositiveInfinity}

    //5、调用pregel
    val pregelGraph = initialGraph.pregel(
      Double.PositiveInfinity,
      Int.MaxValue,
      EdgeDirection.Out
    )(
      (vid: VertexId, vd: Double, distMsg: Double) => {
        val minDist = math.min(vd, distMsg)
        println(s"顶点${vid},属性${vd},收到消息${distMsg},合并后的属性${minDist}")
        minDist
      },
      (edgeTriplet: EdgeTriplet[Double,PartitionID]) => {
        if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {
          println(s"顶点${edgeTriplet.srcId} 给 顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}成功")
          Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))
        } else {
          println(s"顶点${edgeTriplet.srcId} 给 顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}失败")
          Iterator.empty
        }
      },
      (msg1: Double, msg2: Double) => math.min(msg1, msg2)
    )

    //6、输出结果
//      pregelGraph.triplets.collect().foreach(println)
//      println(pregelGraph.vertices.collect.mkString("\n"))

    //7、关闭SparkContext
    sparkContext.stop()
  }
}

输出结果:

//------------------------------------------ 各个顶点接受初始消息initialMsg ------------------------------------------
顶点5,属性0.0,收到消息Infinity,合并后的属性0.0
顶点2,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点3,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点1,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点6,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点4,属性Infinity,收到消息Infinity,合并后的属性Infinity
//------------------------------------------ 第一次迭代 ------------------------------------------
顶点3 给 顶点6 发送消息 Infinity失败
顶点5 给 顶点6 发送消息 3.0成功
顶点2 给 顶点4 发送消息 Infinity失败
顶点4 给 顶点1 发送消息 Infinity失败
顶点5 给 顶点3 发送消息 8.0成功
顶点2 给 顶点1 发送消息 Infinity失败
顶点2 给 顶点5 发送消息 Infinity失败
顶点3 给 顶点2 发送消息 Infinity失败

顶点3,属性Infinity,收到消息8.0,合并后的属性8.0
顶点6,属性Infinity,收到消息3.0,合并后的属性3.0
//------------------------------------------ 第二次迭代 ------------------------------------------
顶点3 给 顶点2 发送消息 12.0成功
顶点3 给 顶点6 发送消息 11.0失败

顶点2,属性Infinity,收到消息12.0,合并后的属性12.0
//------------------------------------------ 第三次迭代 ------------------------------------------
顶点2 给 顶点1 发送消息 19.0成功
顶点2 给 顶点4 发送消息 14.0成功
顶点2 给 顶点5 发送消息 14.0失败

顶点4,属性Infinity,收到消息14.0,合并后的属性14.0
顶点1,属性Infinity,收到消息19.0,合并后的属性19.0
//------------------------------------------ 第四次迭代 ------------------------------------------
顶点4 给 顶点1 发送消息 15.0成功

顶点1,属性19.0,收到消息15.0,合并后的属性15.0
//------------------------------------------ 第五次迭代不用发送消息 ------------------------------------------

pregel 最短路径过程分析

调用pregel方法之前,先把图的各个顶点的属性初始化为如下图所示:顶点5到自己的距离为0,所以设为0,其他顶点都设为 正无穷大Double.PositiveInfinity。

1. 当调用pregel方法开始:

首先,所有顶点都将接收到一条初始消息initialMsg,使所有顶点都处于**态(红色标识的节点)。
Spark GraphX中的pregel 函数(步骤图解)
2. 第一次迭代开始

所有顶点以EdgeDirection.Out的边方向调用sendMsg方法发送消息给目标顶点,如果 源顶点的属性+边的属性<目标顶点的属性,则发送消息。否则不发送。
发送成功的只有两条边:
5—>3(0+8<Double.Infinity , 成功),
5—>6(0+3<Double.Infinity , 成功)
3—>2(Double.Infinity+4>Double.Infinity , 失败)
3—>6(Double.Infinity+3>Double.Infinity , 失败)
2—>1(Double.Infinity+7>Double.Infinity , 失败)
2—>4(Double.Infinity+2>Double.Infinity , 失败)
2—>5(Double.Infinity+2>Double.Infinity , 失败)
4—>1(Double.Infinity+1>Double.Infinity , 失败)。

sendMsg方法执行完成之后,根据顶点处于**态的条件,顶点5 成功地分别给顶点3 和 顶点6 发送了消息,顶点3 和 顶点6 也成功地接受到了消息。所以 此时只有5,3,6 三个顶点处于**态,其他顶点全部钝化。然后收到消息的顶点3和顶点6都调用vprog方法,将收到的消息 与 自身的属性合并。如下图所示。到此第一次迭代结束。

3. 第二次迭代开始:

顶点3 给 顶点6 发送消息失败,顶点3 给 顶点2 发送消息成功,此时 顶点3 成功发送消息,顶点2 成功接收消息,所以顶点2 和 顶点3 都成为**状态,其他顶点都成为钝化状态。然后顶点2 调用vprog方法,将收到的消息 与 自身的属性合并。 下图所示至此第二次迭代结束。
Spark GraphX中的pregel 函数(步骤图解)
4. 第三次迭代开始:

顶点3分别发送消息给顶点2失败 和 顶点6失败,顶点2 分别发消息给 顶点1成功、顶点4成功、顶点5失败 ,所以 顶点2、顶点1、顶点4 成为**状态,其他顶点为钝化状态。顶点1 和 顶点4分别调用vprog方法,将收到的消息 与 自身的属性合并。下图所示至此第三次迭代结束。
Spark GraphX中的pregel 函数(步骤图解)
5. 第四次迭代开始:

顶点2 分别发送消息给 顶点1失败 和 顶点4失败。顶点4 给 顶点1发送消息成功,顶点1 和 顶点4 进入**状态,其他顶点进入钝化状态。顶点1 调用vprog方法,将收到的消息 与 自身的属性合并 。
Spark GraphX中的pregel 函数(步骤图解)
6. 第五次迭代开始:

顶点4 再给 顶点1发送消息失败,顶点4 和 顶点1 进入钝化状态,此时全图都进入钝化状态。至此结束。
Spark GraphX中的pregel 函数(步骤图解)

案例二:求出图中最小值

实现代码:

object MyPregel {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pregel").getOrCreate()
    val sc = spark.sparkContext

	// 初始化点、边、图
    val vertices = sc.parallelize(Array((1L,(7,-1)),(2L,(3,-1)),(3L,(2,-1)),(4L,(6,-1))))
    val edges = sc.parallelize(Array(Edge(1L,2L,true),Edge(1L,4L,true),Edge(2L,4L,true),Edge(3L,1L,true),Edge(3L,4L,true)))
    val graph = Graph(vertices,edges)
	
	// 初始化消息为比最大值大的数,用来求最小值,同时自定义函数vprog、sendMsg和margeMsg
    val initMsg = 9999
    def vprog(vid:VertexId,value:(Int,Int),msg:Int)={
      if (msg == initMsg) value else (msg min value._1,value._1)
    }
    def sendMsg(triplet:EdgeTriplet[(Int,Int),Boolean])={
      val src = triplet.srcAttr
      if (src._1 == src._2) Iterator.empty else Iterator((triplet.dstId,src._1))
    }
    def margeMsg(msg1:Int,msg2:Int) = {
      msg1 min msg2
    }

	// 调用pregel 函数求解最小值
    val minGraph = graph.pregel(initMsg,Int.MaxValue,EdgeDirection.Out)(vprog,sendMsg,margeMsg)
    minGraph.vertices.collect().foreach(println)

    spark.stop()
  }
}

执行流程如下:

1. 初始化图
Spark GraphX中的pregel 函数(步骤图解)初始化构建点、边和图,确认节点顶点钝化或**状态(一般开始时必然都是**状态),**状态下按边的方向发送信息。
Spark GraphX中的pregel 函数(步骤图解)
初始化vprog方法每个顶点原先的值为-1(小于顶点最小值),所有活的顶点开始发送信息给所在边的另一顶点。

2. 第一次迭代过程
Spark GraphX中的pregel 函数(步骤图解)
图中绿色表示顶点为**状态,红的表示顶点为钝化状态,经第一次迭代后发现,顶点2(2)和3(3)为钝化状态;2(7)和2(6)为**状态,需要继续迭代执行。

3. 第二次迭代过程
Spark GraphX中的pregel 函数(步骤图解)
经第二次迭代后发现,3个顶点2(2)为钝化状态;2(3)为**状态,需要继续迭代执行。

4. 第三次迭代过程

Spark GraphX中的pregel 函数(步骤图解)
经第三次迭代后发现,4个顶点2(2)为钝化状态;迭代执行结束。因此图中最小值为2,这当然也是非常明显的。