spark 教程Spark开发的笔记供小白参考
程序员文章站
2023-03-12 13:50:05
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
obje...
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object WordCount { /** * 单词计数程序-Scala版本 */ def main(args: Array[String]): Unit = { /** * spark-shell: * spark:SparkSession 主要针对的是SparkSQL * SparkSQL程序入口 * sc:SparkCore对象,SparkCore的程序入口 * 在spark-shell中已经初始化好了sc,但是我们代码中需要创建对象 */ //配置文件 val conf = new SparkConf() //如果不设置,默认运行的是集群模式,设置成local运行local模式,直接在IDEA中运行即可 conf.setMaster("local") //必须要设置,否则会报错,设置任务名字 conf.setAppName("WordCount"); //创建SparkCore的程序入口 val sc = new SparkContext(conf) //以上相当于Spark-shell帮我们干的事 //读取文件生成RDD val fileRDD: RDD[String] = sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt") //把每一行数据按照逗号,分隔 val wordRDD: RDD[String] = fileRDD.flatMap(line => line.split(",")) //让每一个单词都出现一次 val wordOneRDD: RDD[(String, Int)] = wordRDD.map(word => (word, 1)) //单词计数 val wordCountRDD: RDD[(String, Int)] = wordOneRDD.reduceByKey(_ + _) //按照单词出现次数 降序排序 sortByKey()只能按照key排序,降序是false,true是升序 val sortedRDD: RDD[(String, Int)] = wordCountRDD.sortBy(tuple => tuple._2, false) //打印结果 sortedRDD.foreach(tuple => { println("单词" + tuple._1 + "出现的次数" + tuple._2) }) /** * 单词计数:流式编程,函数式编程 */ /*sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt").flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2).foreach(tuple => { println(tuple._1 + " " + tuple._2) })*/ sc.stop() } }
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; public class WordCount_java { /** * java-7 * 没有lambda表达式 * 单词计数程序 */ public static void main(String[] args) { //配置文件 final SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("WordCount_java"); //初始化程序入口 final JavaSparkContext sc = new JavaSparkContext(conf); final JavaRDD fileRDD = sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt"); //在java中我们需要传入FlatMapFunction类型,第一个是输入的数据类型,第二个参数是输出的数据类型,里面要实现Iterator抽象方法 final JavaRDD wordRDD = fileRDD.flatMap(new FlatMapFunction() { @Override public Iterator call(String line) throws Exception { //固定操作 return Arrays.asList(line.split(",")).iterator(); } }); //只要是ByKey方法就要求RDD里面必须是Key-value键值对类型 //下面注释的代码是错误的,我们正常的逻辑希望最后有一个reduceByKey()但是没有 // map()方法,要什么类型就new什么 /* final JavaRDD> wordOneRDD = wordRDD.map(new Function>() { @Override public HashMap call(String word) throws Exception { final HashMap map = new HashMap<>(); map.put(word, 1); return null; } });*/ //我们只要是做什么ByKey的操作需要转换成ToPair,sortByKey,groupByKey,reduceByKey final JavaPairRDD wordOneRDD = wordRDD.mapToPair(new PairFunction() { @Override public Tuple2 call(String word) throws Exception { return new Tuple2<>(word, 1); } }); final JavaPairRDD wordCountRDD = wordOneRDD.reduceByKey(new Function2() { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } }); final JavaPairRDD count2wordRDD = wordCountRDD.mapToPair(new PairFunction, Integer, String>() { @Override public Tuple2 call(Tuple2 tuple) throws Exception { return new Tuple2(tuple._2, tuple._1); } }); final JavaPairRDD sortedRDD = count2wordRDD.sortByKey(false); sortedRDD.foreach(new VoidFunction>() { @Override public void call(Tuple2 tuple) throws Exception { System.out.println("单词:" + tuple._2 + "次数:" + tuple._1); } }); sc.stop(); } }
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; import java.util.List; public class WordCount_java8 { /** * JAVA-8开发单词计数程序 */ public static void main(String[] args) { //配置文件 final SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("WordCount_java8"); //初始化程序入口 final JavaSparkContext sc = new JavaSparkContext(conf); final JavaRDD fileRDD = sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt"); final JavaRDD wordRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator()); final JavaPairRDD wordOneRDD = wordRDD.mapToPair(word -> new Tuple2(word, 1)); final JavaPairRDD wordCountRDD = wordOneRDD.reduceByKey((m, n) -> m + n); final JavaPairRDD count2wordRDD = wordCountRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)); final JavaPairRDD sortedRDD = count2wordRDD.sortByKey(false); sortedRDD.foreach(tuple->{ System.out.println("单词:"+tuple._2+"次数:"+tuple._1); }); sc.stop(); //流式写法TopN take表示取前几个 final List> result = sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt").flatMap(line -> Arrays.asList(line.split(",")).iterator()).mapToPair(word -> new Tuple2(word, 1)).reduceByKey((m, n) -> m + n).mapToPair(tuple -> new Tuple2(tuple._2, tuple._1)).sortByKey(false).take(2); result.forEach(t->{ System.out.println(t._2()+" "+t._1()); }); } }
上一篇: ajax 实现一个网页版山寨QQ