欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark Streaming处理文件(本地文件以及hdfs上面的文件)

程序员文章站 2022-05-01 10:09:55
...

标题介绍文件流之前先介绍一下Dstream

下面是来自官网一段的说明,Discretized Streams或DStream是Spark Streaming提供的基本抽象。它表示连续的数据流,可以是从源接收的输入数据流,也可以是通过转换输入流生成的已处理数据流。在内部,DStream由一系列连续的RDD表示,这是Spark对不可变的分布式数据集的抽象(有关更多详细信息,请参见Spark编程指南)。DStream中的每个RDD都包含来自特定间隔的数据,如下图所示。
Spark Streaming处理文件(本地文件以及hdfs上面的文件)
在DStream上执行的任何操作都转换为对基础RDD的操作。例如,在较早的将行流转换为单词的示例中,将flatMap操作应用于linesDStream中的每个RDD 以生成DStream的 wordsRDD。如下图所示。
Spark Streaming处理文件(本地文件以及hdfs上面的文件)

要从与HDFS API兼容的任何文件系统(即HDFS,S3,NFS等)上的文件中读取数据,可以通过创建DStream StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]

文件流不需要运行接收器,因此无需分配任何内核来接收文件数据。

对于简单的文本文件,最简单的方法是StreamingContext.textFileStream(dataDirectory)

在本地运行Spark Streaming程序时,请勿使用“ local”或“ local [1]”作为主URL。这两种方式均意味着仅一个线程将用于本地运行任务。如果您使用的是基于接收器的输入DStream(例如套接字,Kafka,Flume等),则将使用单个线程来运行接收器,而不会留下任何线程来处理接收到的数据。因此,在本地运行时,请始终使用“ local [ n ]”作为主URL,其中n >要运行的接收者数(有关如何设置主服务器的信息,请参见Spark属性)。

为了将逻辑扩展到在集群上运行,分配给Spark Streaming应用程序的内核数必须大于接收器数。否则,系统将接收数据,但无法处理它。

下面代码开发本地文件和hdfs的文件流

package com.zgw.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by Zhaogw&Lss on 2019/10/21.
  */
object SparkStream02 {

    def main(args: Array[String]): Unit = {
      var sparkConf =new SparkConf().setMaster("local[*]").setAppName("SparkStream02").set("spark.testing.memory", "2147480000")

      //分析环境对象以及采集周期
      val streamContext = new StreamingContext(sparkConf,Seconds(10))

      val inputFile = "hdfs://192.168.181.128:8020/spark/"

      //文件流
      /*val fileStreamLine: DStream[String] = streamContext.textFileStream("file:///E:/test")*/
      val fileStreamLine: DStream[String] = streamContext.textFileStream(inputFile)
      //将采集数据进行分解
      val dStream: DStream[String] = fileStreamLine.flatMap(line => line.split(" "))

      //将数据进行结构转变
      val map: DStream[(String, Int)] = dStream.map((_,1))
      //聚合处理
      val key: DStream[(String, Int)] = map.reduceByKey(_+_)
      //结果打印
      key.print()
      //启动采集器
      streamContext.start()
      //等待采集器执行
      streamContext.awaitTermination()


    }


}

Spark Streaming处理文件(本地文件以及hdfs上面的文件)
Spark Streaming处理文件(本地文件以及hdfs上面的文件)
Spark打印出文件流的信息,这里有几个要注意的点,一是写hdfs文件路径时要注意fs.defaultFS(在core-xml的配置)是要能在外面访问通的
Spark Streaming处理文件(本地文件以及hdfs上面的文件)
Spark Streaming处理文件(本地文件以及hdfs上面的文件)

telnet 192.168.181.128 8020      能通,不然会报连接拒绝的错

监控规则:

  • 可以监视一个简单目录,例如"hdfs://namenode:8040/logs/"。发现后,将直接处理该路径下的所有文件

  • A POSIX glob pattern can be supplied, such as "hdfs://namenode:8040/logs/2017/*". Here, the DStream will consist of all files in the directories matching the pattern. That is: it is a pattern of directories, not of files in directories.

  • 所有文件必须使用相同的数据格式。

  • 根据文件的修改时间而非创建时间,将其视为时间段的一部分。

  • 处理后,在当前窗口中对文件的更改将不会导致重新读取该文件。也就是说:忽略更新。

  • 目录下的文件越多,扫描更改所需的时间就越长-即使未修改任何文件。

  • 如果使用通配符来标识目录(例如)“hdfs://namenode:8040/logs/2016-*”,则重命名整个目录以匹配路径会将目录添加到受监视目录列表 中。流中仅包含目录中修改时间在当前窗口内的文件。

  • 调用FileSystem.setTimes() 修复时间戳是一种在以后的窗口中拾取文件的方法,即使其内容没有更改。

完整代码托管于https://github.com/daizikaikou/learningSpark