欢迎您访问程序员文章站，本站旨在为大家提供分享程序员计算机编程知识！
您现在的位置是: 首页

知识图谱学习笔记-图操作

程序员文章站 2022-06-12 17:08:00
...

一、自定义图

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.Graph

object PropertiesGraph {

  /**
   * Builds a small GraphX property graph.
   *
   * Each vertex carries a (name, occupation) attribute. Each edge carries a relationship label.
   * The graph is only constructed here; no queries are run on it.
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("graph").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("OFF")

    try {
      // Vertices: (vertexId, (name, occupation)).
      val users: RDD[(VertexId, (String, String))] = sc.parallelize(
        Array(
          (3L, ("s1", "student")),
          (6L, ("s2", "teacher")),
          (5L, ("s3", "pro")),
          (2L, ("s4", "pro"))
        )
      )

      // Edges: srcId -> dstId, attribute = relationship label.
      val relationships: RDD[Edge[String]] = sc.parallelize(
        Array(
          Edge(3L, 6L, "cooperate"),
          // Spelled "advisor" (not "agvisor") so label filters such as attr == "advisor" match it.
          Edge(5L, 6L, "advisor"),
          Edge(2L, 5L, "colleague")
        )
      )

      // Attribute GraphX assigns to any vertex that an edge references but `users` does not define.
      val defaultUser = ("Jack ma", "defaultUser")
      val graph = Graph(users, relationships, defaultUser)
    } finally {
      // Always release the local Spark context, even if graph construction fails.
      sc.stop()
    }
  }
}

二、图基本操作

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.Graph

object PropertiesGraph {

  /**
   * Builds a small GraphX property graph and demonstrates basic graph operations.
   *
   * The operations shown are:
   *  - vertex and edge filters
   *  - maximum out-, in- and total degree
   *  - mapVertices and mapTriplets
   *  - triplet descriptions
   *  - subgraphs that keep or drop the "pro" vertices
   *
   * Every RDD is collect()-ed before printing so output appears on the driver, in order.
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("graph").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("OFF")

    try {
      // Vertices: (vertexId, (name, occupation)).
      val users: RDD[(VertexId, (String, String))] = sc.parallelize(
        Array(
          (3L, ("s1", "student")),
          (6L, ("s2", "teacher")),
          (5L, ("s3", "pro")),
          (2L, ("s4", "pro"))
        )
      )

      // Edges: srcId -> dstId, attribute = relationship label.
      val relationships: RDD[Edge[String]] = sc.parallelize(
        Array(
          Edge(3L, 6L, "cooperate"),
          Edge(5L, 6L, "advisor"),
          Edge(2L, 5L, "colleague")
        )
      )

      // Attribute GraphX assigns to any vertex that an edge references but `users` does not define.
      val defaultUser = ("Jack ma", "defaultUser")
      val graph = Graph(users, relationships, defaultUser)

      /**********************  Graph operations  ***************************/

      // Vertices in the graph whose occupation is "student".
      graph.vertices.filter { case (_, (_, work)) => work == "student" }
        .collect().foreach { case (_, (name, work)) => println(s"$name is $work") }

      // Edges in the graph labelled "advisor".
      graph.edges.filter(_.attr == "advisor")
        .collect().foreach(e => println(s"${e.srcId} to ${e.dstId} 属性为 ${e.attr}"))

      // Degree statistics: keep the (vertexId, degree) pair with the larger degree (ties -> b).
      def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
        if (a._2 > b._2) a else b

      println("最大的出度" + graph.outDegrees.reduce(max))
      println("最大的入度" + graph.inDegrees.reduce(max))
      println("最大的度数" + graph.degrees.reduce(max))

      // Append "_spark" to every vertex's occupation. The attribute keeps its (name, occupation)
      // shape. GraphX already supplies the vertex id, so it must not be copied into the attribute.
      graph.mapVertices { case (_, (name, work)) => (name, work + "_spark") }
        .vertices.collect().foreach(println)

      // For each triplet (src -[attr]-> dst), replace the edge attribute with
      // "<src occupation>+<edge label>+<dst occupation>".
      graph.mapTriplets(t => t.srcAttr._2 + "+" + t.attr + "+" + t.dstAttr._2)
        .edges.collect().foreach(println)

      // Human-readable relationship between the two endpoints of every triplet.
      graph.triplets.map(t => t.srcAttr._1 + " is the " + t.attr + " of " + t.dstAttr._1)
        .collect().foreach(println)

      // Remove the "pro" vertices to build a subgraph.
      // subgraph also drops edges whose endpoints fail vpred.
      val nonProGraph = graph.subgraph(vpred = (_, attr) => attr._2 != "pro")
      nonProGraph.vertices.collect().foreach(println)
      nonProGraph.triplets.map(t => t.srcAttr._1 + " is the " + t.attr + " of " + t.dstAttr._1)
        .collect().foreach(println)

      // Subgraph containing only the vertices whose occupation is "pro" (professor).
      val proGraph = graph.subgraph(vpred = (_, attr) => attr._2 == "pro")
      proGraph.vertices.collect().foreach { case (_, (name, work)) => println(s"$name is $work") }
    } finally {
      // Always release the local Spark context.
      sc.stop()
    }
  }
}

三、Spark 加载 Neo4j

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.ml.tree.InternalNode
import org.neo4j.spark.Neo4j

object SparkGraphxConnector {

  // Imports local to this example. The file-level `org.apache.spark.ml.tree.InternalNode`
  // import is Spark ML's decision-tree node class, not a Neo4j node, and has no get(String).
  // `Graph` is not imported at file level for this example.
  import org.apache.spark.SparkContext
  import org.apache.spark.graphx.Graph
  // NOTE(review): assumes the connector returns node columns as the Neo4j Java driver's
  // InternalNode (the type the cast in this example expects). Confirm for your connector version.
  import org.neo4j.driver.internal.{InternalNode => Neo4jNode}

  /**
   * Reads `Person` nodes from Neo4j into an RDD. Then loads the Person-to-Person
   * relationships as a GraphX graph. Both results are printed on the driver.
   *
   * The bolt URL, user and password below are placeholders and must be replaced.
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("neo4j")
      .setMaster("local[*]")
      .set("spark.neo4j.bolt.url","bolt://ip")
      .set("spark.neo4j.bolt.user","user_name")
      .set("spark.neo4j.bolt.password","password")

    // Neo4j(...) takes a Scala SparkContext, so the Java-API wrapper is not needed from Scala.
    val sc = new SparkContext(conf)
    sc.setLogLevel("OFF")

    try {
      val neo4j = Neo4j(sc)

      // Each row has one column: the matched Person node (the query returns only `n`).
      val rdd = neo4j.cypher("match (n:Person) return n").loadRowRdd

      val personRDD = rdd.map { row =>
        row.get(0) match {
          case node: Neo4jNode =>
            Person(
              node.get("home").asString(),
              node.get("name").asString(),
              node.get("personID").asString())
          case other =>
            throw new IllegalStateException(s"expected a Neo4j node in column 0, got: $other")
        }
      }

      // collect() so the output appears on the driver rather than in executor logs.
      personRDD.collect().foreach(println)

      // Graph query.
      // - `n` is the bound source node.
      // - A directed pattern returns each relationship once. An undirected `-[r]-` matches
      //   every Person-Person relationship twice (once per direction).
      // - Presumably the connector's rels(...).loadGraph reads the aliases
      //   source/target/value; keep them unless you check the connector docs first.
      val graphQuery =
        "match (n:Person)-[r]->(a:Person) return id(n) as source,id(a) as target,type(r) as value"

      val graph: Graph[String, String] = neo4j.rels(graphQuery).loadGraph
      graph.edges.collect().foreach(println)
    } finally {
      // Always release the Spark context, even if a Neo4j query fails.
      sc.stop()
    }
  }

  /** Properties of a Neo4j `Person` node. */
  final case class Person(
                   home: String,
                   name: String,
                   personID: String
                   )
}