Spark Streaming实时处理本地数据流
程序员文章站
2022-07-14 19:41:21
...
每隔20s监听本地文件夹“/home/hduser/Streamingtext”下新生成的文本文件,对新文件中的各单词个数进行统计
/*
Streamingtext下操作文件应注意以下几点:
1.监控目录下的文件应该具有统一的数据格式,避免在内部解析时报错。
2.文件必须是在监控目录下创建,可以通过原子性的移动或重命名操作,放入目录。
3.一旦移入目录,文件就不能再修改了,如果文件是持续写入的话,新的数据是无法读取的。
*/
package spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.log4j.{Level, Logger}
object StreamingFileWC {
def main(args: Array[String]): Unit ={
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val sparkConf =new SparkConf().setAppName("StreamingFileWC").setMaster("local[2]")//2为核数
//setMaster("spark://192.168.71.129:7077") //提交jar以集群运行使用此设置
val ssc = new StreamingContext(sparkConf,Seconds(20)) //每隔20秒监听一次
val lines = ssc.textFileStream("/home/hduser/Streamingtext")
//val lines = ssc.textFileStream("hdfs://node01:9000/streamingdata") //手动put上传HDFS
val words= lines.flatMap(_.split(" ")) //每行数据以空格切分
val wordcounts=words.map(x=>(x,1)).reduceByKey(_+_)
wordcounts.print()
ssc.start()
ssc.awaitTermination()
}
}
在IDE以本地模式运行程序时,不断地在/home/hduser/Streamingtext文件夹下手动添加相同数据结构文档,程序每隔20秒抓取并处理数据,控制台输出:
同理,如果是在集群上运行,需要将程序打成JAR包,通过spark主目录下的bin/spark-submit
提交,并不断上传文档到HDFS上指定监听路径下以模拟实时数据流。
上一篇: Swift4:字符串操作,数组操作
下一篇: Flink实时处理Socket数据