欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SparkStreaming常用算子

程序员文章站 2024-03-14 10:10:04
...

常用的几个算子

1.updateStateByKey 带状态的算子 必须设置checkpoint

实现:WordCount计数

  def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
    //因为 一个线程用来监控集群 一个线程用来处理数据
        conf.setMaster("local[2]")
        conf.setAppName("test")
        val ssc = new StreamingContext(conf,Durations.seconds(5))
	    chekpoint(ssc)
  }
 def  chekpoint(ssc:StreamingContext): Unit ={
            ssc.checkpoint("E:/checkpoint");
            ssc.sparkContext.setLogLevel("Error")
        var ds: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop110", 9999)
        val flat_rdd: DStream[String] = ds.flatMap(t => t.split(" "));
            val map_rdd = flat_rdd.map(t=>(t,1))
            var reduce_rdd = map_rdd.reduceByKey((t, t2) => t + t2)
        /*默认key的状态在内存中有一份, 在checkpoint目录中有一份。
          *多久会将内存中的数据(每个key所对应的状态) 写入到磁盘上-份呢?
          * 如果你的batchInterval小于10s那么 10s会将内存中的数据写入到磁盘一份
          * 如果bacthInterval大于10s 那么就以bacthIntval为准
          * 这样做是为了防止频繁的写入 hdfs
          * */
        var str = reduce_rdd.updateStateByKey((xianzai: Seq[Int], zhiqian: Option[Int]) => {
          var sum = 0;
          //是否为空 
          if (!zhiqian.isEmpty) {
            sum += zhiqian.get;
          }
          for (value <- xianzai) { /* 遍历集合中的元素 是int 然后下面做业务处理*/
            sum += value
            println("----" + xianzai)/* ----CompactBuffer(2) */
          }
          //要求返回Option
          Option(sum)
        })
        str.print(100)
        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
      }

总结
1.updateStateByKey 会按照key进行分组 参数只针对value
参数1: 第一个Seq[Int]集合是当前一批次的状态 就是分完组当前key对应的数据value
参数2:是之前一批次按照key分组的数据value

2.Window窗口操作

  • 想要优化就必须使用 checkpoint

1.业务场景 比如集群运转一天 想看最近5秒的数据信息 这样就可以使用窗口函数
2.专业术语:第二个参数(滑动间隔),第三个参数(窗口长度)
参数必须设置为 bacthIntval的倍数 不然报错

1.存活时间:

bacthIntval 设置为5秒
第一个参数:设置的Durations.seconds(15) ,窗口长度
第二个参数:Durations.seconds(5) , 滑动间隔
经过计算是 :20秒的时候会把0-5的一批次的舍弃掉,25秒时候会把5-10那一批次的舍弃掉 那么每一批次的存活时间是 15秒,
如果:就等于用5秒拿到一批次 15秒的数据等同于3批次 如果3批次的数据 处理时间大于5秒那么就会有任务堆积 这边还没处理完,五秒过后数据又过来了
2.解决方法:就是把舍弃的batch数据减去 加上新批次的数据
这样 大大减少了处理时间
这样就设计到要保存状态了 就要使用checkpoint了

 def  chekpoint(ssc:StreamingContext): Unit ={
       ssc.sparkContext.setLogLevel("Error")
        var ds: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop110", 9999)
        val flat_rdd: DStream[String] = ds.flatMap(t => t.split(" "));
        var map_ds: DStream[(String, Int)] = flat_rdd.map(t => (t, 1))
        var reduce_ds = map_ds.reduceByKey((t, t2) => t + t2)
        //v1  v2 相当于普通的Reducebykey 后面的参数可以理解为 每隔5秒计算过去15秒的数据
        var window_ds: DStream[(String, Int)] = reduce_ds.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Durations.seconds(15), Durations.seconds(5))
 		//v1  v2 相当于普通的Reducebykey 后面的参数可以理解为 每隔5秒计算过去15秒的数据
		// var window_ds: DStream[(String, Int)] = reduce_ds.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Durations.seconds(15), Durations.seconds(5))
       //使用 优化  要用到两个匿名函数
        val window_ds: DStream[(String, Int)] = reduce_ds.reduceByKeyAndWindow(
          //函数1:v2就是加上新进来的批次
          (v1: Int, v2: Int) => {v1 + v2},
          //函数2: v2就是减去原来的批次
          (v1: Int, v2: Int) => {v1 - v2},
          Durations.seconds(15),
          Durations.seconds(5))
        //输出 20秒的时候会把 前五秒的结果减去 加(15-20)秒的数据
          window_ds.print(100)
        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
}

官方图解
SparkStreaming常用算子

window窗口操作
  • 把最近15秒的数据封装为一个DStream 可以根据自己的逻辑进行下一步处理
 val new_ds: DStream[(String, Int)] = reduce_ds.window(Durations.seconds(15), Durations.seconds(5))

3.transform算子

  • 懒执行算子
  def transForm(ssc:StreamingContext): Unit = {
    ssc.sparkContext.setLogLevel("Error")
    //广播黑名单  广播list
    var broad_list: Broadcast[List[String]] = ssc.sparkContext.broadcast(List("zhangsan", "lisi"))
    var ds: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop110", 9999)
    var map_ds: DStream[(String, String)] = ds.map(line => {
      //返回一个Tuple2
      Tuple2(line.split(" ")(0),line.split(" ")(1))
    })
    //transform算子可以拿到DStream中的RDD,对RDD使用RDD的算子操作,但是最后要返回RDD,返回的RDD又被封装到一个DStream
    var trans_ds: DStream[String] = map_ds.transform(map_rdd => {
      println("driver ****端****")
      //过滤
      var filter_rdd: RDD[(String, String)] = map_rdd.filter(line => {
        //拿到广播变量中的list
        var list: List[String] = broad_list.value
        //看集合中是否 包含Tuple2._2的数据 不包含则返回true
        var bool: Boolean = !list.contains(line._2)
        println(bool)
        bool
      })
      filter_rdd.map(line => line._1)
    })
    trans_ds.print(100)
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

3.saveAsTextFile 与textFileStream 算子

def saveAsTextFile(ssc:StreamingContext): Unit ={
   //不仅可以监控端口还可以监控目录 读取文件
   var file_ds: DStream[String] = ssc.textFileStream("../MySpark/data/streamingCopyFile")
   var falt_ds: DStream[String] = file_ds.flatMap(line => line.split(" "))
   var map_ds: DStream[(String, Int)] = falt_ds.map(lines => Tuple2(lines, 1))
   val red_ds: DStream[(String, Int)] = map_ds.reduceByKey((v1,v2)=>{v1+v2})
    //多级目录可以写在前缀中  会在这个目录中产生多个文件  以 xhr开头以iii结尾
   red_ds.saveAsTextFiles("D:/tee/xhr","iii")
   ssc.start()
   //必须得设置 不然程序无法运行


   ssc.awaitTermination()
   ssc.stop()
 }

将SparkStreaming处理的结果保存在指定的目录中

saveAsTextFiles(prefix, [suffix]):
将此DStream的内容另存为文本文件。每批次数据产生的文件名称格式基于:prefix和suffix: "prefix-TIME_IN_MS[.suffix]".
注意 ????

1、saveAsTextFile是调用saveAsHadoopFile实现的

2、spark中普通rdd可以直接只用saveAsTextFile(path)的方式,保存到本地,但是此时DStream的只有saveAsTextFiles()方法,没有传入路径的方法,

3、其参数只有prefix, suffix 其实:DStream中的saveAsTextFiles方法中又调用了rdd中的saveAsTextFile方法,我们需要将path包含在prefix中
????SparkStreaming监控一个目录数据时:
1.这个目录已经存在的数据不会被监控到,可以监控增加的文件
2.增加的文件必须是原子性的在目录中产生(在已有的文件中追加数据,不会被监控到)

算子集

outputoperator算子 transformation算子
print map
saveAsTextFiles() flatmap
saveAsObjectFiles() filter
saveAsHadoopFiles() repartition
foreachRdd() count
自己测试 reduce
countByValue
join

**可以看出很多SparkCore中的action算子 到Streaming中就变成了Transformation算子**