欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink快速入门

程序员文章站 2024-03-23 15:16:10
...

Flink快速入门

一.Flink介绍

大数据的4代计算引擎:

  1. 第1代——Hadoop MapReduce
    • 批处理
    • Mapper、Reducer
  2. 第2代——DAG框架(Tez) + MapReduce
    • 批处理
    • 1个Tez = MR(1) + MR(2) + … + MR(n)
    • 相比MR效率有所提升
  3. 第3代——Spark
    • 批处理、流处理、SQL高层API支持
    • 自带DAG
    • 内存迭代计算、性能较之前大幅提升
  4. 第4代——Flink
    • 批处理、流处理、SQL高层API支持
    • 自带DAG
    • 流式计算性能更高、可靠性更高

(一)Flink简介

  • Flink是一个针对流数据批数据的分布式引擎,主要由 java 代码实现
  • 运行速度比Spark更快,计算量越大,性能优势更明显
  • 真正的流计算,就像Storm一样,同时也支持批处理
  • 比Spark更轻量级的容错
  • 支持 scalaJava API

(二)对比Flink、Spark、Storm

计算框架 处理模型 保证次数 容错机制 延时 吞吐量
Storm native(数据进入立即处理) At-least-once至少一次 ACK机制
Storm Trident micro-batching(划分小批处理) Exactly-once仅仅一次 ACK机制
Spark Streaming micro-batching Exactly-once 基于RDD和 checkpoint
Flik native、micro-batching Exactly-once checkpoint(Flink快照)

(三)Flink生态圈

Flink快速入门

  • 支持Java和Scala API
  • 支持Table(SQL)
  • 支持图操作(Flink Gelly)
  • 支持机器学习(Flink ML)
  • 支持将Flink程序部署到YARN

(四)编程模型

Flink提供不同级别的抽象来开发 流 / 批 处理应用程序
Flink快速入门

二.Flink环境搭建

Flink支持多种安装模式:

  1. local(本地)——单机模式,一般不使用
  2. standalone——独立模式,Flink自带集群,开发测试环境使用
  3. yarn——计算资源统一由Hadoop YARN管理,生产环境测试

(一)Standalone集群模式

1.上传flink压缩包到node01主机上的/export/software目录,我们使用的是2018年8月更新的flink-1.6.0版本。

2.解压缩flink到 /export/servers 目录

tar -xvzf flink-1.6.0-bin-hadoop26-scala_2.11.tgz -C /export/servers/

3.修改 conf/flink-conf.yaml配置文件:

cd /export/servers/flink-1.6.0/conf
vim flink-conf.yaml

#配置Master的机器名(IP地址) 
jobmanager.rpc.address:node01

#配置每个taskmanager生成的临时文件夹 
taskmanager.tmp.dirs:/export/servers/flink-1.6.0/tmp

4.修改conf/slaves配置文件

cd /export/servers/flink-1.6.0/conf
vim slaves

#指定flink集群从节点的主机名
node01
node02
node03

5.修改系统环境变量配置文件

cd /etc
vim profile

#添加Hadoop配置文件的目录
export HADOOP_CONF_DIR=/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop

#node02和node03都要修改,要复制到其他机器
scp -r /etc/profile node02:/etc
scp -r /etc/profile node03:/etc

#三台机重新加载环境变量
source /etc/profile

6.启动Flink集群

cd /export/servers/flink-1.6.0/
bin/start-cluster.sh

7.浏览Flink Web UI界面:http://node01:8081

8.运行测试任务

#创建一个wordcount.txt文件
cd ~
vim wordcount.txt

#向文件中添加下面的内容
Total time BUILD SUCCESS
Final Memory Finished at
Total time BUILD SUCCESS
Final Memory Finished at
Total time BUILD SUCCESS
Final Memory Finished at
BUILD SUCCESS
BUILD SUCCESS
BUILD SUCCESS
BUILD SUCCESS
BUILD SUCCESS

#上传这个文件到HDFS上
hdfs dfs -mkdir -p /flink/input
hdfs dfs -put /root/wordcount.txt /flink/input

#运行测试任务
cd /export/servers/flink-1.6.0/
bin/flink run /export/servers/flink-1.6.0/examples/batch/WordCount.jar --input hdfs://node01:8020/flink/input/wordcount.txt --output hdfs://node01:8020/flink/output/result.txt

Standalone集群架构 :
Flink快速入门

  • client客户端提交任务给JobManager
  • JobManager负责Flink集群计算资源管理,并分发任务给TaskManager执行
  • TaskManager定期向JobManager汇报状态

(二)高可用HA模式

    从上述架构图中,可发现JobManager存在 单点故障 ,一旦JobManager出现意外,整个集群无法工作。所以,为了确保集群的高可用,需要搭建Flink的HA。

HA架构图:Flink快速入门

1.在flink-conf.yaml中添加zookeeper配置

cd /export/servers/flink-1.6.0/conf
vim flink-conf.yaml

#开启HA,使用文件系统作为快照存储
state.backend:filesystem
#启用检查点,可以将快照保存到HDFS
state.backend.fs.checkpointdir: hdfs://node01:8020/flink-checkpoints
#使用zookeeper搭建高可用
high-availability:zookeeper
# 存储JobManager的元数据到HDFS
high-availability.storageDir: hdfs://node01:8020/flink/ha/
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181

2.将配置过的HA的 flink-conf.yaml 分发到另外两个节点

cd /export/servers/flink-1.6.0/conf
scp -r flink-conf.yaml node02:$PWD
scp -r flink-conf.yaml node03:$PWD

3.到node02中修改flink-conf.yaml中的配置,将JobManager设置为自己节点的名称

cd /export/servers/flink-1.6.0/conf
vim flink-conf.yaml

#进行如下配置
jobmanager.rpc.address: node02 

4.在node01的masters 配置文件中添加多个节点

cd /export/servers/flink-1.6.0/conf
vim masters

node01:8081
node02:8081

5.分发masters配置文件到另外两个节点

cd /export/servers/flink-1.6.0/conf
scp -r masters node02:$PWD
scp -r masters node03:$PWD

6.启动flink集群

#注意一定要提前把zookeeper集群和HDFS集群启动
cd /export/servers/flink-1.6.0/
bin/start-cluster.sh

#分别查看两个节点的Flink Web UI 10. kill掉一个节点,查看另外的一个节点的Web UI

(三)YARN模式

    在企业中,经常需要将Flink集群部署到YARN,因为可以使用YARN来管理所有计算资源。而且Spark程序也可以部署到YARN上。Flink运行在YARN上,可以使用yarn-session来快速提交作业到YARN集群。

yarn模式任务递交流程:
Flink快速入门
yarn-session提供两种模式:

  1. 会话模式
    • 使用Flink中的yarn-session(yarn客户端),会启动两个必要服务JobManager 和 TaskManagers
      客户端通过yarn-session提交作业
    • yarn-session会一直启动,不停地接收客户端提交的作用
    • 有大量的小作业,适合使用这种方式
      Flink快速入门
  2. 分离模式
    • 直接提交任务给YARN
    • 大作业,适合使用这种方式
      Flink快速入门

注意:

使用YARN模式一定要修改三台主机Hadoop的yarn-site.xml,添加该配置表示内存超过分配值,是否将任务杀掉。默认为true。运行Flink程序,很容易超过分配的内存。

<property>
	<name>yarn.nodemanager.vmem-check-enabled</name>
	<value>false</value>
</property>

然后要重启HDFS、YARN集群,不需要启动Flink集群。

1.YARN Session模式

①在flink目录启动yarn-session

cd /export/servers/flink-1.6.0
bin/yarn-session.sh -n 2 -tm 800 -s 1 -d

# -n 表示申请2个容器
# -s 表示每个容器启动多少个slot
# -tm 表示每个TaskManager申请800M内存
# -d 表示以后台程序方式运行
# 启动后去http://node01:8088发现已经启动了一个Apache Flink Application

2.使用flink提交任务

#提交安装包提供的样例任务
bin/flink run examples/batch/WordCount.jar

3.如果程序运行完了,可以使用 yarn application -kill application_id 杀掉任务

yarn application -kill application_1566999670941_0001
2.分离模式

使用flink直接提交任务

bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
# -yn 表示TaskManager的个数

三.批处理常用Scala API

(一)入门案例WordCount

import org.apache.flink.api.scala.ExecutionEnvironment

object MyFlinkWordCount {
  def main(args: Array[String]): Unit = {
    //1.获取执行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    //2.由集合构建数据源,注意导入隐式转换
    import org.apache.flink.api.scala._
    val lineDataSet: DataSet[String] = env.fromCollection(List("hadoop spark hadoop","hadoop spark hive"))

    //3.数据处理
    val wordDateSet: DataSet[String] = lineDataSet.flatMap(_.split(" "))
    val wordAndOneDS: DataSet[(String, Int)] = wordDateSet.map((_,1))
    //根据单词分组,groupBy(0)表示根据元祖下标为1的元素进行分组
    val groupedDS: GroupedDataSet[(String, Int)] = wordAndOneDS.groupBy(0)
    //根据单词聚合,sum(1)表示将分组内所有的元祖的下表为1的元素相加求和
    val wordAndCountADS: AggregateDataSet[(String, Int)] = groupedDS.sum(1)

    wordAndCountADS.print()
  }
}

(二)Flink Source数据源

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object MyReadFileSource {
  def main(args: Array[String]): Unit = {
    //执行初始化运行环境,导入隐式转换
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    //1.基于本地集合的数据源
    val listDS: DataSet[String] = env.fromCollection(List("hello hadoop","hello flink"))
    //listDS.print()

    //2.基于本地文件的source
    val textDS: DataSet[String] = env.readTextFile("D:\\1_my_study_work\\Flink\\day01\\wordcount.txt")
    //textDS.print()

    //3.基于csv文件的source,注意读csv文件要创建一个样例类对每行数据进行封装
    val csvDS: DataSet[Score] = env.readCsvFile[Score]("D:\\1_my_study_work\\Flink\\day01\\score.csv")
    //csvDS.print()

    //4.基于压缩文件的source
    val gzDS: DataSet[String] = env.readTextFile("D:\\1_my_study_work\\Flink\\day01\\distribute_cache_student")
    gzDS.print()

  }
  case class Score(subId:Int,stuName:String,stuId:String,score:Double)
}

(三)Fink sink数据输出

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object MySourceSink {
  def main(args: Array[String]): Unit = {
    //获取执行环境,导入隐式转换
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    //从一个集合中获取数据
    val textDS: DataSet[String] = env.fromCollection(List("hadoop spark hadoop", "hadoop spark hive"))

    //1.基于本地集合的sink
    val localSeq: Seq[String] = textDS.collect()
    //localSeq.foreach(println)

    //2.基于本地文件的sink
    textDS.writeAsText("hdfs://node01:8020/flink/out_result.txt")
    //将数据保存到文件中的时候,一定要加execute,打印输出的时候可以不加print
    env.execute()
  }
}

(四)transformation算子操作

Transformation 说明
map 将DataSet中的每一个元素转换为另外一个元素
flatMap 将DataSet中的每一个元素转换为0…n个元素
mapPartition 将一个分区中的元素转换为另一个元素
filter 过滤出来一些符合条件的元素
reduce 可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素
reduceGroup 将一个dataset或者一个group聚合成一个或多个元素
aggregate 按照内置的方式来进行聚合。例如:SUM/MIN/MAX…
distinct 去重
join 将两个DataSet按照一定条件连接到一起,形成新的DataSet
union 将两个DataSet取并集,并自动进行去重
rebalance 让每个分区的数据均匀分布,避免数据倾斜
partitionByHash 按照指定的key进行hash分区
sortPartition 指定字段对分区中的数据进行排序
map
import org.apache.flink.api.scala.ExecutionEnvironment

/**
  * map算子将DataSet中的每一个元素转换为另外一个元素
  * 使用map操作,将以下数据
  * "1,张三", "2,李四", "3,王五", "4,赵六"
  * 转换为一个scala的样例类
  */
object MyMapDemo {
  def main(args: Array[String]): Unit = {
    //获取执行环境,导入隐式转换
    val env = ExecutionEnvironment.getExecutionEnvironment
    import   org.apache.flink.api.scala._

    //获取数据源
    val lineDS: DataSet[String] = env.fromCollection(List("1,张三", "2,李四", "3,王五", "4,赵六"))

    //将数据转换成样例类
    val userDS: DataSet[User] = lineDS.map(line => {
      val infoArr: Array[String] = line.split(",")
      User(infoArr(0), infoArr(1))
    })

    userDS.print()
  }
  
  case class User(uid:String,name:String)
}
flatMap
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable
/**flatMap算子将DataSet中的每一个元素转换为0...n个元素
  * 需求:分别将以下数据,转换成 国家 、 省份 、 城市 三个维度的数据。
  * 将以下数据
  * 张三,中国,江西省,南昌市
  * 李四,中国,河北省,石家庄市
  * Tom,America,NewYork,Manhattan
  * 转换为
  * 张三,中国
  * 张三,中国江西省
  * 张三,中国江西省南昌市
  */
object MyFlatMapDemo {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val lineDS: DataSet[String] = env.fromCollection(List(
      "张三,中国,江西省,南昌市",
      "李四,中国,河北省,石家庄市",
      "Tom,America,NewYork,Manhattan"
    ))
    val userDS: DataSet[String] = lineDS.flatMap(line => {
      val wordArr: mutable.ArrayOps[String] = line.split(",")
      List(
        (wordArr(0) + "," + wordArr(1)),
        (wordArr(0) + "," + wordArr(1)) + wordArr(2),
        (wordArr(0) + "," + wordArr(1)) + wordArr(2) + wordArr(3)
      )
    })

    userDS.print()
  }
}
mapPartition

    当需要访问一些外部存储(如: mySQL),使用mapPartition获取一次连接就可对整个分区的数据进行JDBC操作,而使用map每操作一条数据都会获取一次连接。

import org.apache.flink.api.scala.ExecutionEnvironment

/**mapPartition算子:将一个分区中的元素转换为另一个元素
  * 使用mapPartition操作,将以下数据
  * "1,张三", "2,李四", "3,王五", "4,赵六"
  * 转换为一个scala的样例类。
  */
object MyMapPatition {
  case class User(id:String,name:String)
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val lineDS: DataSet[String] = env.fromCollection(List("1,张三", "2,李四", "3,王五", "4,赵六"))

    val userDS: DataSet[User] = lineDS.mapPartition(iter => {
      iter.map(ele => {
        val fields: Array[String] = ele.split(",")
        User(fields(0), fields(1))
      })
    })
    
    userDS.print()
  }
}
filter
import org.apache.flink.api.scala.ExecutionEnvironment
/**filter算子: 过滤出来 一些符合条件的元素
  * 过滤出来以下以 h 开头的单词。
  * "hadoop", "hive", "spark", "flink","hadoop"
  */
object MyFilterDemo {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val lineDS: DataSet[String] = env.fromCollection(List("hadoop", "hive", "spark", "flink","hadoop"))

    lineDS.filter(_.startsWith("h")).print()

  }
}
reduce
import org.apache.flink.api.scala.ExecutionEnvironment

/**reduce算子: 可以对一个 dataset 或者一个 group 来进行聚合计算,最终聚合成一个元素
  * 请将以下元组数据,使用 reduce 操作聚合成一个最终结果
  * ("java" , 1) , ("java", 1) ,("java" , 1)
  * 将上传元素数据转换为 ("java",3)
  */
object MyReduceDemo {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val tuppleDS: DataSet[(String, Int)] = env.fromCollection(List(("java" , 1) , ("java", 1) ,("java" , 1)))

    val rsDS: DataSet[(String, Int)] = tuppleDS.reduce((tup1, tup2) => {
      (tup1._1, tup1._2 + tup2._2)
    })

    rsDS.print()
  }
}
reduceGroup
  • reduce是将数据一个个拉取到另外一个节点,然后再执行计算
  • reduceGroup是先在每个group所在的节点上执行计算,然后再拉取
    Flink快速入门
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

/**reduceGroup算子: 可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素
  * 请将以下元组数据,下按照单词使用 groupBy 进行分组,再使用 reduceGroup 操作进行单词计数
  * ("java" , 1) , ("java", 1) ,("scala" , 1)
  */
object MyReduceGroupDemo {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val tuppleDS: DataSet[(String, Int)] = env.fromCollection(List(("java" , 1) , ("java", 1) ,("scala" , 1)))

    val rgDS = tuppleDS.reduceGroup(iter => {
      iter.reduce((t1, t2) => {
        (t1._1, t1._2 + t2._2)
      })
    })

    rgDS.print() //结果为(java,3),说明这三个元祖都在同一个分区
  }
}
aggregate

按照内置的方式来进行聚合。例如:SUM/MIN/MAX…

import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.ExecutionEnvironment
/** aggregate算子: 按照内置的方式来进行聚合。例如:SUM/MIN/MAX..
  * 请将以下元组数据,使用 aggregate 操作进行单词统计
  * ("java" , 1) , ("java", 1) ,("scala" , 1)
  */
object MyAggregateDemo {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val tuppleDS: DataSet[(String, Int)] = env.fromCollection(List(("java" , 1) , ("java", 1) ,("scala" , 1)))
    val groupedDS: GroupedDataSet[(String, Int)] = tuppleDS.groupBy(0)

    val rsDS: AggregateDataSet[(String, Int)] = groupedDS.aggregate(Aggregations.SUM,1)
    rsDS.print()
  }
}

注意:
    要使用aggregate,只能使用字段索引名或索引名称来进行分组 groupBy(0) ,否则会报一下错误:

Exception in thread "main" java.lang.UnsupportedOperationException: Aggregate does not support 
grouping with KeySelector functions, yet. 
distinct
import org.apache.flink.api.scala.ExecutionEnvironment

/**distinct算子: 去除重复的数据
  * 请将以下元组数据,使用 distinct 操作去除重复的单词
  * ("java" , 1) , ("java", 2) ,("scala" , 1)
  */
object MyDistinctDemo {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import   org.apache.flink.api.scala._

    val tuppleDS: DataSet[(String, Int)] = env.fromCollection(List(("java" , 1) , ("java", 2) ,("scala" , 1)))
    val distinctedDS: DataSet[(String, Int)] = tuppleDS.distinct(0)

    distinctedDS.print()
  }
}
join

Flink快速入门

import org.apache.flink.api.scala.ExecutionEnvironment

//join算子: 将两个DataSet连接起来
object MyJoinDemo {
  case class Subject(subId:Int,name:String)
  case class Score(stuId:Int,stuName:String,subId:Int,score:Double)
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val subjectDS: DataSet[Subject] = env.readCsvFile[Subject]("D:\\1_my_study_work\\Flink\\day01\\subject.csv")
    val scoreDS: DataSet[Score] = env.readCsvFile[Score]("D:\\1_my_study_work\\Flink\\day01\\score.csv")

    val joinedDS: JoinDataSet[Score, Subject] = scoreDS.join(subjectDS).where(score => score.subId).equalTo(subject => {subject.subId})
    joinedDS.print()
  }
}
union
import org.apache.flink.api.scala.ExecutionEnvironment

//将两个DataSet取并集,并不会进行去重。
object MyUnionDemo {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val wordDataSet1 = env.fromCollection(List("hadoop", "hive", "flume"))
    val wordDataSet2 = env.fromCollection(List("hadoop", "hive", "spark"))

    val rsDS: DataSet[String] = wordDataSet1.union(wordDataSet2)
    rsDS.print()
  }
}
rebalance
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment

//flink解决数据倾斜问题
object MyRebalanceDemo {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    //使用`env.generateSequence`创建0-100的并行数据
    val numDS: DataSet[Long] = env.generateSequence(0,100)
    //使用`fiter`过滤出来`大于8`的数字,为了让分区的数数据量发生变化
    //然后就可对比加.rebalance()和不加的区别
    val filterDataSet: DataSet[Long] = numDS.filter(_ > 8)//.rebalance()

    //spark查询数据在哪个分区用mapPartitionWithIndex算子
    //我门也需要查看每个分区的数据情况,才能判断数据有没有倾斜
    //使用的电脑的CPC有几个线程就会分几个区
    val rsDS: DataSet[(Int, Long)] = numDS.map(new RichMapFunction[Long, (Int, Long)] {
      override def map(elem: Long): (Int, Long) = {
        (getRuntimeContext.getIndexOfThisSubtask, elem)
      }
    })

    rsDS.print()
  }
}
partitionByHash
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._

/**hashPartition算子: 按照指定的key进行hash分区
  * 基于以下列表数据来创建数据源,并按照hashPartition进行分区,然后输出到文件。
  * List((1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b"),  (3, "a"), (3, "b"), (3, "c"), (4, "a"), (4, "a"), (5, "a"), (5, "a"))
  */
object MyPartitionByHashDemo {
  def main(args: Array[String]): Unit = {
    //1. 构建批处理运行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    /**
      * 设置并行度三种设置方式
      * 1:读取配置文件中的默认并行度设置
      * 2:设置全局的并行度
      * 3:对算子设置并行度
      */
    //2. 设置并行度为5
    env.setParallelism(5)
    //3. 构建测试数据集
    val textDataSet: DataSet[(Int, String)] = env.fromCollection(
      List((1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b"),  (3, "a"), (3, "b"), (3, "c"), (4, "a"), (4, "a"), (5, "a"), (5, "a"))
    )//.setParallelism(4)

    val hashPartedDataSet: DataSet[(Int, String)] = textDataSet.partitionByHash(x => x._1)
    //源码对RichMapFunction接口的说明: RichMapFunction接口是map算子的丰富变体
    //它提供对runtimecontext的访问,并提供open初始化和close销毁方法
    val resultDS: DataSet[(Int, (Int, String))] = hashPartedDataSet.map(new RichMapFunction[(Int, String), (Int, (Int, String))] {
      //map方法会对DataSet中的每个元素执行map操作,也就是有多少元素,map方法就被执行多少次
      //通过调用getRuntimeContext.getIndexOfThisSubtask,我们可获得每个元素所在的分区并返回
      override def map(value: (Int, String)):(Int,(Int,String)) = {
        (getRuntimeContext.getIndexOfThisSubtask, value)
      }
    })
    //有多少个并行度就有多少个结果文件,可能结果文件中数据是空的
    resultDS.writeAsText("./data/output3")
    env.execute()
  }
}
sortPartition
package cn.itcast.batch

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment

//sortPartition算子:指定字段对分区中的数据进行排序
object MySortPartitionDemo {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val wordDS: DataSet[String] = env.fromCollection(List("hadoop", "hadoop", "hadoop", "hive", "hive", "spark", "spark", "flink"))

    //设置数据集任务的并行度为2, Flink的配置文件默认是1
    wordDS.setParallelism(2)

    //使用sortPartition按照字符串进行降序排序
    val sortedDS: DataSet[String] = wordDS.sortPartition(_.toString,Order.DESCENDING)

    //输出到hdfs上会发现生成了两个文件,每个文件内的结果是排好序的
    sortedDS.writeAsText("hdfs://node01:8020/flink/output4")
    env.execute("APP")
  }
}

(五)广播变量★

    和Spark一样,Flink也支持广播。可以将数据广播到TaskManager上,数据存储到内存中。可以理解广播就是一个公共的共享变量将一个数据集广播后,每个节点 只存一份,不同的Task都可以在节点上获取到如果不使用广播,每一个Task都会拷贝一份数据集,造成内存资源浪费。
Flink快速入门
注意:

  1. 广播出去的变量存放在每个节点的内存中,直到程序结束,这个数据集不能太大
  2. withBroadcastSet 需要在要使用到广播的操作后调用
  3. 需要手动导入 scala.collection.JavaConverters._ 将Java集合转换为scala集合
import java.util
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
/**广播变量: 将数据广播到TaskManager节点,同一个taskmanager节点的solt中的task任务可以共享一份数据
  * 需求:
  * 创建一个 学生 数据集,包含以下数据
  * |学生ID | 姓名 |
  * |------|------|
  * List((1, "张三"), (2, "李四"), (3, "王五"))
  *
  * 再创建一个 成绩 数据集,
  * |学生ID | 学科 | 成绩 |
  * |------|------|-----|
  * List( (1, "语文", 50),(2, "数学", 70), (3, "英文", 86))
  *
  * 请通过广播获取到学生姓名,将数据转换为
  * List( ("张三", "语文", 50),("李四", "数学", 70), ("王五", "英文", 86))
  */
object MyBatchBroadcastDemo {
  def main(args: Array[String]): Unit = {
    //1. 获取批处理运行环境,导入隐式转换
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //2. 分别创建两个数据集
    val stuDS: DataSet[(Int, String)] = env.fromCollection(List((1, "张三"), (2, "李四"), (3, "王五")))
    val scoreDS: DataSet[(Int, String, Int)] = env.fromCollection(List( (1, "语文", 50),(2, "数学", 70), (3, "英文", 86)))

    //3.在数据集调用 map 方法后,调用 withBroadcastSet 将 学生 数据集创建广播
    val resultDS: DataSet[(String, String, Int)] = scoreDS.map(
      //4. 实现 RichMapFunction<IN, OUT>,需指定输入和返回数据的泛型
      // 输入的数据(学生id,学科名字,学生成绩);返回的数据(学生名字,学科成名,成绩)
      new RichMapFunction[(Int, String, Int), (String, String, Int)] {
        //定义从广播变量中获取的学生数据集合,在open方法中初始化
        var stuMap: Map[Int, String] = null

        //重写open方法, 将获取到的广播变量赋值给stuMap
        //open初始化方法会在创建匿名内部类对象的时候被执行一次,在对象的生命周期中只被执行这一次
        override def open(parameters: Configuration): Unit = {
          //因为获取到的广播变量中的数据类型是java的集合类型,但是我们的代码是scala因此需要将java的集合转换成scala的集合
          //我们这里将list转换成了map对象,之所以能够转换成Map是因为list中的元素是对偶元祖,因此可以转换成kv键值对类型
          //之所以要转换,是因为后面好用,传递一个学生id,可以直接获取到学生的名字
          import scala.collection.JavaConversions._
          val stuList: util.List[(Int, String)] = getRuntimeContext.getBroadcastVariable[(Int, String)]("student")
          stuMap = stuList.toMap
        }

        //map方法会对DataSet中的每个元素执行map操作,也就是有多少元素,map方法就被执行多少次
        override def map(score: (Int, String, Int)): (String, String, Int) = {
          //map方法要做的就是:
          //将输入的数据(学生id,学科名字,学生成绩);返回的数据(学生名字,学科成名,成绩)
          val stuId: Int = score._1
          val stuName: String = stuMap.getOrElse(stuId, "")
          (stuName, score._2, score._3)
        }
      }
    ).withBroadcastSet(stuDS, "student")

    //6. 打印测试
    resultDS.print()
  }
}

(六)分布式缓存★

    Flink提供了一个类似于Hadoop的分布式缓存。前面讲的广播,是将一个DataSet广播到每一个TaskManager的内存中。而Distributed Cache是从外部加载一个文件/目录(例如:HDFS),然后分别复制到每一个TaskManager的本地磁盘中。

用法:使用Flink运行时环境的 registerCachedFile 注册一个分布式缓存,在操作中,使用 getRuntimeContext.getDistributedCache.getFile ( 文件名 )获取分布式缓存。

第一步: 将需要分布缓存的文件student.txt上传到HDFS /flink 目录下

#创建student.txt文件将三条数据写入
vim student.txt

1,张三
2,李四
3,王五

#上传文件到hdfs的/flink目录下
hdfs dfs -put student.txt /flink
rm -rf student.txt

第二步:代码实现

import java.io.File
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.io.Source

/**
  * 实现分布式缓存功能:
  * 创建一个 成绩 数据集
  * List( (1, "语文", 50),(2, "数学", 70), (3, "英文", 86))
  *
  * 请通过分布式缓存获取到学生姓名,将数据转换为
  * List( ("张三", "语文", 50),("李四", "数学", 70), ("王五", "英文", 86))
  */
object MyDistributeCacheDemo {
    def main(args: Array[String]): Unit = {
      /**
        * 实现思路:
        * 1. 将 distribute_cache_student 文件上传到HDFS / 目录下
        * 2. 获取批处理运行环境, 指定缓存文件,将文件复制到taskmanager
        * 3. 创建成绩数据集
        * 4. 对 成绩 数据集进行map转换,将(学生ID, 学科, 分数)转换为(学生姓名,学科,分数)
        * RichMapFunction 的 open 方法中,获取分布式缓存数据  在 map 方法中进行转换
        * ①重写 open 方法
        * 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件
        * 使用 Scala.fromFile 读取文件,并获取行
        * 将文本转换为元组(学生ID,学生姓名),再转换为List
        * ②实现 map 方法
        * 从分布式缓存中根据学生ID过滤出来学生
        * 获取学生姓名
        * 构建最终结果元组
        * 5. 打印测试
        */

      //2.获取批处理运行环境,注册缓存文件
      val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
      env.registerCachedFile("hdfs://node01:8020/flink/student.txt", "student")

      //3.创建成绩数据集
      val scoreDS: DataSet[(Int, String, Int)] = env.fromCollection(
        List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86))
      )

      //4.对成绩数据集进行map转换: 将(学生ID, 学科, 分数)转换为(学生姓名,学科,分数)
      val resultDS: DataSet[(String, String, Int)] = scoreDS.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {
        var stuMap: Map[Int, String] = null;

        override def open(parameters: Configuration): Unit = {
          //注册的是一个文件,是将文件复制到taskmanager上面,因此我们需要读取文件的内容
          val studentFile: File = getRuntimeContext.getDistributedCache.getFile("student")

          //获取文件内容
          val stuIter: Iterator[String] = Source.fromFile(studentFile).getLines()
          stuMap = stuIter.map(line => {
            val fields: Array[String] = line.split(",")
            (fields(0).toInt, fields(1))
          }).toMap
        }

        override def map(score: (Int, String, Int)): (String, String, Int) = {
          val stuId: Int = score._1
          val stuName: String = stuMap.getOrElse(stuId, "")
          (stuName, score._2, score._3)
        }
      })

      //5. 打印测试
      resultDS.print()
    }
  }

总结:

  • 广播变量, 将数据广播到TaskManager节点,同一个taskmanager节点的solt中的task任务可以共享一份数据,但是受限于内存的大小,不能够广播特别大的数据。
  • 分布式缓存,将文件复制到taskmanager节点的磁盘上,所以可以是非常大的数据,但是相对于广播变量来说,性能偏低,因为要读取磁盘上的文件。