欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark Streaming实时处理本地数据流

程序员文章站 2022-07-14 19:41:21
...

每隔20s监听本地文件夹“/home/hduser/Streamingtext”下新生成的文本文件,对新文件中的各单词个数进行统计

/*
Streamingtext下操作文件应注意以下几点:
1.监控目录下的文件应该具有统一的数据格式,避免在内部解析时报错。
2.文件必须是在监控目录下创建,可以通过原子性的移动或重命名操作,放入目录。
3.一旦移入目录,文件就不能再修改了,如果文件是持续写入的话,新的数据是无法读取的。
*/

package spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.log4j.{Level, Logger}

object StreamingFileWC {
  def main(args: Array[String]): Unit ={

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val sparkConf =new SparkConf().setAppName("StreamingFileWC").setMaster("local[2]")//2为核数
    //setMaster("spark://192.168.71.129:7077") //提交jar以集群运行使用此设置
    val ssc = new StreamingContext(sparkConf,Seconds(20)) //每隔20秒监听一次

    val lines = ssc.textFileStream("/home/hduser/Streamingtext")
    //val lines = ssc.textFileStream("hdfs://node01:9000/streamingdata") //手动put上传HDFS
    val words= lines.flatMap(_.split(" ")) //每行数据以空格切分
    val wordcounts=words.map(x=>(x,1)).reduceByKey(_+_)
    wordcounts.print()
    ssc.start()
    ssc.awaitTermination()
  }

}

在IDE以本地模式运行程序时,不断地在/home/hduser/Streamingtext文件夹下手动添加相同数据结构文档,程序每隔20秒抓取并处理数据,控制台输出:
Spark Streaming实时处理本地数据流
Spark Streaming实时处理本地数据流

同理,如果是在集群上运行,需要将程序打成JAR包,通过spark主目录下的bin/spark-submit 提交,并不断上传文档到HDFS上指定监听路径下以模拟实时数据流。

相关标签: spark