用 Java 和 Scala 实现一个简单的 Flink 分布式缓存(Distributed Cache)示例
程序员文章站
2022-07-14 13:49:36
...
Java 代码
package com.wxx.flink.java.distribute;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import java.io.File;
import java.util.List;
/**
 * Minimal Flink batch job that demonstrates the distributed cache from Java.
 *
 * <p>The job registers a file under a symbolic name with the execution
 * environment, then reads that file back through the runtime context inside
 * {@code open()} of a {@link RichMapFunction} and prints every line of it.
 * The map itself is an identity transformation over three sample elements.
 */
public class JavaDistributeCacheApp {

    /** Name under which the file is registered and later looked up in the cache. */
    private static final String CACHE_NAME = "java-dc";

    /** File registered when no path is supplied on the command line. */
    private static final String DEFAULT_PATH = "file:///D:\\idea\\imooc\\flink\\flink-train\\data\\input.txt";

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]}, when present, is the path/URI of the
     *             file to register instead of {@link #DEFAULT_PATH}
     * @throws Exception if the job fails or the cached file cannot be read
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Allow the file to be overridden so the demo is not tied to one machine's disk layout.
        String path = args.length > 0 ? args[0] : DEFAULT_PATH;
        env.registerCachedFile(path, CACHE_NAME);

        // Sample input; the first element was "a," in the original, which looked like a typo.
        DataSource<String> input = env.fromElements("a", "b", "c");

        input.map(new RichMapFunction<String, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                File file = getRuntimeContext().getDistributedCache().getFile(CACHE_NAME);
                // Decode with an explicit charset instead of the JVM default so the output
                // does not depend on each worker's platform settings.
                // NOTE(review): assumes the cached file is UTF-8 encoded; adjust if it is not.
                List<String> lines = FileUtils.readLines(file, java.nio.charset.StandardCharsets.UTF_8);
                for (String line : lines) {
                    System.out.println(line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                // Identity mapping: this example is only about the cache lookup in open().
                return value;
            }
        }).print();
    }
}
Scala 代码
package com.wxx.flink.scala.distribute
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
/**
 * Minimal Flink batch job that demonstrates the distributed cache from Scala.
 *
 * The job registers a file under a symbolic name with the execution
 * environment, then reads that file back through the runtime context inside
 * `open` of a `RichMapFunction` and prints every line of it. The map itself
 * is an identity transformation over three sample elements.
 */
object DistributeCacheApp {

  /** Name under which the file is registered and later looked up in the cache. */
  private val CacheName = "scala-dc"

  /** File registered when no path is supplied on the command line. */
  private val DefaultPath = "file:///D:\\idea\\imooc\\flink\\flink-train\\data\\input.txt"

  /**
   * Builds and runs the job.
   *
   * @param args optional; `args(0)`, when present, is the path/URI of the file
   *             to register instead of [[DefaultPath]]
   */
  def main(args: Array[String]): Unit = {
    import java.nio.charset.StandardCharsets
    import org.apache.flink.api.scala._
    import scala.collection.JavaConverters._

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Allow the file to be overridden so the demo is not tied to one machine's disk layout.
    val path = args.headOption.getOrElse(DefaultPath)
    env.registerCachedFile(path, CacheName)

    val input = env.fromElements("a", "b", "c")

    input.map(new RichMapFunction[String, String] {
      override def open(parameters: Configuration): Unit = {
        val dcFile = getRuntimeContext.getDistributedCache.getFile(CacheName)
        // Decode with an explicit charset instead of the JVM default so the output
        // does not depend on each worker's platform settings.
        // NOTE(review): assumes the cached file is UTF-8 encoded; adjust if it is not.
        val lines = FileUtils.readLines(dcFile, StandardCharsets.UTF_8)
        lines.asScala.foreach(line => println(line))
      }

      // Identity mapping: this example is only about the cache lookup in open().
      override def map(value: String): String = value
    }).print()
  }
}
上一篇: 集成学习和随机森林方法
下一篇: JDK1.7新特性