欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink 入门案例二(参考官方文档)

程序员文章站 2022-06-04 19:35:21
...

Flink 入门案例

具有一定实际意义的流处理程序。
结合信用卡欺诈验证场景,实现的具体Demo。

  • 定义程序数据流

package com.sanxiau;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * 定义了程序的数据流
 *
 * Skeleton code for the datastream walkthrough
 */
public class FraudDetectionJob {
	public static void main(String[] args) throws Exception {
		// 设置执行环境。 任务执行环境用于定义任务的属性、创建数据源以及最终启动任务的执行。
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// 创建数据源 官方案例中存在数据
		DataStream<Transaction> transactions = env
			.addSource(new TransactionSource())
			.name("transactions");
		// 对事件分区 & 欺诈检测
		DataStream<Alert> alerts = transactions
			.keyBy(Transaction::getAccountId)  //keyBy对流进行分区,保证同一个task处理同一个的key的所有数据
			.process(new FraudDetectorTwo())  //对流绑定了一个操作,这个操作将会对流上的每一个消息调用所定义好的函数。
			.name("fraud-detector");
		// 输出结果
		alerts  //sink 会将 DataStream 写出到外部系统
			.addSink(new AlertSink()) // AlertSink 使用 INFO 的日志级别打印每一个 Alert 的数据记录,而不是将其写入持久存储,以便你可以方便地查看结果。
			.name("send-alerts");
		// 运行作业 Flink 程序是懒加载的,并且只有在完全搭建好之后,才能够发布到集群上执行。
		// 调用execute 时给任务传递一个任务名参数,就可以开始运行任务。
		env.execute("Fraud Detection");
	}
}

  • 数据具体处理逻辑
package com.sanxiau;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;


/**
 * 对于每笔交易,欺诈检测器都会检查该帐户的标记状态
 */
public class FraudDetectorTwo  extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    /**
     * 小额 交易
     */
    private static final double SMALL_AMOUNT = 1.00;
    /**
     * 大额 交易
     */
    private static final double LARGE_AMOUNT = 500.00;
    /**
     *  定时器时间
     */
    private static final long ONE_MINUTE = 60 * 1000;


    /**
     *  状态类型是 ValueState,这是一种能够为被其封装的变量添加容错能力的类型
     *
     *  ValueState 是一个包装类,类似于 Java 标准库里边的 AtomicReference 和 AtomicLong
     *      update 用于更新状态
     *      value 用于获取状态值
     *      lear 用于清空状态。
     *
     *  ValueState 的作用域始终限于当前的 key,ValueState 是允许空值的
     *      ValueState<Boolean> 实际上有 3 种状态
     *          unset (null),true,和 false
     */
    private transient ValueState<Boolean> flagState;

    /**
     *  定时器状态
     */
    private transient ValueState<Long> timerState;

	/**
     *
     * 初始化 flag 和 timerState 状态
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

	/**
     * 
     * 具体 流数据 逻辑处理
     * 
     * @param transaction 
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement(Transaction transaction, Context context, Collector<Alert> collector) throws Exception {
        // Get the current state for the current key
        Boolean lastTransactionWasSmall = flagState.value();

        // Check if the flag is set
        // 最后一笔交易状态 不为null,且数额大于 LARGE_AMOUNT
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                //Output an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());

                collector.collect(alert);
            }
            //  Clean up our state
            //  标记状态被重置时,删除定时器
            cleanUp(context);
        }

        // 交易数额小于 SMALL_AMOUNT 设置定时器,进行欺诈验证检测
        if (transaction.getAmount() < SMALL_AMOUNT) {
            // set the flag to true
            flagState.update(true);
            // 标记状态设置为true时,设置当前时间一分钟后触发的定时器。
            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);

            timerState.update(timer);
        }
    }

	
	/**
     * 
     * 当定时器被触发时,重置标记状态。
     * 
     * @param timestamp 
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
        // remove flag after 1 minute
        timerState.clear();
        flagState.clear();
    }


    /**
     * 标记状态被重置时,删除定时器
     *
     * @param ctx
     * @throws Exception
     */
    private void cleanUp(Context ctx) throws Exception {
        // delete timer
        Long timer = timerState.value();
        ctx.timerService().deleteProcessingTimeTimer(timer);

        // clean up all state
        timerState.clear();
        flagState.clear();
    }
}

  • 执行结果

Flink 入门案例二(参考官方文档)

相关标签: flink