
4-36 Flink Distributed Cache


Flink offers a distributed cache, similar to Apache Hadoop, to make files locally accessible to parallel instances of user functions. This functionality can be used to share files that contain static external data such as dictionaries or machine-learned regression models.

The cache works as follows. A program registers a file or directory of a local or remote filesystem such as HDFS or S3 under a specific name in its ExecutionEnvironment as a cached file. When the program is executed, Flink automatically copies the file or directory to the local filesystem of all workers. A user function can look up the file or directory under the specified name and access it from the worker’s local filesystem.

The distributed cache is used as follows:

Register the file or directory in the ExecutionEnvironment.

val env = ExecutionEnvironment.getExecutionEnvironment

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()

Access the cached file in a user function (here a MapFunction). The function must extend a RichFunction class because it needs access to the RuntimeContext.

// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {

  override def open(config: Configuration): Unit = {

    // access cached file via RuntimeContext and DistributedCache
    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
    // read the file (or navigate the directory)
    ...
  }

  override def map(value: String): Int = {
    // use content of cached file
    ...
  }
}
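The comment in open() above notes that the registered name may also refer to a directory. As a minimal sketch of that case (the cache name "hdfsFile" is reused from the snippet above; the DirAwareMapper class and its map logic are placeholders, not part of the Flink docs), the returned File is an ordinary local path that java.io calls can traverse:

import java.io.File

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class DirAwareMapper extends RichMapFunction[String, Int] {

  override def open(config: Configuration): Unit = {
    // getFile returns a local java.io.File; if a directory was registered,
    // its local copy can be walked like any other local directory
    val cached: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
    if (cached.isDirectory) {
      cached.listFiles().foreach(f => println(f.getName))
    }
  }

  // placeholder map logic, just to make the class complete
  override def map(value: String): Int = value.length
}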

Example code:

package com.imooc.flink.course04

import java.io.File
import java.nio.charset.StandardCharsets
import java.util

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object DistributedCacheApp {

  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val filePath = "file:///F:/data/hello.txt"

    // 1. Register a local (or HDFS) file under the name "localFile"
    env.registerCachedFile(filePath, "localFile")

    val data: DataSet[String] = env.fromElements("hadoop", "spark", "flink", "storm")

    data.map(new RichMapFunction[String, String] {

      // 2. Fetch the cached file from the distributed cache in open()
      override def open(parameters: Configuration): Unit = {
        val localFile: File = getRuntimeContext.getDistributedCache.getFile("localFile")
        // FileUtils.readLines returns a java.util.List, which a Scala
        // for-comprehension cannot iterate directly, so convert it first
        val lines: util.List[String] = FileUtils.readLines(localFile, StandardCharsets.UTF_8)
        import scala.collection.JavaConverters._
        for (ele <- lines.asScala) {
          println(ele)
        }
      }

      override def map(value: String): String = {
        value
      }
    }).print() // in the DataSet API, print() triggers execution, so no env.execute() is needed
  }
}
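The introduction above names dictionaries as a typical use of the distributed cache. As a hedged sketch of that pattern (assuming, beyond the original example, that each line of the cached file holds a tab-separated key/value pair; DictMapper is a hypothetical name), open() parses the file into a Scala Map once per parallel instance, and map() then does pure in-memory lookups:

import java.io.File
import java.nio.charset.StandardCharsets

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConverters._

class DictMapper extends RichMapFunction[String, String] {

  // built once per parallel instance in open(), reused by every map() call
  private var dict: Map[String, String] = Map.empty

  override def open(parameters: Configuration): Unit = {
    val localFile: File = getRuntimeContext.getDistributedCache.getFile("localFile")
    dict = FileUtils.readLines(localFile, StandardCharsets.UTF_8).asScala
      .map(_.split("\t", 2))                  // assumed line format: key<TAB>value
      .collect { case Array(k, v) => k -> v } // keep only well-formed lines
      .toMap
  }

  override def map(value: String): String =
    dict.getOrElse(value, value) // fall back to the input when no entry exists
}

Wired into the example above, data.map(new DictMapper()).print() would replace the anonymous RichMapFunction.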