Flink Dataset Api(七)分布式缓存
程序员文章站
2022-07-14 13:50:44
...
Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。这个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等!
缓存的使用流程:
使用ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS上的文件),为缓存文件指定一个名字注册该缓存文件!当程序执行时候,Flink会自动将复制文件或者目录到所有worker节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!
【注意】广播是将变量分发到各个worker节点的内存上,分布式缓存是将文件缓存到各个worker节点上;
import datasetapi.sources.SourceTest.{Clazz, INFO}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.io.Source
import scala.util.Random
object SourceTest {
//(学号 , 班级) join (学生学号---学科---分数) ==(学号 , 班级 , 学科 , 分数)
case class INFO(stu_no:Int , clazz_no:String , subject:String , score:Double)
case class Clazz(stu_no:Int , clazz_no:String)
def main(args: Array[String]): Unit = {
import org.apache.flink.api.scala.extensions._
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.extensions._
val env = ExecutionEnvironment.getExecutionEnvironment
//1"开启分布式缓存
val path = "hdfs://hadoop01:9000/score"
env.registerCachedFile(path , "Distribute_cache")
//2:加载本地数据
val clazz:DataSet[Clazz] = env.fromElements(
Clazz(1,"class_1"),
Clazz(2,"class_1"),
Clazz(3,"class_2"),
Clazz(4,"class_2"),
Clazz(5,"class_3"),
Clazz(6,"class_3"),
Clazz(7,"class_4"),
Clazz(8,"class_1")
)
//3:开始进行关联操作
clazz.map(new MyJoinmap()).print()
}
}
class MyJoinmap() extends RichMapFunction[Clazz,ArrayBuffer[INFO]]{
private var myLine = new ListBuffer[String]
// 【注意】广播是将变量分发到各个worker节点的内存上,分布式缓存是将文件缓存到各个worker节点上
override def open(parameters: Configuration): Unit = {
val file = getRuntimeContext.getDistributedCache.getFile("Distribute_cache")
val lines: Iterator[String] = Source.fromFile(file.getAbsoluteFile).getLines()
lines.foreach( line =>{
myLine.append(line)
})
}
//在map函数下进行关联操作
override def map(value: Clazz) = {
var stoNO = 0
var subject = ""
var score = 0.0
var array = new collection.mutable.ArrayBuffer[INFO]()
//(学生学号---学科---分数)
for(str <- myLine){
val tokens = str.split(",")
stoNO = tokens(0).toInt
subject = tokens(1)
score = tokens(2).toDouble
if(tokens.length == 3){
if(stoNO == value.stu_no){
array += INFO(value.stu_no , value.clazz_no , subject , score)
}
}
}
array
}
}
推荐阅读
-
Flink入门(五)——DataSet Api编程指南
-
1.11.Flink DataSetAPI、DataSet API之Data Sources、DataSet API之Transformations、DataSet Sink部分详解
-
flink -- 分布式缓存
-
Flink DataSet API
-
Flink Dataset Api(七)分布式缓存
-
【09】Flink 之 DataSet API(三):DataSet Sink 操作
-
用java和scala实现一个简单Flink的分布式缓存(Distributed Cache)
-
Flink DataSet API
-
Flink DataSet API - Transformations
-
Flink DataSet API 使用示范