FlinkDistributedCache分布式缓存解析
程序员文章站
2022-07-05 22:55:41
Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件。此功能可用于共享文件,包含静态的外部数据,例如字典或者machine-learned回归模型。...
Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件。此功能可用于共享文件,包含静态的外部数据,例如字典或者machine-learned回归模型。
此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。当程序执行,Flink自动将文件或者目录复制到所有worker节点的本地文件系统。用户函数可以查找文件或者目录通过这个指定的名称,然后从worker节点的本地文件系统访问它。
使用分布式缓存 如下示例:
java代码:
在ExecutionEnvironment中注册文件或者目录
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 从hdfs注册一个文件 env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile") // 注册一个本地可执行的脚本文件 env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true) // 定义程序代码 并且执行 ... DataSet input = ... DataSet result = input.map(new MyMapper()); ... env.execute();
在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据
// 继承RichFunction 为了获取RuntimeContext public final class MyMapper extends RichMapFunction { @Override public void open(Configuration config) { // 通过RuntimeContext 和 DistributedCache访问缓存文件 File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile"); // 读取文件(或者本地目录) ... } @Override public Integer map(String value) throws Exception { // 使用缓存文件的内容做一些处理 ... } }
scala代码:
在ExecutionEnvironment中注册文件或者目录
val env = ExecutionEnvironment.getExecutionEnvironment // 从hdfs注册一个文件 env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile") // 注册一个本地可执行的脚本文件 env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true) // 定义程序代码 并且执行 ... val input: DataSet[String] = ... val result: DataSet[Integer] = input.map(new MyMapper()) ... env.execute()
在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据
// 继承RichFunction 为了获取RuntimeContext class MyMapper extends RichMapFunction[String, Int] { override def open(config: Configuration): Unit = { // 通过RuntimeContext 和 DistributedCache访问缓存文件 val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile") // 读取文件(或者本地目录) ... } override def map(value: String): Int = { // 使用缓存文件的内容做一些处理 ... } }
上一篇: 黑科技满满的CES 2018 哪些国内厂商又放了“..
下一篇: oracle数据库常用命令