业务团队使用Flink简要梳理
1、Flink流式计算框架使用背景
Flink是一种流式计算框架,我们为什么会用到flink呢?
因为目前云控负责的是路侧感应设备状态、交通事件、感知对象数据的监控,由路侧设备采集到的数据最终会通过物接入&BIE边缘云等应用组件写入到kafka topic中,云控流计算引擎监听kafka主题监控数据,并将读取到的监控数据做进一步的 聚合、转换、计算等操作,然后将计算结果推送到相应的业务模块(可以再次转发kafka,也可能直接写入tsdb、redis等高吞吐、低延迟的组件集群,供各服务监听使用)。流程简图如下:
2、Flink基础概念&原理
这里可以先对flink相关的概念有一个初步的认识;聊flink前先对数据集类型和数据运算模型有初识;
数据集类型有哪些?
- 有界数据集:有限不会改变的数据集合
- *数据集:无穷的持续集成的数据集合;常见的有 应用实时产生的日志 | 金融市场的实时交易记录|Iot感应设备实时产生的数据
数据运算模型有哪些?
- native流式计算:只要数据一直在产生,计算就持续地进行 long running computation
- micro-batching批处理:在预先定义的时间内运行计算,当完成时释放计算机资源
Flink即可以处理有界数据、也可以处理*数据,即支持native流式处理,也支持micro-batching的批处理;
2.1、What is Flink
三张图摘自-云邪 成都站 《Flink 技术介绍与未来展望》,图一对flink的基本概念做了阐述,图二是flink的四个核心解决方案、基石,图三flink的几类api;
2.2、Flink的整体架构
自下而上
- 部署:flink支持本地运行、能在独立集群或者被YARN或者Mesos管理的集群上运行、也能部署在云上;
- 运行:flink是一个分布式流式处理引擎,一次一个event的形式被处理;
- API:DataStream(流)、DataSet(批)、Table(表)、SQL API
- 扩展库: Flink包括用于复杂事件处理的CEP,机器学习(FlinkML),图形处理(Gelly)和Apache Storm兼容性的专用代码库;
2.3、Flink数据流编程模型
- 最底层提供了有状态流。它将通过 过程函数(Process Function)嵌入到 DataStream API 中。它允许用户可以*地处理来自一个或多个流数据的事件,并使用一致、容错的状态。除此之外,用户可以注册事件时间和处理事件回调,从而使程序可以实现复杂的计算。
- DataStream / DataSet API 是 Flink 提供的核心 API ,DataSet 处理有界的数据集,DataStream 处理有界或者*的数据流。用户可以通过各种方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。
- Table API 是以 表 为中心的声明式 DSL,其中表可能会动态变化(在表达流数据时)。Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁(代码量更少)。你可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
- Flink 提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。
2.4、Flink程序于数据流结构
以云控感应设备数据量统计为例
public class StatisticsStreamingJob {
...
public static void main(String[] args) {
// 1、创建Flink任务运行环境 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.1、配置flink的checkpoint机制(启用checkPoint,并指定状态备份机制、重启策略)
env.enableCheckpointing(1000);
env.setStateBackend(new MemoryStateBackend());
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.of(10, TimeUnit.SECONDS)));
// 1.2、注册序列化器、序列化pojo
env.getConfig().registerTypeWithKryoSerializer(IothubMessage.class, ProtobufSerializer.class);
// 2、构造Flink的数据源Source(这里是消费8个kafka)
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "kafka:9092");
properties.setProperty("group.id", "data_static_group");
properties.setProperty("auto.offset.reset", "latest");
FlinkKafkaConsumer<IothubMessage> consumer = new FlinkKafkaConsumer<IothubMessage>("stream_topic_mqtt_src",
new KafkaDeserializationSchema<IothubMessage>() {
@Override
public boolean isEndOfStream(IothubMessage iotmessage) {
return false;
}
@Override
public IothubMessage deserialize(ConsumerRecord<byte[], byte[]> consumerRecord)
throws InvalidProtocolBufferException {
// protocol buffer
return IothubMessage.parseFrom(consumerRecord.value());
}
@Override
public TypeInformation<IothubMessage> getProducedType() {
return TypeExtractor.getForClass(IothubMessage.class);
}
}, properties);
// 3、流数据处理 转换/计算countStates
DataStreamSink<List<Datapoint>> countStates =
env.addSource(consumer).map(new MapFunction<IothubMessage, Datapoint>() {
@Override
public Datapoint map(IothubMessage value) throws Exception {
String rscu_sn = null;
String msg_type = null;
int len = 0;
for (int i = 0; i < value.getPropsCount(); i++) {
if (value.getProps(i).getKey().equals("mqttClientId")) {
rscu_sn = value.getProps(i).getValue();
}
if (value.getProps(i).getKey().equals("mqttTopic")) {
String temp[] = value.getProps(i).getValue().split("/");
msg_type = temp[1];
}
}
if (value.hasPayload()) {
len = value.getPayload().size();
}
HashMap<String, String> tags = new HashMap<String, String>();
tags.put("RSCU_ID", rscu_sn);
tags.put("MSG_TYPE", msg_type);
Datapoint datapoint = new Datapoint()
.withMetric("Statistics_1")
.withTags(tags)
.withField("DataLen")
.addLongValue(System.currentTimeMillis(), len);
return datapoint;
}
}).timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.seconds(1))
.process(new ProcessAllWindowFunction<Datapoint, List<Datapoint>, TimeWindow>() {
@Override
public void process(Context context, Iterable<Datapoint> iterable,
Collector<List<Datapoint>> collector) throws Exception {
List<Datapoint> arrayList = new ArrayList<Datapoint>();
iterable.forEach(arrayList::add);
if (arrayList.size() > 0) {
collector.collect(arrayList);
}
}
});
// 4、流数据处理结果Sink输出
countStates.addSink(createTSDBSink());
// 5、运行任务
env.execute("Statistics Streaming Job");
}
Flink 应用程序结构就是如上图所示:
- StreamExecutionEnvironment:构建一个流的运行环境 ;包括指定checkpoint机制配置;
- Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。
- Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据。
- Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。
2.5、CheckPoint 设置机制&案例
-
checkPoint:是一种状态容错机制,包含两部分状态备份和状态恢复,可以定义chepoint实现,通过在Sink中实现CheckPointedFunction接口的两个方法,一个是snapshotState,一个是initializeState,它们俩英文直译过来就已经非常的通俗易懂了。一个是对state进行快照,一个是故障时恢复初始化state;实际中我们一般不会自定义,flink提供了状态备份、恢复各自的几种机制;
-
stateBackend:状态备份flink提供了3种。。
(1)、MemoryStateBackend:状态信息是存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到 JobManager 的堆内存中
(2)、FsStateBackend:对于上一个MemoryStateBackend来说进行了一些优化,它的TaskManager会定期地把state存到HDFS上。也就是checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统);
注意:状态大小受TaskManager内存限制(默认支持5M,可以配置),如果在存入HDFS之前,内存中的数据就已经超过这个值的大小,那数据也还是会丢失的。优点就是对内存操作,状态访问速度很快
(3)、RocksDBStateBackend:状态信息存储在 RocksDB 数据库 (key-value 的数据存储服务);注意:状态访问速度相比FsStateBackend有所下降。优点:可以存储超大量的状态信息,因为这个也是分布式的;
-
restart-strategy:状态数据恢复flink提供了3种策略。。
(1)、固定间隔策略 (Fixed delay):RestartStrategies.fixedDelayRestart(5,Time.of(10, TimeUnit.SECONDS),第一个参数-最大尝试重启次数,第二个参数-重启间隔
(2)、失败率策略 (Failure rate):Job失败后会重启,但是超过失败率后,Job会最终被认定失败;RestartStrategies.failureRateRestart(5,Time.of(5, TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS));第一参数 一个时间段内的最大失败次数,第二参数 衡量失败次数的是时间段,第三参数 重启间隔;
(3)、无重启 (No restart):任务失败不会恢复重启;
//获取flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】 env.enableCheckpointing(1000); // 高级选项: // 设置模式为exactly-once (这是默认值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只允许进行一个检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】 // ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint // ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //设置statebackend env.setStateBackend(new MemoryStateBackend()); //env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints")); //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
如果没有启用 checkpointing,则使用无重启 (no restart) 策略;如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略;重启策略,可以在flink-conf.yaml中配置,表示全局的配置;也可以如下在应用代码中动态指定;
Flink 保证状态化计算强一致性。”状态化“意味着应用可以维护随着时间推移已经产生的数据聚合或者,并且 Filnk 的检查点机制在一次失败的事件中一个应用状态的强一致性。
2.6、Flink的分布式运行
flink 作业提交架构流程可见下图:
- Program Code:我们编写的 Flink 应用程序代码
- Job Client:Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点。 Job Client 负责接受用户的程序代码,然后创建数据流,将数据流提交给 Job Manager 以便进一步执行。 执行完成后,Job Client 将结果返回给用户
- Job Manager:主进程(也称为作业管理器)协调和管理程序的执行。 它的主要职责包括安排任务,管理checkpoint ,故障恢复等。机器集群中至少要有一个 master,master 负责调度 task,协调 checkpoints 和容灾,高可用设置的话可以有多个 master,但要保证一个是 leader, 其他是 standby; Job Manager 包含 Actor system、Scheduler、Check pointing 三个重要的组件
- Task Manager:从 Job Manager 处接收需要部署的 Task。Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点。 任务执行的并行性由每个 Task Manager 上可用的任务槽决定。 每个任务代表分配给任务槽的一组资源。 例如,如果 Task Manager 有四个插槽,那么它将为每个插槽分配 25% 的内存。 可以在任务槽中运行一个或多个线程。 同一插槽中的线程共享相同的 JVM。 同一 JVM 中的任务共享 TCP 连接和心跳消息。Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。
2.7、Flink的slot和parallelism
参考:flink的slot和parallelism- https://zhuanlan.zhihu.com/p/92721430
2.8、Flink的wartermark
参考:flink的wartermark-https://cloud.tencent.com/developer/article/1629585
3、那么我们为什么选择 Flink
Flink 作为一个开源的分布式流式处理框架:
①提供准确的结果,甚至在出现无序或者延迟加载的数据的情况下。
②它是状态化的容错的,同时在维护一次完整的的应用状态时,能无缝修复错误。
③大规模运行,在上千个节点运行时有很好的吞吐量和低延迟。
推荐几篇重要参考文档:
flink的序列化框架- https://juejin.im/post/6844903985309024270
flink的slot和parallelism- https://zhuanlan.zhihu.com/p/92721430
flink的stateBackend&checkpoint机制-https://juejin.im/post/6844904147494371342
flink的wartermark-https://cloud.tencent.com/developer/article/1629585
推荐阅读