欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark Streaming入门

程序员文章站 2024-02-22 22:09:40
...

1、概述

        A.Spark Streaming 是Spark Core 的API的扩展,它是一个支持扩展,高吞吐量、容错的流式处理框架。

        B.数据可以从kafka、flume、kinesis、TCP 端口收集,可以被诸如map、reduce、join、window(实时计算特有概念)等高级别函数处理,最后输出到文件系统、数据库或者显示屏上。

        C.实际上,你还可以在数据流上应用机器学习(有些公司在做实时的机器学习)和图计算。


    问题1:为什么Spark Streaming 可以结合机器学习和图计算,为什么不用Storm呢?

        答:Spark 是一个提供一站式解决大数据问题的框架,借助于Dataset和DataFrame,Spark的所有组件可以完美协调。对于Spark Streaming对接机器学习,数据由Spark Streaming 处理后直接交由机器学习框架进行处理即可,若是由Storm完成相应的功能,必然需要先进行数据落地后再交由机器学习框架处理。


       2、 Spark Streaming 的工作原理

       A. Spark Streaming接收实时数据流,并按照一定时间间隔将连续的数据流拆分成一批批离散的数据集。

        B.然后应用诸如map、reduce、join、window等丰富的API进行复杂的数据处理后,交由Spark的引擎进行计算,生成批量结果数据后输出。

        注意:Spark Streaming被称为准实时处理系统,它并不是真正意义上的实时处理系统。实时可以使用Flink。

Spark Streaming入门

3、DStream

A.Spark Streaming提供一个高级别的抽象,称之为Discretized Stream,简称DStream,它代表一个连续不断的数据流。B.DStreams有两种创建的方式,一是可以从诸如kafka、flume、kinesis、等输入的数据流上创建,而是可以通过在其他DStreams上应用高级操作创建。实际上,DStream代表的是一个RDD的序列。

C.可以使用java、python、scala三种语言编写Spark Streaming应用程序。在python中,有一些api是不同的或者不可用的,我们做了相应的标记。


 问题2:Spark Streaming对比Storm。

答:目前应用最广泛的大数据流式处理框架是Storm。

       Spark Streaming最低0.5~2秒做一次处理,Storm最低可达0.1s,在实时性和容错性上Storm更佳。借助于RDD,Spark Streaming可以与Spark上所有组件实现无缝对接共享数据,还可以十分方便的kafka、flume等日志收集框架进行集成,同时

Spark Streaming的吞吐量要远远大于Storm,所以在集成性和吞吐量上Spark Streaming优于Storm。


4、一个简单的例子

         来快速完成一个Spark Streaming的例子,即让我们来统计一个监听的TCP端口接收到的文本数据中的词频。

        首先,为了添加一些其他类中可用的方法在我们的环境中,我们要导入一些Spark Streaming的类和StreamingContext中的一些隐式转换,我们创建了一个带有两个线程的StreamingContext,批处理间隔为1s。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
注意:local[n]中n最小值为2,因为每个Receiver占用一个core,至少一个Receiver,Main()占用一个core。core数量不够则只启用Receiver接受数据而不处理数据
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
        为了使用StreamingContext,我们可以通过创建一个指定主机名(e.g. localhost)和端口号(e.g. 9999)的DStream来接收从TCP端口源过来的数据。
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

        这个名为lines的DStream包含了所有从数字服务器接收到的流式数据。这个DStream中的每个record都是文本数据中的一行文本,之后我们想要将这些lines用空格符号拆分成一个一个的单词。

        ##读取端口数据有三个方法:socketTextStream、socketStream、rawSocketStream

// Split each line into words
val words = lines.flatMap(_.split(" ")) 

flatMap是一个一对多的DStream操作,它会创建一个新的DStream,源DStream中的一条record对应于目标record中的多条

record。在这种情况下,每一行会被拆成多个words,用来表征的是一个名为words的 DStream。

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

        这个名为words的DStream通过map(一对一操作)进一步转换成一个包含(words,1)键值对的DStream,之后在每一批次数据中,这个键值对通过reduceByKey被转换成单词的频率,最后,wordCounts.print()打印在这一秒获取的数据中单词的频数。

        请注意,执行以上代码,Spark Streaming只是将它们设置为在启动时执行,并未真正开始执行。要想真正执行以上代码,我们需要在最后加上以下内容:

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

        完整的代码可以在Spark Streaming的例子NetworkWordCount中找到。

------------------------------------------------

------------------------------------------------

        如果你已经下载安装好Spark,你可以如下所示来运行这个例子,首先你需要通过以下操作在服务器上运行一个Netcat来使其成为数据服务器:

$ nc -lk 9999

        之后,你可以在另一个控制台通过以下操作来运行这个例子:

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

        至此,每一秒在运行Netcat的终端上打印的数据都会被计数和输出到屏幕上。运行效果如下:

Spark Streaming入门

5、读取端口数据有三个方法:socketTextStream、socketStream、rawSocketStream,以下分别讲解

A.socketTextStream(hostname,port,storageLevel) storageLevel默认为MEMORY_AND_DISK_SER_2,这和spark cache中的存储级别是不一样的,面试经常问到 底层调用socketStream

B.socketStream(...,...,...,...)   

C.rawSocketStream(hostname,port,storageLevel)  storageLevel默认为MEMORY_AND_DISK_SER_2,


三、基本概念

1、连接到Spark Streaming

        和Spark一样,Spark Streaming可以通过Maven中心仓库进行连接。为了书写你的Spark Streaming应用程序,你必须讲以下的依赖加入Maven工程中。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.0</version>
</dependency>

        为了从kafka、flume、kinesis这些数据源获取数据,我们必须添加相应的依赖。一些常用的依赖添加如下所示:

Source Artifact
Kafka spark-streaming-kafka-0-10_2.11
Flume spark-streaming-flume_2.11
Kinesis spark-streaming-kinesis-asl_2.11 [Amazon Software License]

        对于最新的数据源列表,请参考Maven repository


2、初始化StreamingContext

        初始化一个Spark Streaming应用程序,一个StreamingContext对象必须被创建,这个StreamingContext是所有Spark Streaming函数的主入口点。

        一个StreamingContext对象可以由一个SparkConf对象创建。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

        appName参数设置的是你的应用程序展示在集群UI上的名字,master可以是Spark,Mesos,Yarn Cluster URL,或者local[*],是其运行在本地模式下。在实践中,你最好不要在你的应用程序中硬编码一个master,而是在使用spark-submit提交应用程序时指定master。对于本地测试和单元测试,你可以使用local[*],请注意,这个内部创建了一个SparkContext(所有Spark函数的入口点),可以使用ssc.SparkContext来进行访问。

          必须根据应用程序的延迟需求和可用的集群资源来设置批处理间隔。更多的细节参考Performance Tuning

        一个StreamingContext对象也可以由一个已经存在的SparkContext对象来创建。

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

        当一个Context被定义,你必须做以下的事情:

            a.通过定义输入DStream来创建输入源。

            b.通过在DStream上应用转换操作和输出操作来定义流计算。

            c.使用StreamContext.start()开始来接收数据和处理数据。

            d.使用StreamContext.awaitTermination()来等待计算完成(手动或者因错误终止)。

            e.可以StreamContext.stop()来手动停止计算(一般不会停止,流处理停它干嘛)。

        关键点:

            a.一旦一个StreamingContext被启动,就不能再设置或添加新的流计算。

            b.一旦一个StreamingContext被停止,就不能重新启动。

            c.同一时间内,在JVM内部只有一个StreamingContext处于活跃状态。

            d.默认情况下使用stop()方法停止StreamingContext的同时也会停止SparkContext,如果执行停止StreamingContext,可以将stop()的可选参数设置为false。

            e.SparkContext可以复用,即用来创建多个StreamingContext,只要在创建新的StreamingContext时,之前创建的StreamingContext是处于stop状态即可(SparkContext没有被停止)。