欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink与SparkStreaming之Counters& Accumulators累加器双向应用案例实战-Flink牛刀小试

程序员文章站 2022-07-14 13:31:37
...

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:[email protected],如有任何问题,可随时联系。

1 累加器应用场景

Accumulator即累加器,与Saprk Accumulator 的应用场景差不多,都能很好地观察task在运行期间的数据变化 可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。 Counter是一个具体的累加器(Accumulator)实现

2 Flink与SparkStreaming双向应用对比

2.1 Flink累加器使用

Counter是一个具体的累加器(Accumulator)实现,如IntCounter, LongCounter 和 DoubleCounter。

用法

  • 1:创建累加器

    private IntCounter numLines = new IntCounter();

  • 2:注册累加器

    getRuntimeContext().addAccumulator("num-lines", this.numLines);

  • 3:使用累加器

    this.numLines.add(1);

  • 4:获取累加器的结果

    myJobExecutionResult.getAccumulatorResult("num-lines")

2.2 SparkStreaming累加器使用

  • 1:设置自定义累加器,实现所有数据的统计功能,注意累加器也是懒执行的

    val sessionAggrStatAccumulator = new SessionAggrStatAccumulator

  • 2:自定义累加器实现

      class SessionAggrStatAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {
    
          // 保存所有聚合数据
          private val aggrStatMap = mutable.HashMap[String, Int]()
        
          override def isZero: Boolean = {
            aggrStatMap.isEmpty
          }
        
          override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
            val newAcc = new SessionAggrStatAccumulator
            aggrStatMap.synchronized{
              newAcc.aggrStatMap ++= this.aggrStatMap
            }
            newAcc
          }
        
          override def reset(): Unit = {
            aggrStatMap.clear()
          }
        
          override def add(v: String): Unit = {
            if (!aggrStatMap.contains(v))
              aggrStatMap += (v -> 0)
            aggrStatMap.update(v, aggrStatMap(v) + 1)
          }
        
          override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
            other match {
              case acc:SessionAggrStatAccumulator => {
                (this.aggrStatMap /: acc.value){ case (map, (k,v)) => map += ( k -> (v + map.getOrElse(k, 0)) )}
              }
            }
          }
        
          override def value: mutable.HashMap[String, Int] = {
            this.aggrStatMap
          }
    }
    复制代码
  • 3:注册自定义累加器

     sc.register(sessionAggrStatAccumulator, "sessionAggrStatAccumulator")
    复制代码
  • 4:累加器值得获取与使用

      calculateAndPersistAggrStat(spark, sessionAggrStatAccumulator.value, taskUUID)
      
      sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);
    复制代码

3 Flink 累加器使用案例实战

3.1 注意:只有在任务执行结束后,才能获取到累加器的值

3.2 注意:RichMapFunction的使用发生在高级特性上。

public class BatchDemoCounter {

    public static void main(String[] args) throws Exception{

        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> data = env.fromElements("a", "b", "c", "d");

        DataSet<String> result = data.map(new RichMapFunction<String, String>() {

            //1:创建累加器
           private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //2:注册累加器
                getRuntimeContext().addAccumulator("num-lines",this.numLines);

            }

            //int sum = 0;
            @Override
            public String map(String value) throws Exception {
                //如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了
                //sum++;
                //System.out.println("sum:"+sum);
                this.numLines.add(1);
                return value;
            }
        }).setParallelism(8);

        //result.print();

        result.writeAsText("d:\\BatchDemoCounter");

        JobExecutionResult jobResult = env.execute("counter");
        //3:获取累加器
        int num = jobResult.getAccumulatorResult("num-lines");
        System.out.println("num:"+num);

    }
}
复制代码

3.3 结果展示(writeAsText触发后才能读取到counter)

也即提前执行了一波action操作,然后才能取出其值。

    num:4
复制代码

总结

微观看世界,本文虽小,五脏俱全。为了成一套体系,不得已而为之,请期待后面有关Spark和kafka的源码解读系列。辛苦成文,彼此珍惜,谢谢。

秦凯新 于深圳 201811251651