欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

五 Flink DataStream API

程序员文章站 2022-07-14 13:39:26
...

1.map

实现Map操作,有三种方法.map操作是UDF(一进一出)
1.第一种是和spark的map类似,直接在map算子中传入匿名函数
2.其实map算子接收的是一个实现MapFunction接口的类,我们可以传入一个匿名类
3.可以创建一个实现MapFunction接口的类.

package org.example.puapi

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._
import org.example.source.self.{SensorReading, SensorSource}

/**
 * 利用map算子把流中的SensorReading,转换成元组.
 */
object MapExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.addSource(new SensorSource)

      //方法1,往map传入匿名函数,函数是传入一个参数,出来一个参数
//      .map(obj => (obj.id,obj.temperature,obj.timestamp))


      //方法二 传入一个实现MapFunction接口的匿名类
      /**
       * [SensorReading,(String,Double,Long)] : 传入类型是SensorReading
       * 返回类型是(String,Double,Long)
       * 根据逻辑来写map函数
       */
//        .map(new MapFunction[SensorReading,(String,Double,Long)] {
//          override def map(value: SensorReading) = {
//            (value.id,value.temperature,value.timestamp)
//          }
//        })
//

      //方法3
        .map(new MyMapFunction)


    stream.print()

    env.execute()
  }

  //方法3,自定义类实现MapFunction接口
  class MyMapFunction extends MapFunction[SensorReading,(String,Double,Long)] {
    override def map(value: SensorReading): (String, Double, Long) = {
      (value.id,value.temperature,value.timestamp)

    }
  }

}

2.filter

filter算子是对数据进行过滤用的.它和map算子一样,实现的方法有3中,和map不同的是flink需要一个返回值是boolean,用来判断该条数据是否保留.而传入map算子的类必须要实现
fliter是一进0出或者1出

package org.example.puapi

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
import org.example.source.self.{SensorReading, SensorSource}

/**
 * 利用filter算子把id为1的数据过滤出来
 */

object FilterExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.addSource(new SensorSource)
        //方法1
//      .filter(_.id.equals("sensor_1"))

      //方法2
//        .filter(new FilterFunction[SensorReading] {
//          override def filter(value: SensorReading): Boolean = value.id.equals("sensor_1")
//        })

    //方法3
        .filter(new MyFilterFunction)
        .print()

    env.execute()
  }

  class MyFilterFunction extends FilterFunction[SensorReading] {
    override def filter(value: SensorReading): Boolean =  value.id.equals("sensor_1")
  }

}

3 flatmap函数

flatmap函数是一进多出
传入的类需要实现的接口是FlatMapFunction,方法和map算子是一样的.

package test2

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object FlatMapExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env
      .addSource(new SensorSource)

    // 使用`FlatMapFunction`实现`MapExample.scala`中的功能
    stream
      .flatMap(
        new FlatMapFunction[SensorReading, String] {
          override def flatMap(value: SensorReading, out: Collector[String]): Unit = {
            // 使用`collect`方法向下游发送抽取的传感器ID
            out.collect(value.id)
          }
        }
      )
      .print()

    // 使用`FlatMapFunction`实现`FilterExample.scala`中的功能
    stream
        .flatMap(
          new FlatMapFunction[SensorReading, SensorReading] {
            override def flatMap(value: SensorReading, out: Collector[SensorReading]): Unit = {
              if (value.id.equals("sensor_1")) {
                out.collect(value)
              }
            }
          }
        )
        .print()

    env.execute()
  }
}

4 keyby

keyby是对流进行分组.相同的key进入到一个组中,但每个组中的key值不一定相同.同一个组内的不同key值逻辑上隔离的.

keyby之后的流就会变成keyedstream.
keyby的用法:
可以通过传入位置参数

keyby(0)

如果是类
可以传入属性值

keyby(''id'')

还可以传入匿名函数:

package org.example.puapi

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._
import org.example.source.self.{SensorReading, SensorSource}

object KeyByExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.addSource(new SensorSource)
      //1. 根据位置
      //.keyBy(0) 
      // 2. 根据属性
      //.keyBy("id")
      
      //3. 或者传入匿名函数
//      .keyBy(_.id)
      
      //4. 自己定义KeySelector
      .keyBy(new KeySelector[SensorReading,String] {
        override def getKey(value: SensorReading): String = value.id
      })
      
      .sum(2)
      .print()

    env.execute()
  }

}

4.1 滚动聚合

滚动聚合算子由 KeyedStream 调用,并生成一个聚合以后的 DataStream,例如: sum, minimum, maximum。一个滚动聚合算子会为每一个观察到的 key 保存一个聚合的值。针对每
一个输入事件,算子将会更新保存的聚合结果,并发送一个带有更新后的值的事件到下游算
子。滚动聚合不需要用户自定义函数,但需要接受一个参数,这个参数指定了在哪一个字段
上面做聚合操作。 DataStream API 提供了以下滚动聚合方法。

滚动聚合算子只能用在滚动窗口,不能用在滑动窗口。

• sum():在输入流上对指定的字段做滚动相加操作。
• min():在输入流上对指定的字段求最小值。
• max():在输入流上对指定的字段求最大值。
• minBy():在输入流上针对指定字段求最小值,并返回包含当前观察到的最小值的事件。
• maxBy():在输入流上针对指定字段求最大值,并返回包含当前观察到的最大值的事件。

滚动聚合算子无法组合起来使用,每次计算只能使用一个单独的滚动聚合算子。