
Spark Streaming Programming Quick Start


1. Spark Streaming Overview

  • Spark Streaming is built on top of Spark and is one of Spark's four major components.
  • It is the distributed framework in the Spark stack for processing streaming data.
  • It is scalable, high-throughput, and fault-tolerant.
  • Input can come from Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets (a Kafka example follows this list), and results can be saved to HDFS, to a database, or to a live dashboard.
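
For example, reading from Kafka becomes a short snippet once the extra spark-streaming-kafka_2.11 artifact (same version as Spark) is on the classpath. This is a minimal sketch: the ZooKeeper address, consumer group, and topic name are made up, and ssc is a StreamingContext created as in section 3.

 import org.apache.spark.streaming.kafka.KafkaUtils

 // Subscribe to topic "logs" through ZooKeeper with one receiver thread.
 // Each record arrives as a (key, message) pair, so keep only the message body.
 val kafkaLines = KafkaUtils.createStream(ssc, "zkhost:2181", "demo-group", Map("logs" -> 1))
   .map(_._2)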

2. How Spark Streaming Works

  • Spark Streaming splits the input data into segments by time slice (the batch size), producing batch data. Each segment is turned into a Spark RDD, so a Transformation on a DStream becomes a Transformation on the underlying RDDs, with the intermediate results held in memory (see the sketch after this paragraph). Depending on the business requirements, the streaming computation can accumulate the intermediate results across batches or store them in an external system.
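
A minimal sketch of that batch-to-RDD mapping (assuming a StreamingContext ssc as created in section 3): foreachRDD hands you the plain RDD behind every batch.

 val lines = ssc.socketTextStream("localhost", 8888)

 // Every batch interval the DStream yields one RDD holding that interval's data;
 // any ordinary RDD operation can be applied to it here.
 lines.foreachRDD { (rdd, time) =>
   println(s"Batch generated at $time contains ${rdd.count()} records")
 }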

3. A Spark Streaming Programming Example

Start spark-shell. Note that Spark Streaming needs at least two threads in local mode, one for the receiver and one for processing, for example: spark-shell --master local[2]

The complete code:
 import org.apache.spark.streaming._

 sc.setLogLevel("WARN")
 val ssc = new StreamingContext(sc, Seconds(2))
 val lines = ssc.socketTextStream("localhost", 8888) // listen for input on local port 8888
 // val lines = ssc.textFileStream("/tmp/testSparkStream") // or watch an HDFS directory for new files
 val words = lines.flatMap(_.split(" "))
 val wordcount = words.map(x => (x, 1)).reduceByKey(_ + _)
 wordcount.print()
 ssc.start()
 ssc.awaitTermination()
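
Section 1 mentioned that results can be saved to HDFS. Instead of print(), the DStream method saveAsTextFiles does that, writing one directory per batch; a sketch with a made-up output path:

 // Each batch is written to a directory named <prefix>-<batch time in ms>.<suffix>.
 wordcount.saveAsTextFiles("hdfs:///tmp/wordcount/out", "txt")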

  • Open another terminal and send data to port 8888 with the nc command:
[root@dn02 ~]# nc -l 8888
who you are today 
why you are here
why you are here

Result:

18/08/18 09:22:30 WARN BlockManager: Block input-0-1533720150200 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1533720155000 ms
-------------------------------------------
(today,1)
(are,1)
(who,1)
(you,1)
  • Note: the code above only processes the data that arrives within each time slice; the results are not accumulated across batches. Spark Streaming does support accumulating results, as shown in the next section.

4. A Simple Spark Streaming Demo with IDEA + Maven

  • 1. Create a new Maven project; do not check any archetype.

  • 2. Add the Spark Streaming jars to the project through Maven.
    pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>SparkStream</groupId>
    <artifactId>SparkStream</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>1.6.3</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

</project>

  • 3. Create a Scala object SparkStreamWC1 with the following code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamWC1 {
  def main(args: Array[String]): Unit = {

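    // local[2]: local mode needs at least two threads, one for the receiver and one for processing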
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")   // 默认为INFO输出的日志过多,因此将其设置成WARN
    val ssc = new StreamingContext(sc, Seconds(5))

    // Create a DStream that will connect to hostname:port, like localhost:8888
    val lines = ssc.socketTextStream("10.xxx.xxx.xxx", 8888)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
  • 4. Start nc on the server and type some input, leaving more than one batch interval between the two lines:
[root@dn02 ~]# nc -lk 8888
hello world
hello hello spark spark

The output in IDEA:
-------------------------------------------
Time: 1534410672000 ms
-------------------------------------------
(hello,1)
(world,1)

-------------------------------------------
Time: 1534410676000 ms
-------------------------------------------
(hello,2)
(spark,2)

  • The example above only counts each batch on its own; the code below accumulates the counts across batches.
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamWC2 {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val sc = new SparkContext(conf)

    sc.setCheckpointDir("D:\\\\checkpoint") // 如果是在集群中运行则需要给定hdfs目录
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(2))

    // Create a DStream that will connect to hostname:port, like localhost:8888
    val lines = ssc.socketTextStream("10.xxx.xxx.xxx, 8888)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    // accumulate the counts across batches
    val wordCounts = pairs.updateStateByKey(updateFunc, new HashPartitioner(sc.defaultParallelism), true)

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }

  /**
    * State-update function passed to updateStateByKey.
    * Each element of the Iterator[(String, Seq[Int], Option[Int])] is:
    * String:       the key (here, a word)
    * Seq[Int]:     the values for that key in the current batch, e.g. Seq(1,1,1,1,1,1)
    * Option[Int]:  the accumulated value from earlier batches
    */
  val updateFunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    it.map(x => {
      (x._1, x._2.sum + x._3.getOrElse(0))
    })
  }
}
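
To see exactly what updateFunc computes, here is a standalone sketch (plain Scala, no Spark required; the batch contents are made up) that applies it to one hypothetical batch:

object UpdateFuncDemo {
  // Same shape as the updateStateByKey callback above.
  val updateFunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    it.map { case (word, newCounts, prevTotal) =>
      (word, newCounts.sum + prevTotal.getOrElse(0))
    }
  }

  def main(args: Array[String]): Unit = {
    // "hello" appeared twice in this batch and once in earlier batches; "spark" is new.
    val batch = Iterator(
      ("hello", Seq(1, 1), Option(1)),
      ("spark", Seq(1, 1), None)
    )
    updateFunc(batch).foreach(println) // prints (hello,3) and (spark,2)
  }
}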
  • Start nc on the server again and type some input, leaving more than one batch interval between the two lines:
[root@dn02 ~]# nc -lk 8888
hello world
hello hello spark spark

The output in IDEA:
-------------------------------------------
Time: 1534411844000 ms
-------------------------------------------
(hello,1)
(world,1)

-------------------------------------------
Time: 1534411846000 ms
-------------------------------------------
(hello,3)
(world,1)
(spark,2)

-------------------------------------------
Time: 1534411848000 ms
-------------------------------------------
(hello,3)
(world,1)
(spark,2)
  • If no further input arrives, every subsequent batch keeps printing the accumulated result:
(hello,3)
(world,1)
(spark,2)