Spark Streaming入门
1、概述
A.Spark Streaming 是Spark Core 的API的扩展,它是一个支持扩展,高吞吐量、容错的流式处理框架。
B.数据可以从kafka、flume、kinesis、TCP 端口收集,可以被诸如map、reduce、join、window(实时计算特有概念)等高级别函数处理,最后输出到文件系统、数据库或者显示屏上。
C.实际上,你还可以在数据流上应用机器学习(有些公司在做实时的机器学习)和图计算。
问题1:为什么Spark Streaming 可以结合机器学习和图计算,为什么不用Storm呢?
答:Spark 是一个提供一站式解决大数据问题的框架,借助于Dataset和DataFrame,Spark的所有组件可以完美协调。对于Spark Streaming对接机器学习,数据由Spark Streaming 处理后直接交由机器学习框架进行处理即可,若是由Storm完成相应的功能,必然需要先进行数据落地后再交由机器学习框架处理。
2、 Spark Streaming 的工作原理
A. Spark Streaming接收实时数据流,并按照一定时间间隔将连续的数据流拆分成一批批离散的数据集。
B.然后应用诸如map、reduce、join、window等丰富的API进行复杂的数据处理后,交由Spark的引擎进行计算,生成批量结果数据后输出。
注意:Spark Streaming被称为准实时处理系统,它并不是真正意义上的实时处理系统。实时可以使用Flink。
3、DStream
A.Spark Streaming提供一个高级别的抽象,称之为Discretized Stream,简称DStream,它代表一个连续不断的数据流。B.DStreams有两种创建的方式,一是可以从诸如kafka、flume、kinesis、等输入的数据流上创建,而是可以通过在其他DStreams上应用高级操作创建。实际上,DStream代表的是一个RDD的序列。
C.可以使用java、python、scala三种语言编写Spark Streaming应用程序。在python中,有一些api是不同的或者不可用的,我们做了相应的标记。
问题2:Spark Streaming对比Storm。
答:目前应用最广泛的大数据流式处理框架是Storm。
Spark Streaming最低0.5~2秒做一次处理,Storm最低可达0.1s,在实时性和容错性上Storm更佳。借助于RDD,Spark Streaming可以与Spark上所有组件实现无缝对接共享数据,还可以十分方便的kafka、flume等日志收集框架进行集成,同时
Spark Streaming的吞吐量要远远大于Storm,所以在集成性和吞吐量上Spark Streaming优于Storm。
4、一个简单的例子
来快速完成一个Spark Streaming的例子,即让我们来统计一个监听的TCP端口接收到的文本数据中的词频。
首先,为了添加一些其他类中可用的方法在我们的环境中,我们要导入一些Spark Streaming的类和StreamingContext中的一些隐式转换,我们创建了一个带有两个线程的StreamingContext,批处理间隔为1s。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
注意:local[n]中n最小值为2,因为每个Receiver占用一个core,至少一个Receiver,Main()占用一个core。core数量不够则只启用Receiver接受数据而不处理数据
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
为了使用StreamingContext,我们可以通过创建一个指定主机名(e.g. localhost)和端口号(e.g. 9999)的DStream来接收从TCP端口源过来的数据。// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
这个名为lines的DStream包含了所有从数字服务器接收到的流式数据。这个DStream中的每个record都是文本数据中的一行文本,之后我们想要将这些lines用空格符号拆分成一个一个的单词。
##读取端口数据有三个方法:socketTextStream、socketStream、rawSocketStream
// Split each line into words val words = lines.flatMap(_.split(" "))
flatMap是一个一对多的DStream操作,它会创建一个新的DStream,源DStream中的一条record对应于目标record中的多条
record。在这种情况下,每一行会被拆成多个words,用来表征的是一个名为words的 DStream。
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
这个名为words的DStream通过map(一对一操作)进一步转换成一个包含(words,1)键值对的DStream,之后在每一批次数据中,这个键值对通过reduceByKey被转换成单词的频率,最后,wordCounts.print()打印在这一秒获取的数据中单词的频数。
请注意,执行以上代码,Spark Streaming只是将它们设置为在启动时执行,并未真正开始执行。要想真正执行以上代码,我们需要在最后加上以下内容:
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
完整的代码可以在Spark Streaming的例子NetworkWordCount中找到。
------------------------------------------------
------------------------------------------------
如果你已经下载安装好Spark,你可以如下所示来运行这个例子,首先你需要通过以下操作在服务器上运行一个Netcat来使其成为数据服务器:
$ nc -lk 9999
之后,你可以在另一个控制台通过以下操作来运行这个例子:
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
至此,每一秒在运行Netcat的终端上打印的数据都会被计数和输出到屏幕上。运行效果如下:
5、读取端口数据有三个方法:socketTextStream、socketStream、rawSocketStream,以下分别讲解
A.socketTextStream(hostname,port,storageLevel) storageLevel默认为MEMORY_AND_DISK_SER_2,这和spark cache中的存储级别是不一样的,面试经常问到 底层调用socketStream
B.socketStream(...,...,...,...)
C.rawSocketStream(hostname,port,storageLevel) storageLevel默认为MEMORY_AND_DISK_SER_2,
三、基本概念
1、连接到Spark Streaming
和Spark一样,Spark Streaming可以通过Maven中心仓库进行连接。为了书写你的Spark Streaming应用程序,你必须讲以下的依赖加入Maven工程中。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.0</version>
</dependency>
为了从kafka、flume、kinesis这些数据源获取数据,我们必须添加相应的依赖。一些常用的依赖添加如下所示:
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-10_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
对于最新的数据源列表,请参考Maven repository。
2、初始化StreamingContext
初始化一个Spark Streaming应用程序,一个StreamingContext对象必须被创建,这个StreamingContext是所有Spark Streaming函数的主入口点。
一个StreamingContext对象可以由一个SparkConf对象创建。
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
appName参数设置的是你的应用程序展示在集群UI上的名字,master可以是Spark,Mesos,Yarn Cluster URL,或者local[*],是其运行在本地模式下。在实践中,你最好不要在你的应用程序中硬编码一个master,而是在使用spark-submit提交应用程序时指定master。对于本地测试和单元测试,你可以使用local[*],请注意,这个内部创建了一个SparkContext(所有Spark函数的入口点),可以使用ssc.SparkContext来进行访问。
必须根据应用程序的延迟需求和可用的集群资源来设置批处理间隔。更多的细节参考Performance Tuning。
一个StreamingContext对象也可以由一个已经存在的SparkContext对象来创建。
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
当一个Context被定义,你必须做以下的事情:
a.通过定义输入DStream来创建输入源。
b.通过在DStream上应用转换操作和输出操作来定义流计算。
c.使用StreamContext.start()开始来接收数据和处理数据。
d.使用StreamContext.awaitTermination()来等待计算完成(手动或者因错误终止)。
e.可以StreamContext.stop()来手动停止计算(一般不会停止,流处理停它干嘛)。
关键点:
a.一旦一个StreamingContext被启动,就不能再设置或添加新的流计算。
b.一旦一个StreamingContext被停止,就不能重新启动。
c.同一时间内,在JVM内部只有一个StreamingContext处于活跃状态。
d.默认情况下使用stop()方法停止StreamingContext的同时也会停止SparkContext,如果执行停止StreamingContext,可以将stop()的可选参数设置为false。
e.SparkContext可以复用,即用来创建多个StreamingContext,只要在创建新的StreamingContext时,之前创建的StreamingContext是处于stop状态即可(SparkContext没有被停止)。
下一篇: MySql中批量更新-批量更换字段