Flink初次练习(Source、Sink)
程序员文章站
2022-07-14 14:16:03
...
Flink的官网:https://flink.apache.org/
使用的软件:IntelliJ IDEA Community Edition
CoreAPI:
- DataSet:专门处理离线数据,给离线数据处理设计了更多有针对性的API. env:ExecutionEnvironment
- DataStream:一般用于处理流式数据,也可以处理离线数据env:StreamExecutionEnvironment
一、创建SourceTest文件
练习一
读取目录下的文件并打印输出
package cn.tedu.dataset;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SourceTest {
public static void main(String[] args) throws Exception {
//1.获取执行环节
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.获取数据源
DataSource<String> source = env.readTextFile("data.txt");
//3.转化数据
//4.输出结果
source.print();
//5.触发执行程序
//在datasetAPI中一般不写第五步
}
}
需要记住的点:
- throws Exception抛出异常是在第四步鼠标放在print右边使用Alt+回车,选择第一个选项后出现,之前print底下有红色的波浪线
- public static void main(String[] args)直接写个main再回车就能出现
- 前两步可以先写等号右边的,再使用Alt+Shift+L,定义名称
练习二
自己定义一个List并打印输出
package cn.tedu.dataset;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
public class SourceTest {
public static void main(String[] args) throws Exception {
//1.获取执行环节
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.获取数据源
List<String> list = new ArrayList<>();
list.add("hello");
list.add("world");
DataSource<String> source = env.fromCollection(list);
//3.转化数据
//4.输出结果
source.print();
//5.触发执行程序
//在datasetAPI中一般不写第五步
}
}
与第一次练习相比就只修改了第二步
其他获取数据源比较常用的代码:
读取文件
DataSource<String> source = env.readTextFile("data.txt");
读取自己输入的int型
DataSource<Integer> source = env.fromElements(1, 2, 3);
练习三
读取hdfs中的文件
前提:
1.配置文件pom.xml中需要有以下的代码,不然连不上hadoop
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.12</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
2.开启虚拟机,检查jps是否全部启动
上面两步没问题后,开始输入代码
package cn.tedu.dataset;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
public class SourceTest {
public static void main(String[] args) throws Exception {
//1.获取执行环节
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.获取数据源
DataSource<String> source = env.readTextFile("hdfs://192.168.65.161:9000/flinkdata/hadoopdata.txt");
//3.转化数据
//4.输出结果
source.print();
//5.触发执行程序
//在datasetAPI中一般不写第五步
}
}
依旧只改了第二步
需要记住的点:
- import导入的包有些并没有用,只是之前某次练习中有用到,且所有的import都是自动导入的
- 路径要正确,并确保该文件存在
二、创建SinkTest文件
练习一
package cn.tedu.dataset;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
public class SinkTest {
public static void main(String[] args) throws Exception {
//1.获取执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.获取数据源 Source
DataSource<String> source = env.fromElements("马上就要过年了");
//3,转化数据 Transformation
//4.输出结果 Sink
source.print();
//5.触发执行程序
}
}
练习二
package cn.tedu.dataset;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
public class SinkTest {
public static void main(String[] args) throws Exception {
//1.获取执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.获取数据源 Source
DataSource<String> source = env.fromElements("马上就要过年了");
//3,转化数据 Transformation
//4.输出结果 Sink
source.writeAsText("result.txt");
//5.触发执行程序
env.execute();
//在DataSetAPI中,如果计算结果不落地,只是打印,那么不能有触发执行这个操作,如果计算结果不打印而是写成文件或者需要输出到其他系统中,那么必须触发程序执行
}
}
修改了第四步以及增加了第五步,将输出保存成文件
需要记住的点:
- 注意第五步的注释,如果没写第五步,则找不到创建的文件
推荐阅读
-
Flink1.11.2-pg-source&sink
-
flink的常用Source和Sink
-
Flink入门(二)(使用kafka作为sink和source)
-
Flink学习笔记-常用Source和Sink简单示例
-
flink 将mysql作为Source和Sink的代码示例
-
Flink初次练习(Source、Sink)
-
Flink - RabbitMQ 自定义Source/Sink
-
Flink kafka source & sink 源码解析
-
Flink实战(八)Flink 使用 Kafka Source & Kafka Sink
-
如何自定义 Flink Connectors(Source 和 Sink)?