欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

4.4-Flink DataSet分布式缓冲(Distributed Cache)

程序员文章站 2022-07-14 13:47:42
...

该笔记专栏内容与 github/flink-advanced 同步,源码与 github/flink-advanced 同步

官方文档,文章内容源码

Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在 taskManager 节点中,防止task重复拉取

执行机制如下:
程序注册一个文件或者目录(本地或者远程文件系统,例如 hdfs 或者s3),通过 ExecutionEnvironment 注册缓存文件并为它起一个名称
当程序执行,Flink 自动将文件或者目录复制到所有 taskManager 节点的本地文件系统,仅会执行一次。
用户可以通过这个指定的名称查找文件或者目录,然后从 taskManager 节点的本地文件系统访问它

示例代码:

/** 分布式缓存
  * =示例说明=
  * 分布式载入用户ID黑名单文件,针对用户登录数据匹配在黑名单ID及对应登录状态
  *
  * @author Li.Wei by 2019/11/4
  */
object DistributedCache extends BatchExecutionEnvironmentApp {

  private val path = getClass.getClassLoader.getResource("data/game/blacklist-uid.txt").getPath
  bEnv.registerCachedFile(path, "blacklist-uid", executable = false)

  // 用户登录数据 DataSet
  val userLoginDataSet = DataSet.userLogin(this)

  import org.apache.flink.api.scala.extensions._ // use filterWith

  userLoginDataSet
    .map(new BlacklistMap())
    .filterWith(_._1 != "none")
    .distinct()
    .print()

}

class BlacklistMap extends RichMapFunction[UserLogin, (String, String)] {
  var source: BufferedSource = _ // 读取文件流,函数结束后执行关闭操作
  var blackUid: Seq[String] = _ // 黑名单数据,从分布式缓冲文件中载入

  override def open(config: Configuration): Unit = {
    val file: File = getRuntimeContext.getDistributedCache.getFile("blacklist-uid")
    import scala.io.Source
    source = Source.fromFile(file, "UTF-8")
    blackUid = source.getLines().toSeq
  }

  // 判断当前用户对应的ID在该用户对应角色中是否登录过
  override def map(value: UserLogin): (String, String) =
    if (blackUid.contains(value.uid)) (value.uid, value.status) else ("none", value.status)

  override def close(): Unit = source.close()
}