Spark Streaming入门
程序员文章站
2024-02-22 21:22:34
...
Spark Streaming是一个有趣且强大的Spark扩展,它支持流数据或者快速的移动数据的近实时处理。
Spark Streaming实现了一个叫做“微批”(micro-batching)的概念,它将在线/流数据划分成若干明确的微批,每个微批作为一条记录被单独处理。每个微批的输出结果被发送到用户定义的输出流中,且能进一步存入HDFS、NoSQL,或者可以生成实时控制板。
批的大小受限于可接受的延迟,这个延迟从几毫秒到几秒不等。例如,每100ms收集所有Twiiter种子的数据,并作为一个批进行处理。这样系统会创建多个批,每个包含100ms内的Twiiter数据,然后将这些批连续地发给Spark进行处理。
下图所示为Spark Streaming的上层体系结构图:
输入数据流:这些输入数据源以很快的速度产生数据(每秒、每毫秒甚至更快),然后将这些数据以持续的流数据的形式发送。Spark提供了特定的接口来接受这些数据用以进一步的处理。这些数据流可以被分为基本数据源和高级数据源。
基本数据源:这些数据源的接口是内建的,或者与Spark Streaming打包在一起,使用这些数据源的接口不需要外部库。
高级数据源:高级数据源的接口不在Spark Streaming包里。使用它们的脚本需要链接到或者包含外部库作为脚本依赖。
批:批就是一个RDD序列。Spark Streaming提供了一个RDD序列的抽象概念DStream(离散流)。DStream保存了其对应RDD序列的引用,这个RDD序列就是根据输入流或者输入流的转换流生成的。所有作用在DStream上的操作都直接作用在了其对应的RDD上面。
第一个Spark Streaming代码:
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.dstream.ForEachDStream
/**
* Created by 13798 on 2017/10/24.
*/
object ScalaFirstStreamingExample {
def main(args: Array[String]): Unit = {
println("Creating Spark Configuration")
//创建一个Spark配置对象
val conf = new SparkConf()
conf.setAppName("My first Spark Streaming Application")
println("Retreiving Streaming Context from Spark Conf")
//从SparkConf对象获得Streaming上下文,第二个数据是流数据划分成批的时间间隔
val streamCtx = new StreamingContext(conf, Seconds(5))
/**
* 定义Stream的类型,这里我们使用TCP套接字作为文本流
* 程序持续监听特定机器(localhost)的9087端口。一旦收到数据就存入内存
* 如果内存不足,就存入磁盘。然后把读到的数据转换成DStream形式
* 其中StorageLevel.MEMORY_AND_DISK_SER就表示使用这种缓存级别的RDD将存储
* 在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上只备份1份
* 而StorageLevel.MEMORY_AND_DISK_SER_2表示使用这种缓存级别的RDD将存储
* 在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)
*/
val lines = streamCtx.socketTextStream("localhost", 9087, MEMORY_AND_DISK_SER_2);
/**
* 把Split()函数应用到DStream的所有元素上,这样会从原始流的每个记录中产生
* 多个新的纪录。然后利用flapmap合并这些记录并生成新的DStream
*/
val words = lines.flatMap(x => x.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
printValues(wordCounts, streamCtx)
//最重要的一句话,初始化Streaming上下文
streamCtx.start()
//等待执行结束
streamCtx.awaitTermination()
}
//简单打印函数,打印RDD的所有元素
def printValues(stream: DStream[(String, Int)], streamCtx: StreamingContext): Unit = {
stream.foreachRDD(foreachFunc)
def foreachFunc = (rdd: RDD[(String, Int)]) => {
val array = rdd.collect()
println("------------Start Printing Results-----------------")
for(res <- array){
println(res)
}
println("------------Finishing Printing Results--------------")
}
}
}