欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

1.11 flink读取本地文件例子以及细节

程序员文章站 2024-03-08 12:44:04
...

两个细节

  1. 可以指定文件或目录
  2. 可以指定读取模式一次性或持续性检测

代码例子

PROCESS_ONCE模式

public class FileToPrint {
    public static void main(final String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建TextInputFormat
        TextInputFormat textInputFormat = new TextInputFormat(null);
        // 可以过滤文件
        textInputFormat.setFilesFilter(new FilePathFilter() {
            @Override
            public boolean filterPath(Path path) {
                return path.getName().startsWith("2");//过滤掉2开头的文件
            }
        });

        //指定对应的文件或者文件目录下的文件,读取对应文件内容,读取完就结束(FileProcessingMode.PROCESS_ONCE)
        String filePath = "C:\\Users\\xuyin\\Desktop\\新建文件夹";
//        String filePath = "C:\\Users\\xuyin\\Desktop\\新建文件夹\\1.txt";
        streamEnv.readFile(textInputFormat, filePath,FileProcessingMode.PROCESS_ONCE, 100).print();

        streamEnv.execute("packaged");

    }
}

PROCESS_CONTINUOUSLY模式

public class FileToPrint {
    public static void main(final String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建TextInputFormat
        TextInputFormat textInputFormat = new TextInputFormat(null);
        // 可以过滤文件
        textInputFormat.setFilesFilter(new FilePathFilter() {
            @Override
            public boolean filterPath(Path path) {
                return path.getName().startsWith("2");//过滤掉2开头的文件
            }
        });


        // interval间隔时间到了会检查文件是否做了修改,有修改则促发重新获取所有的数据
        String file = "C:\\Users\\xuyin\\Desktop\\新建文件夹";
//        String filePath = "C:\\Users\\xuyin\\Desktop\\新建文件夹\\1.txt";
        streamEnv.readFile(textInputFormat, file, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000).print();

        streamEnv.execute("packaged");

    }
}

问题1

TextInputFormat构造方法中的filePath参数作用是什么?

目测没什么用,因为看源码readFile的参数也有一个filePath

1.11 flink读取本地文件例子以及细节

而最终还是会set到inputFormat的filePath,所以我的例子在构建TextInputFormat时传入的null是可以正常运行的

1.11 flink读取本地文件例子以及细节

问题2

如何实现文件append数据不重新获取所有内容,而是从上次的位置开始读取呢?

flink没有这么做,应该是考虑到文件数据有可能存在update而不是append,所以自己的业务是单纯的append,则需要自己实现对应功能

 

 

 

相关标签: flink 流处理