Apache Flink简介
程序员文章站
2022-07-14 13:31:31
...
Flink简介
Flink是什么
Flink是一个针对无界和有界数据流进行有状态计算的分布式处理引擎框架。支持集群部署,可按需扩容;基于内存计算,处理速度快。
流处理应用的基本组件
- 流
- 状态
- 时间
  - 事件时间
  - 处理时间
应用场景
- 事件驱动型应用
- 数据分析应用
- 数据管道应用
Flink优势
- 处理高吞吐量的事件流
- 处理随时产生的事件,始终保持亚秒级(sub-second)的低延迟
- 高效、易用的键值(key/value)结构状态(state)
- 真正的流处理框架。一次处理一个事件,每个事件都有自己的时间窗口。
- 丰富的编程模型可以很容易地实现复杂的语义。对比微批处理,在事件流上进行推理更容易。
- 使用事件时间,可以很容易地处理乱序事件等流缺陷
Flink示例
// Counts the rides of each driver.
public class RideCount {
    /**
     * Builds and runs a job that prints a running ride count per driver.
     *
     * @param args command-line arguments (not used)
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start the data generator (source of TaxiRide events)
        DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());

        // The ride stream carries both START and END events (TaxiRide.isStart). Counting every
        // event would count each completed ride twice, so only START events are counted:
        // one START event stands for one ride.
        DataStream<TaxiRide> rideStarts = rides.filter((TaxiRide ride) -> ride.isStart);

        // map each ride to a tuple of (driverId, 1). The 1L is each ride's contribution to the
        // count; to total another per-ride quantity for each driver (e.g. distance or amount),
        // emit that per-ride value here instead.
        DataStream<Tuple2<Long, Long>> tuples = rideStarts.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(TaxiRide ride) {
                return Tuple2.of(ride.driverId, 1L);
            }
        });

        // partition the stream by the driverId (tuple field f0)
        KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = tuples.keyBy(t -> t.f0);

        // count the rides for each driver: running sum of tuple field 1 (positions are 0-based)
        DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);

        // we could, in fact, print out any or all of these streams
        rideCounts.print();

        // run the ride-count pipeline
        env.execute("Ride Count");
    }
}
// Fatigue-driving (long ride) alert: reports rides whose START has been processed but whose
// END has not been processed within two hours (event time) of the ride's start time.
public class LongRidesSolution {
    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // start the data generator
        DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());

        // Key by rideId so that the START and END events of one ride share the same keyed
        // state and timers inside MatchFunction.
        DataStream<TaxiRide> longRides = rides.keyBy((TaxiRide ride) -> ride.rideId).process(new MatchFunction());

        longRides.print();

        env.execute("Long Taxi Rides");
    }

    /**
     * Pairs the START and END events of each ride (keyed by rideId). The START event is
     * emitted if its event-time timer, set two hours after the ride's start, fires before
     * the END event has been processed.
     */
    private static class MatchFunction extends KeyedProcessFunction<Long, TaxiRide, TaxiRide> {
        // Keyed state: the first event seen for the current rideId, or null if none is pending.
        private ValueState<TaxiRide> rideState;

        @Override
        public void open(Configuration config) {
            // Keyed state can only be used on a keyed stream; each key may own several
            // differently named states.
            ValueStateDescriptor<TaxiRide> stateDescriptor = new ValueStateDescriptor<>("ride event", TaxiRide.class);
            rideState = getRuntimeContext().getState(stateDescriptor);
        }

        /**
         * Stores the first event of a ride and clears the state when the second one arrives.
         *
         * @param ride the incoming ride event
         * @param context gives access to the timer service
         * @param out collector for alerts (not used here; alerts are emitted from onTimer)
         */
        @Override
        public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out) throws Exception {
            TaxiRide previousRideEvent = rideState.value();

            if (previousRideEvent == null) {
                // First event seen for this ride: remember it.
                rideState.update(ride);
                if (ride.isStart) {
                    // Fires two hours after the start unless deleted when the END arrives.
                    context.timerService().registerEventTimeTimer(getTimerTime(ride));
                }
            } else {
                // A timer is only ever registered for a stored START event, so delete it whenever
                // the stored event is a START. Keying this on the stored event (rather than on the
                // incoming event being an END) also removes the timer when a second START arrives
                // for the same rideId. Otherwise that timer would later fire on cleared state.
                if (previousRideEvent.isStart) {
                    context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent));
                }
                // NOTE(review): if the END event arrived first, no timer was registered, so a ride
                // lasting longer than two hours is not reported in that case.

                // both events have now been seen, we can clear the state
                rideState.clear();
            }
        }

        /**
         * Emits the stored START event: the END has not been processed two hours after the start.
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext context, Collector<TaxiRide> out) throws Exception {
            TaxiRide startEvent = rideState.value();
            // Defensive: never emit a null record.
            if (startEvent != null) {
                out.collect(startEvent);
            }
            // NOTE(review): an END event arriving after this point finds empty state and is stored
            // without a timer, so that state is never cleared.
            rideState.clear();
        }

        /** Returns the epoch-millisecond timestamp two hours after the ride's startTime. */
        private long getTimerTime(TaxiRide ride) {
            return ride.startTime.plusSeconds(120 * 60).toEpochMilli();
        }
    }
}
// Finds, for every hour, the driver who received the most tips.
public class HourlyTipsSolution {
    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<TaxiFare> fareEvents = env.addSource(new TaxiFareGenerator());

        // Stage 1, parallel across keys: key by driver and split each key's stream into
        // one-hour tumbling event-time windows (aligned to whole hours, e.g. 1:00:00-2:00:00).
        // Then total the tips of every window.
        DataStream<Tuple3<Long, Long, Float>> tipsPerDriverHour = fareEvents
                .keyBy((TaxiFare fare) -> fare.driverId)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .process(new AddTips());

        // Stage 2, not keyed and therefore not parallel: collect all drivers' totals for the
        // hour. Keep the tuple with the largest field 2 (the tip sum).
        DataStream<Tuple3<Long, Long, Float>> topDriverPerHour = tipsPerDriverHour
                .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
                .maxBy(2);

        // Exercise: how does the following alternative (keyed by f0, the window end) behave?
        // In what ways is it the same as, and different from, the windowAll solution above?
        // DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
        //         .keyBy(t -> t.f0)
        //         .maxBy(2);

        topDriverPerHour.print();

        env.execute("Hourly Tips (java)");
    }

    /*
     * Sums the tips of all fares in one window and emits (window end, driverId, tip total).
     */
    public static class AddTips extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
        @Override
        public void process(Long key, Context context, Iterable<TaxiFare> fares,
                Collector<Tuple3<Long, Long, Float>> out) {
            float tipTotal = 0F;
            for (TaxiFare fare : fares) {
                tipTotal += fare.tip;
            }
            long windowEnd = context.window().getEnd();
            out.collect(Tuple3.of(windowEnd, key, tipTotal));
        }
    }
}
// Joins each ride with its fare.
public class RidesAndFaresExample {
    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        // Local environment with the Web UI and REST endpoint. The join itself does not need
        // checkpointing; it is configured here so the job's state can be read with the
        // State Processor API.
        Configuration flinkConf = new Configuration();
        flinkConf.setString("state.backend", "filesystem");
        flinkConf.setString("state.savepoints.dir", "file:///tmp/savepoints");
        flinkConf.setString("state.checkpoints.dir", "file:///tmp/checkpoints");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf);
        env.setParallelism(2);
        env.enableCheckpointing(10000L);

        // Exactly-once is already the default checkpointing mode, so it is not set explicitly.
        // Externalized checkpoints are deleted when the job is cancelled.
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // Only START ride events take part in the join. Both inputs are keyed by rideId so that
        // a ride and its fare meet in the same keyed state.
        DataStream<TaxiRide> startEvents = env
                .addSource(new TaxiRideGenerator())
                .filter((TaxiRide ride) -> ride.isStart)
                .keyBy((TaxiRide ride) -> ride.rideId);
        DataStream<TaxiFare> fareEvents = env
                .addSource(new TaxiFareGenerator())
                .keyBy((TaxiFare fare) -> fare.rideId);

        // The uid lets the State Processor API locate this stateful operator.
        DataStream<Tuple2<TaxiRide, TaxiFare>> ridesWithFares = startEvents
                .connect(fareEvents)
                .flatMap(new EnrichmentFunction())
                .uid("enrichment");

        ridesWithFares.print();

        env.execute("Join Rides with Fares (java RichCoFlatMap)");
    }

    /**
     * Buffers whichever of a ride or a fare arrives first for a key. Emits the (ride, fare)
     * pair once the counterpart arrives.
     */
    public static class EnrichmentFunction
            extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
        // Keyed, managed state: the pending ride or fare for the current key, if any.
        private ValueState<TaxiRide> rideState;
        private ValueState<TaxiFare> fareState;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<TaxiRide> rideDescriptor = new ValueStateDescriptor<>("saved ride", TaxiRide.class);
            rideState = getRuntimeContext().getState(rideDescriptor);
            ValueStateDescriptor<TaxiFare> fareDescriptor = new ValueStateDescriptor<>("saved fare", TaxiFare.class);
            fareState = getRuntimeContext().getState(fareDescriptor);
        }

        /** Handles a ride: pairs it with a buffered fare, or buffers the ride. */
        @Override
        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiFare pendingFare = fareState.value();
            if (pendingFare == null) {
                rideState.update(ride);
                return;
            }
            fareState.clear();
            out.collect(Tuple2.of(ride, pendingFare));
        }

        /** Handles a fare: pairs it with a buffered ride, or buffers the fare. */
        @Override
        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiRide pendingRide = rideState.value();
            if (pendingRide == null) {
                fareState.update(fare);
                return;
            }
            rideState.clear();
            out.collect(Tuple2.of(pendingRide, fare));
        }
    }
}
更多内容请参阅Flink官方文档。