SparkStreaming常用算子
常用的几个算子
1.updateStateByKey 带状态的算子 必须设置checkpoint
实现:WordCount计数
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
//因为 一个线程用来监控集群 一个线程用来处理数据
conf.setMaster("local[2]")
conf.setAppName("test")
val ssc = new StreamingContext(conf,Durations.seconds(5))
chekpoint(ssc)
}
def chekpoint(ssc:StreamingContext): Unit ={
ssc.checkpoint("E:/checkpoint");
ssc.sparkContext.setLogLevel("Error")
var ds: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop110", 9999)
val flat_rdd: DStream[String] = ds.flatMap(t => t.split(" "));
val map_rdd = flat_rdd.map(t=>(t,1))
var reduce_rdd = map_rdd.reduceByKey((t, t2) => t + t2)
/*默认key的状态在内存中有一份, 在checkpoint目录中有一份。
*多久会将内存中的数据(每个key所对应的状态) 写入到磁盘上-份呢?
* 如果你的batchInterval小于10s那么 10s会将内存中的数据写入到磁盘一份
* 如果bacthInterval大于10s 那么就以bacthIntval为准
* 这样做是为了防止频繁的写入 hdfs
* */
var str = reduce_rdd.updateStateByKey((xianzai: Seq[Int], zhiqian: Option[Int]) => {
var sum = 0;
//是否为空
if (!zhiqian.isEmpty) {
sum += zhiqian.get;
}
for (value <- xianzai) { /* 遍历集合中的元素 是int 然后下面做业务处理*/
sum += value
println("----" + xianzai)/* ----CompactBuffer(2) */
}
//要求返回Option
Option(sum)
})
str.print(100)
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
总结
1.updateStateByKey 会按照key进行分组 参数只针对value,
参数1: 第一个Seq[Int]集合是当前一批次的状态 就是分完组当前key对应的数据value
参数2:是之前一批次按照key分组的数据value
2.Window窗口操作
- 想要优化就必须使用 checkpoint
1.业务场景 比如集群运转一天 想看最近5秒的数据信息 这样就可以使用窗口函数
2.专业术语:第二个参数(滑动间隔),第三个参数(窗口长度)
参数必须设置为 bacthIntval的倍数 不然报错
1.存活时间:
bacthIntval 设置为5秒
第一个参数:设置的Durations.seconds(15) ,窗口长度
第二个参数:Durations.seconds(5) , 滑动间隔
经过计算是 :20秒的时候会把0-5的一批次的舍弃掉,25秒时候会把5-10那一批次的舍弃掉 那么每一批次的存活时间是 15秒,如果:就等于用5秒拿到一批次 15秒的数据等同于3批次 如果3批次的数据 处理时间大于5秒那么就会有任务堆积 这边还没处理完,五秒过后数据又过来了
2.解决方法:就是把舍弃的batch数据减去 加上新批次的数据
这样 大大减少了处理时间 这样就设计到要保存状态了
就要使用checkpoint了
def chekpoint(ssc:StreamingContext): Unit ={
ssc.sparkContext.setLogLevel("Error")
var ds: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop110", 9999)
val flat_rdd: DStream[String] = ds.flatMap(t => t.split(" "));
var map_ds: DStream[(String, Int)] = flat_rdd.map(t => (t, 1))
var reduce_ds = map_ds.reduceByKey((t, t2) => t + t2)
//v1 v2 相当于普通的Reducebykey 后面的参数可以理解为 每隔5秒计算过去15秒的数据
var window_ds: DStream[(String, Int)] = reduce_ds.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Durations.seconds(15), Durations.seconds(5))
//v1 v2 相当于普通的Reducebykey 后面的参数可以理解为 每隔5秒计算过去15秒的数据
// var window_ds: DStream[(String, Int)] = reduce_ds.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Durations.seconds(15), Durations.seconds(5))
//使用 优化 要用到两个匿名函数
val window_ds: DStream[(String, Int)] = reduce_ds.reduceByKeyAndWindow(
//函数1:v2就是加上新进来的批次
(v1: Int, v2: Int) => {v1 + v2},
//函数2: v2就是减去原来的批次
(v1: Int, v2: Int) => {v1 - v2},
Durations.seconds(15),
Durations.seconds(5))
//输出 20秒的时候会把 前五秒的结果减去 加(15-20)秒的数据
window_ds.print(100)
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
官方图解
window窗口操作
- 把最近15秒的数据封装为一个DStream 可以根据自己的逻辑进行下一步处理
val new_ds: DStream[(String, Int)] = reduce_ds.window(Durations.seconds(15), Durations.seconds(5))
3.transform算子
- 懒执行算子
def transForm(ssc:StreamingContext): Unit = {
ssc.sparkContext.setLogLevel("Error")
//广播黑名单 广播list
var broad_list: Broadcast[List[String]] = ssc.sparkContext.broadcast(List("zhangsan", "lisi"))
var ds: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop110", 9999)
var map_ds: DStream[(String, String)] = ds.map(line => {
//返回一个Tuple2
Tuple2(line.split(" ")(0),line.split(" ")(1))
})
//transform算子可以拿到DStream中的RDD,对RDD使用RDD的算子操作,但是最后要返回RDD,返回的RDD又被封装到一个DStream
var trans_ds: DStream[String] = map_ds.transform(map_rdd => {
println("driver ****端****")
//过滤
var filter_rdd: RDD[(String, String)] = map_rdd.filter(line => {
//拿到广播变量中的list
var list: List[String] = broad_list.value
//看集合中是否 包含Tuple2._2的数据 不包含则返回true
var bool: Boolean = !list.contains(line._2)
println(bool)
bool
})
filter_rdd.map(line => line._1)
})
trans_ds.print(100)
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
3.saveAsTextFile 与textFileStream 算子
def saveAsTextFile(ssc:StreamingContext): Unit ={
//不仅可以监控端口还可以监控目录 读取文件
var file_ds: DStream[String] = ssc.textFileStream("../MySpark/data/streamingCopyFile")
var falt_ds: DStream[String] = file_ds.flatMap(line => line.split(" "))
var map_ds: DStream[(String, Int)] = falt_ds.map(lines => Tuple2(lines, 1))
val red_ds: DStream[(String, Int)] = map_ds.reduceByKey((v1,v2)=>{v1+v2})
//多级目录可以写在前缀中 会在这个目录中产生多个文件 以 xhr开头以iii结尾
red_ds.saveAsTextFiles("D:/tee/xhr","iii")
ssc.start()
//必须得设置 不然程序无法运行
ssc.awaitTermination()
ssc.stop()
}
将SparkStreaming处理的结果保存在指定的目录中
saveAsTextFiles(prefix, [suffix]):将此DStream的内容另存为文本文件。每批次数据产生的文件名称格式基于:prefix和suffix: "prefix-TIME_IN_MS[.suffix]".
注意 ????
1、saveAsTextFile是调用saveAsHadoopFile实现的
2、spark中普通rdd可以直接只用saveAsTextFile(path)的方式,保存到本地,但是此时DStream的只有saveAsTextFiles()方法,没有传入路径的方法,
3、其参数只有prefix, suffix 其实:DStream中的saveAsTextFiles方法中又调用了rdd中的saveAsTextFile方法,我们需要将path包含在prefix中
????SparkStreaming监控一个目录数据时:
1.这个目录已经存在的数据不会被监控到,可以监控增加的文件
2.增加的文件必须是原子性的在目录中产生(在已有的文件中追加数据,不会被监控到)
算子集
outputoperator算子 | transformation算子 |
---|---|
map | |
saveAsTextFiles() | flatmap |
saveAsObjectFiles() | filter |
saveAsHadoopFiles() | repartition |
foreachRdd() | count |
自己测试 | reduce |
countByValue | |
join |
**可以看出很多SparkCore中的action算子 到Streaming中就变成了Transformation算子**