Knowledge Graph Study Notes: Graph Operations
1. Defining a Custom Graph
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object PropertiesGraph {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("graph").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("OFF")
    // Define the vertices: (VertexId, (name, occupation))
    val users: RDD[(VertexId, (String, String))] = sc.parallelize(
      Array(
        (3L, ("s1", "student")),
        (6L, ("s2", "teacher")),
        (5L, ("s3", "pro")),
        (2L, ("s4", "pro"))
      )
    )
    // Define the edges: Edge(srcId, dstId, relationship)
    val relationships: RDD[Edge[String]] = sc.parallelize(
      Array(
        Edge(3L, 6L, "cooperate"),
        Edge(5L, 6L, "advisor"),
        Edge(2L, 5L, "colleague")
      )
    )
    // Default attribute for vertices referenced by an edge but missing from `users`
    val defaultUser = ("Jack ma", "defaultUser")
    val graph = Graph(users, relationships, defaultUser)
  }
}
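Running this example prints nothing, because the graph is only constructed. As a quick sanity check you can append a few lines to the end of main (a minimal sketch, not part of the original notes; numVertices, numEdges, vertices and edges are standard members of Graph):

    // Inspect the graph we just built
    println(s"vertices: ${graph.numVertices}, edges: ${graph.numEdges}")
    graph.vertices.collect().foreach { case (id, (name, work)) =>
      println(s"vertex $id: $name ($work)")
    }
    graph.edges.collect().foreach(e => println(s"${e.srcId} -[${e.attr}]-> ${e.dstId}"))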
2. Basic Graph Operations
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object PropertiesGraph {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("graph").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("OFF")
    // Define the vertices (the same graph as in Section 1)
    val users: RDD[(VertexId, (String, String))] = sc.parallelize(
      Array(
        (3L, ("s1", "student")),
        (6L, ("s2", "teacher")),
        (5L, ("s3", "pro")),
        (2L, ("s4", "pro"))
      )
    )
    // Define the edges
    val relationships: RDD[Edge[String]] = sc.parallelize(
      Array(
        Edge(3L, 6L, "cooperate"),
        Edge(5L, 6L, "advisor"),
        Edge(2L, 5L, "colleague")
      )
    )
    val defaultUser = ("Jack ma", "defaultUser")
    val graph = Graph(users, relationships, defaultUser)
    /********************** Graph operations ***************************/
    // Find the vertices whose occupation attribute is "student"
    graph.vertices.filter { case (id, (name, work)) => work == "student" }
      .collect().foreach { case (id, (name, work)) => println(s"$name is $work") }
    // Find the edges whose attribute is "advisor"
    graph.edges.filter(x => x.attr == "advisor")
      .collect().foreach(x => println(s"${x.srcId} to ${x.dstId}, attribute ${x.attr}"))
    // Degree operations: pick whichever vertex has the larger degree
    def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
      if (a._2 > b._2) a else b
    }
    println("max out-degree: " + graph.outDegrees.reduce(max))
    println("max in-degree: " + graph.inDegrees.reduce(max))
    println("max degree: " + graph.degrees.reduce(max))
    // Append a suffix to every vertex's occupation attribute.
    // Note: mapVertices must return only the new attribute, not an (id, attr) pair.
    graph.mapVertices { case (id, (name, work)) => (name, work + "_spark") }
      .vertices.collect().foreach(println)
    // A triplet bundles an edge with both endpoint attributes: a -> b
    // Set each edge's attribute to srcAttr + edgeAttr + dstAttr
    graph.mapTriplets(x => x.srcAttr._2 + "+" + x.attr + "+" + x.dstAttr._2)
      .edges.collect().foreach(println)
    // Describe the relationship between the endpoints of each edge
    graph.triplets.map(x => x.srcAttr._1 + " is the " + x.attr + " of " + x.dstAttr._1)
      .collect().foreach(println)
    // Remove vertices to build a subgraph: keep every vertex that is not "pro"
    val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "pro")
    validGraph.vertices.collect().foreach(println)
    validGraph.triplets.map(x => x.srcAttr._1 + " is the " + x.attr + " of " + x.dstAttr._1)
      .collect().foreach(println)
    // Build the subgraph of vertices whose occupation is "pro"
    val subGraph = graph.subgraph(vpred = (id, attr) => attr._2 == "pro")
    subGraph.vertices.collect().foreach(x => println(s"${x._2._1} is ${x._2._2}"))
  }
}
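The degree fields used above are themselves computed by message passing. As a complement (a minimal sketch, not part of the original notes), the same in-degree count can be written by hand with aggregateMessages, GraphX's general aggregation operator; it assumes the graph value from the example above is in scope:

    import org.apache.spark.graphx.VertexRDD
    // Each edge sends the message 1 to its destination vertex;
    // messages arriving at the same vertex are summed.
    val inDeg: VertexRDD[Int] = graph.aggregateMessages[Int](
      ctx => ctx.sendToDst(1),
      _ + _
    )
    inDeg.collect().foreach { case (id, d) => println(s"vertex $id has in-degree $d") }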
3. Loading Neo4j Data with Spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.Graph
import org.neo4j.driver.internal.InternalNode
import org.neo4j.spark.Neo4j

object SparkGraphxConnector {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("neo4j")
      .setMaster("local[*]")
      .set("spark.neo4j.bolt.url", "bolt://ip")
      .set("spark.neo4j.bolt.user", "user_name")
      .set("spark.neo4j.bolt.password", "password")
    val sc = new SparkContext(conf)
    sc.setLogLevel("OFF")
    val neo4j = Neo4j(sc)
    // Run a Cypher query and map each returned node (the Neo4j Java
    // driver's InternalNode) to a Person
    val rdd = neo4j.cypher("match (n:Person) return n").loadRowRdd
    val personRDD = rdd.map(row => {
      val node = row.get(0).asInstanceOf[InternalNode]
      Person(node.get("home").asString(),
        node.get("name").asString(),
        node.get("personID").asString())
    })
    personRDD.collect().foreach(println)
    // Graph query: load the Person-to-Person relationships as a GraphX graph
    val graphQuery = "match (n:Person)-[r]-(a:Person) return id(n) as source, id(a) as target, type(r) as value"
    val graph: Graph[String, String] = neo4j.rels(graphQuery).loadGraph
    graph.edges.collect().foreach(println)
  }

  case class Person(home: String, name: String, personID: String)
}
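To compile this example, the neo4j-spark-connector must be on the classpath. Below is a build.sbt sketch; the exact coordinates and version numbers are assumptions and should be matched to your Spark and Neo4j versions:

    // build.sbt (versions are illustrative assumptions)
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"   % "2.4.8",
      "org.apache.spark" %% "spark-graphx" % "2.4.8",
      "neo4j-contrib"    %  "neo4j-spark-connector" % "2.4.5-M2"
    )
    resolvers += "Spark Packages Repo" at "https://repos.spark-packages.org"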