1.11 flink读取本地文件例子以及细节
程序员文章站
2024-03-08 12:44:04
...
两个细节
- 可以指定文件或目录
- 可以指定读取模式一次性或持续性检测
代码例子
PROCESS_ONCE模式
public class FileToPrint {
public static void main(final String[] args) throws Exception {
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建TextInputFormat
TextInputFormat textInputFormat = new TextInputFormat(null);
// 可以过滤文件
textInputFormat.setFilesFilter(new FilePathFilter() {
@Override
public boolean filterPath(Path path) {
return path.getName().startsWith("2");//过滤掉2开头的文件
}
});
//指定对应的文件或者文件目录下的文件,读取对应文件内容,读取完就结束(FileProcessingMode.PROCESS_ONCE)
String filePath = "C:\\Users\\xuyin\\Desktop\\新建文件夹";
// String filePath = "C:\\Users\\xuyin\\Desktop\\新建文件夹\\1.txt";
streamEnv.readFile(textInputFormat, filePath,FileProcessingMode.PROCESS_ONCE, 100).print();
streamEnv.execute("packaged");
}
}
PROCESS_CONTINUOUSLY模式
public class FileToPrint {
public static void main(final String[] args) throws Exception {
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建TextInputFormat
TextInputFormat textInputFormat = new TextInputFormat(null);
// 可以过滤文件
textInputFormat.setFilesFilter(new FilePathFilter() {
@Override
public boolean filterPath(Path path) {
return path.getName().startsWith("2");//过滤掉2开头的文件
}
});
// interval间隔时间到了会检查文件是否做了修改,有修改则促发重新获取所有的数据
String file = "C:\\Users\\xuyin\\Desktop\\新建文件夹";
// String filePath = "C:\\Users\\xuyin\\Desktop\\新建文件夹\\1.txt";
streamEnv.readFile(textInputFormat, file, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000).print();
streamEnv.execute("packaged");
}
}
问题1
TextInputFormat构造方法中的filePath参数作用是什么? 目测没什么用,因为看源码readFile的参数也有一个filePath
而最终还是会set到inputFormat的filePath,所以我的例子在构建TextInputFormat时传入的null是可以正常运行的
问题2
如何实现文件append数据不重新获取所有内容,而是从上次的位置开始读取呢?
flink没有这么做,应该是考虑到文件数据有可能存在update而不是append,所以自己的业务是单纯的append,则需要自己实现对应功能