欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spark Streaming 编程入门指南

程序员文章站 2022-07-06 10:11:33
Spark Streaming 是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。可以从许多数据源(例如Kafka,Flume,Kinesis或TCP sockets)中提取数据,并且可以使用复杂的算法处理数据,这些算法用高级函数表示,如map、reduce、join和 ......

spark streaming 是核心spark api的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。可以从许多数据源(例如kafka,flume,kinesis或tcp sockets)中提取数据,并且可以使用复杂的算法处理数据,这些算法用高级函数表示,如map、reduce、join和window。最后,可以将处理后的数据推送到文件系统,数据库和实时仪表板。实际上,可以在数据流上应用spark的机器学习和图形处理算法。

Spark Streaming 编程入门指南

在内部,它的工作方式如下。 spark streaming接收实时输入数据流,并将数据分成批次,然后由spark引擎进行处理,以生成批次的最终结果流。

Spark Streaming 编程入门指南

spark streaming提供了一种高级抽象,称为离散流或dstream,它表示连续的数据流。dstreams可以从kafka、flume和kinesis等源的输入数据流创建,也可以通过在其他dstreams上应用高级操作创建。在内部,dstream表示为rdds序列。

1. 了解spark

apache spark 是一个用于大规模数据处理的统一分析引擎

Spark Streaming 编程入门指南  Spark Streaming 编程入门指南  Spark Streaming 编程入门指南

特性:

将工作负载运行速度提高100倍

apache spark使用最新的dag调度程序,查询优化器和物理执行引擎,为批处理数据和流数据提供了高性能。

易用

可以使用java,scala,python,r和sql快速编写应用程序

通用

结合sql、流和复杂的分析

spark为包括sql和dataframes,用于机器学习的mllib,graphx和spark streaming在内的一堆库提供支持。您可以在同一应用程序中无缝组合这些库。

到处运行

spark可在hadoop,apache mesos,kubernetes,独立或云中运行。它可以访问各种数据源

可以在ec2,hadoop yarn,mesos或kubernetes上使用其独立集群模式运行spark。访问hdfs,alluxio,apache cassandra,apache hbase,apache hive和数百种其他数据源中的数据。

2. 入门案例

统计单词出现的次数,这个例子在hadoop中用mapreduce也写过。

javastreamingcontext是java版的streamingcontext。它是spark streaming功能的主要入口点。它提供了从输入源创建javadstream和javapairdstream的方法。可以使用context.sparkcontext访问内部的org.apache.spark.api.java.javasparkcontext。在创建和转换dstream之后,可以分别使用context.start()和context.stop()启动和停止流计算。

 1 public static void main(string[] args) throws interruptedexception {
 2     // create a local streamingcontext with two working thread and batch interval of 1 second
 3     sparkconf conf = new sparkconf().setmaster("local[2]").setappname("networkwordcount");
 4     javastreamingcontext jssc = new javastreamingcontext(conf, durations.seconds(1));
 5 
 6     // create a dstream that will connect to hostname:port, like localhost:9999
 7     javareceiverinputdstream<string> lines = jssc.sockettextstream("localhost", 9999);
 8 
 9     // split each line into words
10     javadstream<string> words = lines.flatmap(x -> arrays.aslist(x.split(" ")).iterator());
11 
12     // count each word in each batch
13     javapairdstream<string, integer> pairs = words.maptopair(s -> new tuple2<>(s, 1));
14     javapairdstream<string, integer> wordcounts = pairs.reducebykey((i1, i2) -> i1 + i2);
15 
16     // print the first ten elements of each rdd generated in this dstream to the console
17     wordcounts.print();
18 
19     // start the computation
20     jssc.start();
21     // wait for the computation to terminate
22     jssc.awaittermination();
23 }

3. 基本概念

3.1. maven依赖

1 <groupid>org.apache.spark</groupid>
2     <artifactid>spark-streaming_2.12</artifactid>
3     <version>2.4.5</version>
4     <scope>provided</scope>
5 </dependency>

为了从其它数据源获取数据,需要添加相应的依赖项spark-streaming-xyz_2.12。例如:

1 <dependency>
2     <groupid>org.apache.spark</groupid>
3     <artifactid>spark-streaming-kafka-0-10_2.12</artifactid>
4     <version>2.4.5</version>
5 </dependency>

3.2. 初始化streamingcontext

为了初始化一个spark streaming程序,必须创建一个streamingcontext对象,该对象是所有spark streaming功能的主要入口点。 

我们可以从sparkconf对象中创建一个javastreamingcontext对象

1 import org.apache.spark.sparkconf;
2 import org.apache.spark.streaming.duration;
3 import org.apache.spark.streaming.api.java.javastreamingcontext;
4 
5 sparkconf conf = new sparkconf().setappname(appname).setmaster(master);
6 javastreamingcontext ssc = new javastreamingcontext(conf, new duration(1000)); 

appname 参数是显示在集群ui上的你的应用的名字

master 参数是一个spark、 mesos 或 yarn 集群url,或者也可以是一个特定的字符串“local[*]”表示以本地模式运行。实际上,当在集群上运行时,肯定不希望对在程序中对master进行硬编码,而希望通过spark-submit启动应用程序并在其中接收它。然而,对于本地测试,你可以传“local[*]”来运行spark streaming。

还可以从一个已存在的javasparkcontext中创建一个javastreamingcontext对象

1 import org.apache.spark.streaming.api.java.*;
2 
3 javasparkcontext sc = ...   //existing javasparkcontext
4 javastreamingcontext ssc = new javastreamingcontext(sc, durations.seconds(1));

在定义完context之后,必须做以下事情:

  1. 通过创建input dstreams来定义input sources
  2. 通过对dstreams应用transformation(转换)和output(输出)操作来定义流计算
  3. 用streamingcontext.start()来开始接收数据并处理它
  4. 用streamingcontext.awaittermination()等待处理停止(手动停止或由于任何错误)
  5. 用streamingcontext.stop()可以手动停止

需要记住的点:

  • 一旦启动上下文,就无法设置新的流计算或将其添加到该流计算中
  • 上下文一旦停止,就无法重新启动
  • 一个jvm中只能同时激活一个streamingcontext
  • streamingcontext中的stop()也会停止sparkcontext。但如果要仅停止streamingcontext的话,设置stop(false)
  • 只要在创建下一个streamingcontext之前停止了上一个streamingcontext(不停止sparkcontext),就可以将sparkcontext重用于创建多个streamingcontext

3.3. dstreams(离散流)

discretized streamdstream 是spark streaming提供的基本抽象。它表示一个连续的数据流,可以是从源接收的输入数据流,也可以是通过转换输入流生成的已处理数据流。在内部,dstream由一系列连续的rdd表示,这是spark对不变的分布式数据集的抽象。dstream中的每个rdd都包含来自特定间隔的数据,如下图所示。 

Spark Streaming 编程入门指南

在dstream上执行的任何操作都转换为对基础rdd的操作。例如,最简单的将一行句子转换为单词的例子中,flatmap操作应用于行dstream中的每个rdd,以生成单词dstream的rdd。如下图所示:

Spark Streaming 编程入门指南 

3.4. input dstreams 和 receivers

input dstream是表示从源接收的输入数据流。在上图中,lines是输入dstream,因为它表示从netcat服务器接收的数据流。每一个输入dstream都关联着一个receiver对象,该对象从源接收数据并将其存储在spark的内存中以进行处理。

spark streaming提供了两类内置的streaming源:

  • basic sources :直接在streamingcontext api中可用的源。例如,文件系统和socket连接
  • advanced sources :像kafka,flume,kinesis等这样的源,可通过额外的程序类获得 

如果要在流应用程序中并行接收多个数据流,则可以创建多个输入dstream。这将创建多个receiver(接收器),这些接收器将同时接收多个数据流。重要的是要记住,必须为spark streaming应用程序分配足够的内核(或线程,如果在本地运行),以处理接收到的数据以及运行接收器。

需要记住的点:

  • 在本地运行spark streaming程序时,请勿使用“ local”或“ local [1]”作为master url。这两种方式均意味着仅一个线程将用于本地运行任务。如果使用的是基于接收器的输入dstream(例如套接字,kafka,flume等),则将使用单个线程来运行接收器,而不会留下任何线程来处理接收到的数据。 因此,在本地运行时,请始终使用“ local [n]”作为主url,其中n>要运行的接收器数 
  • 为了将逻辑扩展到在集群上运行,分配给spark streaming应用程序的内核数必须大于接收器数。 否则,系统将接收数据,但无法处理它。

basic sources 

为了从文件中读取数据,可以通过streamingcontext.filestream[keyclass, valueclass, inputformatclass]来创建一个dstream

例如:streamingcontext.textfilestream(datadirectory);

spark streaming将监视目录datadirectory并处理在该目录中创建的所有文件

  • 可以监视一个简单的目录,例如:"hdfs://namenode:8040/logs/2017/*"。在这里,dstream将由目录中与模式匹配的所有文件组成。也就是说:它是目录的模式,而不是目录中的文件。
  • 所有文件必须使用相同的数据格式
  • 根据文件的修改时间而不是创建时间,将其视为时间段的一部分
  • 一旦已经被处理后,在当前窗口中对文件的更改不会导致重新读取该文件。即:更新被忽略。

3.5. transformations on dstreams

对dstreams做转换,与rdd相似,转换允许修改输入dstream中的数据。dstream支持普通spark rdd上可用的许多转换。一些常见的方法如下:

map(func) 通过将源dstream的每个元素传递给函数func来处理并返回新的dstream
flatmap(func) 与map类似,但是每个输入项可以映射到0个或多个输出项
filter(func) 过滤
repartition(numpartitions)  通过创建更多或更少的分区来更改此dstream中的并行度
union(otherstream)  将源dstream和另一个dstream中的元素合并在一起,返回一个新的dstream。相当于sql中的union 
count() 返回元素的个数
reduce(func)  通过使用函数func(接受两个参数并返回一个)来聚合源dstream的每个rdd中的元素,从而返回一个单元素rdd的新dstream。 
countbyvalue() 


在类型为k的元素的dstream上调用时,返回一个新的(k,long)形式的dstream,其中每个键的值是其在源dstream的每个rdd中的频率。

 
reducebykey(func, [numtasks])  在一个(k,v)形式的dstream上调用时,返回一个新的(k,v)dstream,其中使用给定的reduce函数汇总每个键的值
join(otherstream, [numtasks])  在(k,v)和(k,w)两个dstream上调用时,返回一个新的(k,(v,w))dstream 
cogroup(otherstream, [numtasks])  在(k,v)和(k,w)dstream上调用时,返回一个新的(k,seq [v],seq [w])元组的dstream 
transform(func)  通过对源dstream的每个rdd应用rdd-to-rdd函数来返回新的dstream。这可用于在dstream上执行任意rdd操作。
updatestatebykey(func)  返回一个新的“state” dstream 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

其实,这次操作跟java stream很像

Spark Streaming 编程入门指南

window operations(窗口操作)

spark streaming还提供了窗口计算,可以在数据的滑动窗口上应用转换。下图说明了此滑动窗口:

Spark Streaming 编程入门指南

如图所示,每当窗口在源dstream上滑动时,就会对落入窗口内的源rdd进行操作,以生成窗口dstream的rdd。

任何窗口函数所必须的两个参数:

  • 窗口的长度
  • 滑到的频率(或者说时间间隔)

举个例子,我们来扩展前面的示例,假设我们想要每10秒在数据的最后30秒生成一次单词次数统计。为此,必须在数据的最后30秒内对(word,1)对的dstream对应用reducebykey操作。

 1 import org.apache.spark.streaming.durations;
 2 import org.apache.spark.streaming.api.java.javadstream;
 3 import org.apache.spark.streaming.api.java.javapairdstream;
 4 import scala.tuple2;
 5 
 6 
 7 javadstream<string> words = lines.flatmap(x -> arrays.aslist(x.split(" ")).iterator());
 8 javapairdstream<string, integer> pairs = words.maptopair(s -> new tuple2<>(s, 1));
 9 
10 // reduce last 30 seconds of data, every 10 seconds
11 javapairdstream<string, integer> windowedwordcounts = pairs.reducebykeyandwindow((i1, i2) -> i1 + i2, durations.seconds(30), durations.seconds(10));

一些常见的窗口操作如下。所有这些操作均采用上述两个参数:windowlength和slideinterval

window(windowlength, slideinterval) 返回基于源dstream的窗口批处理计算的新dstream
countbywindow(windowlength, slideinterval) 返回流中元素的滑动窗口数
reducebywindow(func, windowlength, slideinterval) 对窗口内的数据进行聚合操作
reducebykeyandwindow(func, windowlength, slideinterval, [numtasks]) 在(k,v)dstream上调用时,返回新的(k,v)dstream,其中使用给定的reduce函数func在滑动窗口中的批处理上汇总每个键的值
reducebykeyandwindow(func, invfunc, windowlength, slideinterval, [numtasks])  
countbyvalueandwindow(windowlength, slideinterval, [numtasks])  

 

 

 

 

 

 

 

 

 

 

 

 

3.6. output operations on dstreams

输出操作允许将dstream的数据输出到外部系统,例如数据库或文件系统。

Spark Streaming 编程入门指南 

流式应用程序必须24/7全天候运行,因此必须能够抵抗与应用程序逻辑无关的故障(例如,系统故障,jvm崩溃等)。为此,spark streaming需要将足够的信息检查点指向容错存储系统,以便可以从故障中恢复。检查点有两种类型的数据。

  • 元数据检查点-将定义流计算的信息保存到hdfs等容错存储中。这用于从运行流应用程序的驱动程序的节点的故障中恢复。
  • 数据检查点-将生成的rdd保存到可靠的存储中 

 

完整代码:

 1 package com.example.demo;
 2 
 3 import org.apache.spark.sparkconf;
 4 import org.apache.spark.streaming.durations;
 5 import org.apache.spark.streaming.api.java.javadstream;
 6 import org.apache.spark.streaming.api.java.javapairdstream;
 7 import org.apache.spark.streaming.api.java.javastreamingcontext;
 8 import scala.tuple2;
 9 
10 import java.util.arrays;
11 import java.util.regex.pattern;
12 
13 /**
14  * @author chengjiansheng
15  */
16 public class javawordcount {
17 
18     private static final pattern space = pattern.compile(" ");
19 
20     public static void main(string[] args) {
21         if (args.length < 1) {
22             system.err.println("usage: javawordcount <file>");
23             system.exit(1);
24         }
25 
26         sparkconf conf = new sparkconf().setmaster("local[*]").setappname("javawordcount");
27         javastreamingcontext jssc = new javastreamingcontext(conf, durations.seconds(1));
28 
29         javadstream<string> lines = jssc.textfilestream(args[0]);
30         javadstream<string> words = lines.flatmap(line -> arrays.aslist(space.split(line)).iterator());
31         javapairdstream<string, integer> ones = words.maptopair(word -> new tuple2<>(word, 1));
32         javapairdstream<string, integer> counts = ones.reducebykey((i1, i2) -> i1 + i2);
33         counts.print();
34 
35         /*
36         javastreamingcontext jsc = new javastreamingcontext(conf, durations.seconds(1));
37         javadstream<string> textfilestream = jsc.textfilestream("/data");
38         textfilestream.flatmap(line->arrays.aslist(line.split(" ")).iterator())
39                 .maptopair(word->new tuple2<>(word, 1))
40                 .reducebykey((a,b)->a+b)
41                 .print();
42         jsc.start();
43         */
44     }
45 }
转自:https://www.cnblogs.com/cjsblog/p/12620974.html