Flink 入门案例二(参考官方文档)
程序员文章站
2022-06-04 19:35:21
...
Flink 入门案例
具有一定实际意义的流处理程序。
结合信用卡欺诈验证场景,实现的具体Demo。
- 定义程序数据流
package com.sanxiau;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
* 定义了程序的数据流
*
* Skeleton code for the datastream walkthrough
*/
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
// 设置执行环境。 任务执行环境用于定义任务的属性、创建数据源以及最终启动任务的执行。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建数据源 官方案例中存在数据
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
// 对事件分区 & 欺诈检测
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId) //keyBy对流进行分区,保证同一个task处理同一个的key的所有数据
.process(new FraudDetectorTwo()) //对流绑定了一个操作,这个操作将会对流上的每一个消息调用所定义好的函数。
.name("fraud-detector");
// 输出结果
alerts //sink 会将 DataStream 写出到外部系统
.addSink(new AlertSink()) // AlertSink 使用 INFO 的日志级别打印每一个 Alert 的数据记录,而不是将其写入持久存储,以便你可以方便地查看结果。
.name("send-alerts");
// 运行作业 Flink 程序是懒加载的,并且只有在完全搭建好之后,才能够发布到集群上执行。
// 调用execute 时给任务传递一个任务名参数,就可以开始运行任务。
env.execute("Fraud Detection");
}
}
- 数据具体处理逻辑
package com.sanxiau;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
/**
* 对于每笔交易,欺诈检测器都会检查该帐户的标记状态
*/
public class FraudDetectorTwo extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
/**
* 小额 交易
*/
private static final double SMALL_AMOUNT = 1.00;
/**
* 大额 交易
*/
private static final double LARGE_AMOUNT = 500.00;
/**
* 定时器时间
*/
private static final long ONE_MINUTE = 60 * 1000;
/**
* 状态类型是 ValueState,这是一种能够为被其封装的变量添加容错能力的类型
*
* ValueState 是一个包装类,类似于 Java 标准库里边的 AtomicReference 和 AtomicLong
* update 用于更新状态
* value 用于获取状态值
* lear 用于清空状态。
*
* ValueState 的作用域始终限于当前的 key,ValueState 是允许空值的
* ValueState<Boolean> 实际上有 3 种状态
* unset (null),true,和 false
*/
private transient ValueState<Boolean> flagState;
/**
* 定时器状态
*/
private transient ValueState<Long> timerState;
/**
*
* 初始化 flag 和 timerState 状态
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
"timer-state",
Types.LONG);
timerState = getRuntimeContext().getState(timerDescriptor);
}
/**
*
* 具体 流数据 逻辑处理
*
* @param transaction
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement(Transaction transaction, Context context, Collector<Alert> collector) throws Exception {
// Get the current state for the current key
Boolean lastTransactionWasSmall = flagState.value();
// Check if the flag is set
// 最后一笔交易状态 不为null,且数额大于 LARGE_AMOUNT
if (lastTransactionWasSmall != null) {
if (transaction.getAmount() > LARGE_AMOUNT) {
//Output an alert downstream
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
// Clean up our state
// 标记状态被重置时,删除定时器
cleanUp(context);
}
// 交易数额小于 SMALL_AMOUNT 设置定时器,进行欺诈验证检测
if (transaction.getAmount() < SMALL_AMOUNT) {
// set the flag to true
flagState.update(true);
// 标记状态设置为true时,设置当前时间一分钟后触发的定时器。
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
}
/**
*
* 当定时器被触发时,重置标记状态。
*
* @param timestamp
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
// remove flag after 1 minute
timerState.clear();
flagState.clear();
}
/**
* 标记状态被重置时,删除定时器
*
* @param ctx
* @throws Exception
*/
private void cleanUp(Context ctx) throws Exception {
// delete timer
Long timer = timerState.value();
ctx.timerService().deleteProcessingTimeTimer(timer);
// clean up all state
timerState.clear();
flagState.clear();
}
}
- 执行结果
上一篇: electron