4-36 Flink Distributed Cache
Flink offers a distributed cache, similar to Apache Hadoop, to make files locally accessible to parallel instances of user functions. This functionality can be used to share files that contain static external data such as dictionaries or machine-learned regression models.
The cache works as follows. A program registers a file or directory of a local or remote filesystem, such as HDFS or S3, under a specific name in its ExecutionEnvironment as a cached file. When the program is executed, Flink automatically copies the file or directory to the local filesystem of all workers. A user function can then look up the file or directory under the specified name and access it from the worker's local filesystem.
The distributed cache is used as follows:
Register the file or directory in the ExecutionEnvironment.
val env = ExecutionEnvironment.getExecutionEnvironment
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()
Access the cached file in a user function (here a MapFunction). The function must extend a RichFunction class because it needs access to the RuntimeContext.
// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {
override def open(config: Configuration): Unit = {
// access cached file via RuntimeContext and DistributedCache
val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
// read the file (or navigate the directory)
...
}
override def map(value: String): Int = {
// use content of cached file
...
}
}
Example code:
package com.imooc.flink.course04

import java.io.File
import java.util

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object DistributedCacheApp {

  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val filePath = "file:///F:/data/hello.txt"

    // 1. Register a local/HDFS file under a name
    env.registerCachedFile(filePath, "localFile")

    val data: DataSet[String] = env.fromElements("hadoop", "spark", "flink", "storm")

    data.map(new RichMapFunction[String, String] {

      // 2. Retrieve the cached file in open(), which runs once per task instance
      override def open(parameters: Configuration): Unit = {
        val localFile: File = getRuntimeContext.getDistributedCache.getFile("localFile")
        // FileUtils.readLines returns a java.util.List, which a Scala
        // for-comprehension cannot iterate directly; convert via JavaConverters
        val lines: util.List[String] = FileUtils.readLines(localFile, "UTF-8")
        import scala.collection.JavaConverters._
        for (ele <- lines.asScala) {
          println(ele)
        }
      }

      override def map(value: String): String = {
        value
      }
    }).print()
  }
}
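The Java/Scala collection mismatch noted in the open() method above can be reproduced without Flink at all. A minimal, self-contained sketch (the sample list stands in for what FileUtils.readLines would return):

```scala
import java.util

import scala.collection.JavaConverters._

object JavaScalaCollections {
  def main(args: Array[String]): Unit = {
    // A java.util.List, like the one FileUtils.readLines returns
    val javaLines: util.List[String] = util.Arrays.asList("hadoop", "spark", "flink")

    // A Scala for-comprehension cannot iterate a java.util.List directly;
    // asScala wraps it as a Scala collection without copying the elements
    for (line <- javaLines.asScala) {
      println(line)
    }
  }
}
```

The asScala wrapper is a view over the Java list, so no copy is made; call .toList or .toSeq afterwards if an immutable Scala collection is needed.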