Spark Streaming：改造 JavaQueueStream 示例以测试数据流
程序员文章站
2022-03-31 18:10:49
...
为了搞清楚 Spark Streaming 如何处理数据流，我改造了已有的例子，用来测试数据在 Spark 内部的流向。
package org.apache.spark.examples.streaming; import java.util.LinkedList; import java.util.List; import java.util.Queue; import scala.Tuple2; import com.google.common.collect.Lists; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.examples.streaming.StreamingExamples; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; public final class JavaQueueStream { private JavaQueueStream() { } public static void main(String[] args) throws Exception { StreamingExamples.setStreamingLogLevels(); SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream"); // Create the context JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); // Create the queue through which RDDs can be pushed to // a QueueInputDStream Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>(); // Create and push some RDDs into the queue List<Integer> list = Lists.newArrayList(); for (int i = 0; i < 10; i++) { list.add(i); } rddQueue.add(ssc.sparkContext().parallelize(list)); /* for (int i = 0; i <5; i++) { for(int j=0;j<i;j++){ list.remove(j); } rddQueue.add(ssc.sparkContext().parallelize(list)); } */ // Create the QueueInputDStream and use it do some processing JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue); //new PairFunction<Integer, Integer, Integer> 第一个参数为Call中的参数类型,第二个和第三个为Tupe2中的Key Value //JavaPairDStream<Integer, Integer> 是Tupe2的返回类型 JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair( new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer i) { if(i==3 ){ i=5; }else if(i%2==0){ i=2; } return new Tuple2<Integer, Integer>(i, 1); } }); 
//Function2<T1,T2,R> JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey( new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { System.out.println(i1+"====&===="+i2); return i1+i2; } }); System.out.println("==================================================begin"); reducedStream.print(); System.out.println("==================================================end"); ssc.start(); ssc.awaitTermination(); } }
测试结果1:
==================================================begin
==================================================end
14/08/28 09:58:04 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
1====&====1
2====&====1
1====&====1
3====&====1
4====&====1
-------------------------------------------
Time: 1409245080000 ms
-------------------------------------------
(1,1)
(7,1)
(9,1)
(5,2)
(2,5)
-------------------------------------------
Time: 1409245081000 ms
-------------------------------------------
-------------------------------------------
Time: 1409245082000 ms
-------------------------------------------
测试结果2（将 return new Tuple2<Integer, Integer>(i, 1) 改为 return new Tuple2<Integer, Integer>(i, 10)，目的是弄清楚 (1,1)、(9,1) 等数据为什么没有在 Reduce 中打印出来）：
==================================================begin
==================================================end
14/08/28 10:05:55 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
10====&====10
20====&====10
10====&====10
30====&====10
40====&====10
-------------------------------------------
Time: 1409245552000 ms
-------------------------------------------
(1,10)
(7,10)
(9,10)
(5,20)
(2,50)
针对测试结果1进行数据流分析：