欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink初次练习(Source、Sink)

程序员文章站 2022-07-14 14:16:03
...

Flink的官网:https://flink.apache.org/
使用的软件:IntelliJ IDEA Community Edition
CoreAPI:

  • DataSet:专门处理离线数据,给离线数据处理设计了更多有针对性的API. env:ExecutionEnvironment
  • DataStream:一般用于处理流式数据,也可以处理离线数据env:StreamExecutionEnvironment

一、创建SourceTest文件

练习一

读取目录下的文件并打印输出

package cn.tedu.dataset;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest {
    public static void main(String[] args) throws Exception {
        //1.获取执行环节
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.获取数据源
        DataSource<String> source = env.readTextFile("data.txt");
        //3.转化数据
        //4.输出结果
        source.print();
        //5.触发执行程序
        //在datasetAPI中一般不写第五步

    }
}

需要记住的点:

  • throws Exception抛出异常是在第四步鼠标放在print右边使用Alt+回车,选择第一个选项后出现,之前print底下有红色的波浪线
  • public static void main(String[] args)直接写个main再回车就能出现
  • 前两步可以先写等号右边的,再使用Alt+Shift+L,定义名称

练习二

自己定义一个List并打印输出

package cn.tedu.dataset;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class SourceTest {
    public static void main(String[] args) throws Exception {
        //1.获取执行环节
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.获取数据源
        List<String> list = new ArrayList<>();
        list.add("hello");
        list.add("world");
        DataSource<String> source = env.fromCollection(list);
        //3.转化数据

        //4.输出结果
        source.print();
        //5.触发执行程序
        //在datasetAPI中一般不写第五步

    }
}

与第一次练习相比就只修改了第二步

其他获取数据源比较常用的代码:

读取文件
DataSource<String> source = env.readTextFile("data.txt");
读取自己输入的int型
DataSource<Integer> source = env.fromElements(1, 2, 3);

练习三

读取hdfs中的文件

前提:
1.配置文件pom.xml中需要有以下的代码,不然连不上hadoop

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.12</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.1</version>
        </dependency>

2.开启虚拟机,检查jps是否全部启动

上面两步没问题后,开始输入代码

package cn.tedu.dataset;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class SourceTest {
    public static void main(String[] args) throws Exception {
        //1.获取执行环节
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.获取数据源
        DataSource<String> source = env.readTextFile("hdfs://192.168.65.161:9000/flinkdata/hadoopdata.txt");

        //3.转化数据

        //4.输出结果
        source.print();
        //5.触发执行程序
        //在datasetAPI中一般不写第五步

    }
}

依旧只改了第二步
需要记住的点:

  • import导入的包有些并没有用,只是之前某次练习中有用到,且所有的import都是自动导入的
  • 路径要正确,并确保该文件存在

二、创建SinkTest文件

练习一

package cn.tedu.dataset;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class SinkTest {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.获取数据源 Source
        DataSource<String> source = env.fromElements("马上就要过年了");
        //3,转化数据 Transformation
        //4.输出结果  Sink
        source.print();
        //5.触发执行程序
    }
}

练习二

package cn.tedu.dataset;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class SinkTest {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.获取数据源 Source
        DataSource<String> source = env.fromElements("马上就要过年了");
        //3,转化数据 Transformation
        //4.输出结果  Sink
        source.writeAsText("result.txt");
        //5.触发执行程序
        env.execute();
        //在DataSetAPI中,如果计算结果不落地,只是打印,那么不能有触发执行这个操作,如果计算结果不打印而是写成文件或者需要输出到其他系统中,那么必须触发程序执行
    }
}

修改了第四步以及增加了第五步,将输出保存成文件
需要记住的点:

  • 注意第五步的注释,如果没写第五步,则找不到创建的文件