五 Flink DataStream API
1.map
实现Map操作,有三种方法.map操作是UDF(一进一出)
1.第一种是和spark的map类似,直接在map算子中传入匿名函数
2.其实map算子接收的是一个实现MapFunction接口的类,我们可以传入一个匿名类
3.可以创建一个实现MapFunction接口的类.
package org.example.puapi
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._
import org.example.source.self.{SensorReading, SensorSource}
/**
* 利用map算子把流中的SensorReading,转换成元组.
*/
object MapExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new SensorSource)
//方法1,往map传入匿名函数,函数是传入一个参数,出来一个参数
// .map(obj => (obj.id,obj.temperature,obj.timestamp))
//方法二 传入一个实现MapFunction接口的匿名类
/**
* [SensorReading,(String,Double,Long)] : 传入类型是SensorReading
* 返回类型是(String,Double,Long)
* 根据逻辑来写map函数
*/
// .map(new MapFunction[SensorReading,(String,Double,Long)] {
// override def map(value: SensorReading) = {
// (value.id,value.temperature,value.timestamp)
// }
// })
//
//方法3
.map(new MyMapFunction)
stream.print()
env.execute()
}
//方法3,自定义类实现MapFunction接口
class MyMapFunction extends MapFunction[SensorReading,(String,Double,Long)] {
override def map(value: SensorReading): (String, Double, Long) = {
(value.id,value.temperature,value.timestamp)
}
}
}
2.filter
filter算子是对数据进行过滤用的.它和map算子一样,实现的方法有3中,和map不同的是flink需要一个返回值是boolean,用来判断该条数据是否保留.而传入map算子的类必须要实现
fliter是一进0出或者1出
package org.example.puapi
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
import org.example.source.self.{SensorReading, SensorSource}
/**
* 利用filter算子把id为1的数据过滤出来
*/
object FilterExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new SensorSource)
//方法1
// .filter(_.id.equals("sensor_1"))
//方法2
// .filter(new FilterFunction[SensorReading] {
// override def filter(value: SensorReading): Boolean = value.id.equals("sensor_1")
// })
//方法3
.filter(new MyFilterFunction)
.print()
env.execute()
}
class MyFilterFunction extends FilterFunction[SensorReading] {
override def filter(value: SensorReading): Boolean = value.id.equals("sensor_1")
}
}
3 flatmap函数
flatmap函数是一进多出
传入的类需要实现的接口是FlatMapFunction,方法和map算子是一样的.
package test2
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object FlatMapExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env
.addSource(new SensorSource)
// 使用`FlatMapFunction`实现`MapExample.scala`中的功能
stream
.flatMap(
new FlatMapFunction[SensorReading, String] {
override def flatMap(value: SensorReading, out: Collector[String]): Unit = {
// 使用`collect`方法向下游发送抽取的传感器ID
out.collect(value.id)
}
}
)
.print()
// 使用`FlatMapFunction`实现`FilterExample.scala`中的功能
stream
.flatMap(
new FlatMapFunction[SensorReading, SensorReading] {
override def flatMap(value: SensorReading, out: Collector[SensorReading]): Unit = {
if (value.id.equals("sensor_1")) {
out.collect(value)
}
}
}
)
.print()
env.execute()
}
}
4 keyby
keyby是对流进行分组.相同的key进入到一个组中,但每个组中的key值不一定相同.同一个组内的不同key值逻辑上隔离的.
keyby之后的流就会变成keyedstream.
keyby的用法:
可以通过传入位置参数
keyby(0)
如果是类
可以传入属性值
keyby(''id'')
还可以传入匿名函数:
package org.example.puapi
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._
import org.example.source.self.{SensorReading, SensorSource}
object KeyByExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new SensorSource)
//1. 根据位置
//.keyBy(0)
// 2. 根据属性
//.keyBy("id")
//3. 或者传入匿名函数
// .keyBy(_.id)
//4. 自己定义KeySelector
.keyBy(new KeySelector[SensorReading,String] {
override def getKey(value: SensorReading): String = value.id
})
.sum(2)
.print()
env.execute()
}
}
4.1 滚动聚合
滚动聚合算子由 KeyedStream 调用,并生成一个聚合以后的 DataStream,例如: sum, minimum, maximum。一个滚动聚合算子会为每一个观察到的 key 保存一个聚合的值。针对每
一个输入事件,算子将会更新保存的聚合结果,并发送一个带有更新后的值的事件到下游算
子。滚动聚合不需要用户自定义函数,但需要接受一个参数,这个参数指定了在哪一个字段
上面做聚合操作。 DataStream API 提供了以下滚动聚合方法。
滚动聚合算子只能用在滚动窗口,不能用在滑动窗口。
• sum():在输入流上对指定的字段做滚动相加操作。
• min():在输入流上对指定的字段求最小值。
• max():在输入流上对指定的字段求最大值。
• minBy():在输入流上针对指定字段求最小值,并返回包含当前观察到的最小值的事件。
• maxBy():在输入流上针对指定字段求最大值,并返回包含当前观察到的最大值的事件。
滚动聚合算子无法组合起来使用,每次计算只能使用一个单独的滚动聚合算子。
推荐阅读
-
Flink实战(六) - Table API & SQL编程
-
idea中flink启动报错org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters
-
Flink 1.8 Basic API Concepts 基本API概念
-
[编写高质量iOS代码的52个有效方法](五)接口与API设计(下)
-
Flink入门(五)——DataSet Api编程指南
-
【从零开始搭建自己的.NET Core Api框架】(五)由浅入深详解CORS跨域机制并快速实现
-
Flink DataStream API之Operators
-
[flink]#31_扩展库:State Processor API
-
1.11.Flink DataSetAPI、DataSet API之Data Sources、DataSet API之Transformations、DataSet Sink部分详解
-
Flink DataSet API