Learning WordCount with Spark Streaming
An introductory Spark Streaming WordCount:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object WordCount {
  def main(args: Array[String]): Unit = {
    // local[2] starts two threads: one receives the socket data, the other processes it and prints to the console
    val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))
    /**
     * Data input: socket (host: hadoop, port: 9999)
     */
    val myDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop", 9999)
    /**
     * Data processing
     */
    val resultStream = myDStream.flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
    resultStream.print()
    /**
     * Start the application
     */
    ssc.start()
    ssc.awaitTermination()
  }
}
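To try it out, start a socket source on the "hadoop" host (the hostname is taken from the code above and must be resolvable), for example with nc -lk 9999, and type comma-separated words. Each 1-second batch is counted independently, so an input line like spark,spark,hadoop prints roughly:

-------------------------------------------
Time: ... ms
-------------------------------------------
(spark,2)
(hadoop,1)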
The WordCount above only counts the words within each batch. To accumulate word counts over all the data received so far, use updateStateByKey; this operator requires a checkpoint directory. Two ways of using updateStateByKey are shown below.
UpdateStateByKeyTest:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object UpdateStateByKeyTest {
  def main(args: Array[String]): Unit = {
    // Make sure this user has write permission (needed when checkpointing to HDFS)
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    /**
     * Create the entry point
     */
    val conf = new SparkConf().setMaster("local[2]").setAppName(s"${this.getClass.getSimpleName}")
    val ssc = new StreamingContext(conf, Seconds(1))
    // ssc.checkpoint("hdfs://hadoop:9000/sparkstreaming")
    ssc.checkpoint("D:\\tmp\\checkPoint")
    /**
     * Data input
     */
    val myDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop", 9999)
    /**
     * Data processing
     */
    // val wordAndOneStream: DStream[(String, Int)] = myDStream.flatMap(_.split(","))
    //   .map((_, 1))
    val wordDStream: DStream[String] = myDStream.flatMap(_.split(","))
    val wordAndOneStream: DStream[(String, Int)] = wordDStream.map((_, 1))
    /**
     * updateStateByKey takes a function of type
     *   (Seq[V], Option[S]) => Option[S]
     * which is called once per key in every batch:
     *   Seq[V]    - the values for this key in the current batch
     *   Option[S] - the state from the previous batch (None if the key is new)
     * The returned Option[S] is the new state.
     *
     * For example, if the previous state of "you" is 6 and the current batch
     * contains (you,1), (word,1), (you,1), the function is called with
     *   "you"  -> (Seq(1, 1), Some(6))
     *   "word" -> (Seq(1),    None)
     */
    val wordCount = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val lastCount = state.getOrElse(0)
      Some(currentCount + lastCount)
    }
    val resultDStream: DStream[(String, Int)] = wordAndOneStream.updateStateByKey(wordCount)
    /**
     * Data output
     */
    resultDStream.print()
    /**
     * Start the application
     */
    ssc.start()
    ssc.awaitTermination()
  }
}
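As an illustration of the cumulative semantics (the inputs here are made up), suppose the first batch receives you,you,word and the second batch receives only you:

batch 1 input: you,you,word  ->  prints (you,2) (word,1)
batch 2 input: you           ->  prints (you,3) (word,1)

The second batch still prints (word,1) because updateStateByKey re-emits the updated state for every key it has ever seen, not just the keys present in the current batch.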
Another way is to create the StreamingContext through StreamingContext.getOrCreate: on the first run the factory function builds a new context, and on a restart the context (together with the accumulated state) is recovered from the checkpoint directory instead of being rebuilt from scratch.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object UpdateStateByKeyTestX {
  def main(args: Array[String]): Unit = {
    val checkPointDirectory = "D:\\tmp\\checkPoint"

    def functionToCreateContext(): StreamingContext = {
      /**
       * Create the entry point
       */
      val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[2]")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkPointDirectory)
      val myDStream = ssc.socketTextStream("hadoop", 9999)
      val wordAndOneDStream = myDStream.flatMap(_.split(",")).map((_, 1))
      val wordCount = (values: Seq[Int], state: Option[Int]) => {
        val currentCount = values.sum
        val lastCount = state.getOrElse(0)
        Some(currentCount + lastCount)
      }
      val resultDStream: DStream[(String, Int)] = wordAndOneDStream.updateStateByKey(wordCount)
      resultDStream.print()
      ssc
    }

    // Reuse the context stored in the checkpoint if one exists; otherwise create a new one
    val ssc = StreamingContext.getOrCreate(checkPointDirectory, functionToCreateContext _)
    /**
     * Start the application
     */
    ssc.start()
    ssc.awaitTermination()
  }
}
Some tokens are not valid words and should be filtered out while processing, for example ! ? *. The transform operator exposes the underlying RDD of each batch, so it can be combined with a blacklist to drop them:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object TransformTest {
  def main(args: Array[String]): Unit = {
    /**
     * Create the entry point
     */
    val conf = new SparkConf().setMaster("local[2]").setAppName(s"${this.getClass.getSimpleName}")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))
    /**
     * Data input
     */
    val myDStream = ssc.socketTextStream("hadoop", 9999)
    /**
     * Blacklist filtering:
     * ? ! * must not be counted as words.
     * The blacklist is mocked here; in practice it would be read from Redis or MySQL.
     */
    val wordBlackRDD: RDD[(String, Boolean)] = sc.parallelize(List("?", "!", "*")).map((_, true))
    val blackList = sc.broadcast(wordBlackRDD.collect())
    /**
     * Data processing
     */
    val wordAndOneDStream: DStream[(String, Int)] = myDStream.flatMap(line => line.split(","))
      .map((_, 1))
    val filterDStream: DStream[(String, Int)] = wordAndOneDStream.transform(rdd => {
      // Rebuild the blacklist as an RDD for this batch
      val wordBlackList: RDD[(String, Boolean)] = rdd.sparkContext.parallelize(blackList.value)
      /**
       * (String, Int) leftOuterJoin (String, Boolean) gives (String, (Int, Option[Boolean])):
       *   String          - the word
       *   Int             - its count in this batch
       *   Option[Boolean] - Some(true) if the word is blacklisted, None otherwise
       * Keep only the words that did not match the blacklist.
       */
      val leftRDD: RDD[(String, (Int, Option[Boolean]))] = rdd.leftOuterJoin(wordBlackList)
      val filterRDD: RDD[(String, (Int, Option[Boolean]))] = leftRDD.filter(tuple => tuple._2._2.isEmpty)
      filterRDD.map(tuple => (tuple._1, tuple._2._1))
    })
    val wordCountDStream: DStream[(String, Int)] = filterDStream.reduceByKey((x: Int, y: Int) => x + y)
    /**
     * Data output
     */
    wordCountDStream.print()
    /**
     * Start the application
     */
    ssc.start()
    ssc.awaitTermination()
  }
}
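When the blacklist is small, the per-batch join is not strictly necessary. A minimal alternative sketch, assuming the same mocked blacklist and reusing sc and wordAndOneDStream from the code above, broadcasts the words as a Set and filters by lookup:

val blackSet = sc.broadcast(Set("?", "!", "*"))
val filterDStream: DStream[(String, Int)] = wordAndOneDStream.transform(rdd =>
  // keep only the words that are not in the broadcast blacklist
  rdd.filter { case (word, _) => !blackSet.value.contains(word) }
)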
We can also compute word counts over a sliding window, for example counting the words of the last 30 seconds every 10 seconds, with reduceByKeyAndWindow. Both the window length and the slide interval must be multiples of the batch interval (1 second here).
ReduceByKeyAndWindowWordCount:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object ReduceByKeyAndWindowWordCount {
  def main(args: Array[String]): Unit = {
    /**
     * Create the entry point
     */
    val conf = new SparkConf().setMaster("local[2]").setAppName(s"${this.getClass.getSimpleName}")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))
    /**
     * Data input
     */
    val myDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop", 9999)
    /**
     * Data processing
     */
    val wordOneDStream: DStream[(String, Int)] = myDStream.flatMap(_.split(","))
      .map((_, 1))
    /**
     * reduceByKeyAndWindow(
     *   reduceFunc: (V, V) => V,     the reduce function
     *   windowDuration: Duration,    window length
     *   slideDuration: Duration)     slide interval
     *
     * Business meaning: every 10 seconds, count the words of the last 30 seconds,
     * e.g. to track the trend of ad clicks.
     */
    val resultDStream: DStream[(String, Int)] = wordOneDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(30), Seconds(10))
    /**
     * Data output
     */
    resultDStream.print()
    /**
     * Start the application
     */
    ssc.start()
    ssc.awaitTermination()
  }
}
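reduceByKeyAndWindow also has an overload that takes an inverse reduce function: instead of recomputing the whole 30-second window every 10 seconds, it adds the batches that enter the window and subtracts the ones that leave it. This variant requires a checkpoint directory. A minimal sketch reusing wordOneDStream from the code above (the checkpoint path is just an example):

ssc.checkpoint("D:\\tmp\\checkPoint")
val resultDStream: DStream[(String, Int)] = wordOneDStream.reduceByKeyAndWindow(
  (x: Int, y: Int) => x + y,  // add the values of batches entering the window
  (x: Int, y: Int) => x - y,  // subtract the values of batches leaving the window
  Seconds(30),                // window length
  Seconds(10)                 // slide interval
)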