
Spark tutorial: notes on Spark development for beginners

程序员文章站 2022-05-21 16:09:53
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
  /**
    * Word count program - Scala version
    */
  def main(args: Array[String]): Unit = {
    /**
      * In spark-shell:
      * spark: a SparkSession, the entry point for Spark SQL
      * sc: a SparkContext, the entry point for Spark Core
      * spark-shell initializes sc for us, but in our own code we have to create it ourselves
      */
    // Configuration
    val conf = new SparkConf()
    // If this is not set, the job defaults to cluster mode; setting it to "local" runs in local mode, so the program can be run directly from the IDE
    conf.setMaster("local")
    // Must be set, otherwise an error is thrown; this is the application name
    conf.setAppName("WordCount")
    // Create the Spark Core entry point
    val sc = new SparkContext(conf)
    // The lines above do what spark-shell does for us automatically

    // Read the file into an RDD
    val fileRDD: RDD[String] = sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt")
    // Split each line on commas
    val wordRDD: RDD[String] = fileRDD.flatMap(line => line.split(","))
    // Map each word to a (word, 1) pair
    val wordOneRDD: RDD[(String, Int)] = wordRDD.map(word => (word, 1))
    // Sum the counts per word
    val wordCountRDD: RDD[(String, Int)] = wordOneRDD.reduceByKey(_ + _)
    // Sort by occurrence count in descending order; sortByKey() can only sort by the key, so use sortBy; false means descending, true means ascending
    val sortedRDD: RDD[(String, Int)] = wordCountRDD.sortBy(tuple => tuple._2, false)
    // Print the results
    sortedRDD.foreach(tuple => {
      println("word " + tuple._1 + " appears " + tuple._2 + " times")
    })

    /**
      * Word count as a single fluent, functional-style chain
      */
    /*sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt").flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2).foreach(tuple => {
      println(tuple._1 + " " + tuple._2)
    })*/

    sc.stop()
  }
}
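
The comment at the top of main mentions that spark-shell also exposes a SparkSession named spark as the Spark SQL entry point. As a minimal sketch (assuming Spark 2.x or later with the spark-sql dependency on the classpath, and reusing the same sample input path), the same word count can also be driven through a SparkSession, whose underlying SparkContext remains available for RDD work:

import org.apache.spark.sql.SparkSession

object WordCountSession {
  def main(args: Array[String]): Unit = {
    // SparkSession is the unified entry point since Spark 2.x; it wraps a SparkContext
    val spark = SparkSession.builder()
      .master("local")
      .appName("WordCountSession")
      .getOrCreate()
    val sc = spark.sparkContext

    sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt")
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .foreach(t => println("word " + t._1 + " appears " + t._2 + " times"))

    spark.stop()
  }
}
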
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCount_java {
    /**
     * Java 7 version (no lambda expressions)
     * Word count program
     */
    public static void main(String[] args) {
        // Configuration
        final SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCount_java");
        // Initialize the entry point
        final JavaSparkContext sc = new JavaSparkContext(conf);
        final JavaRDD<String> fileRDD = sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt");
        // In Java we pass a FlatMapFunction; its first type parameter is the input type, the second is the output type, and we implement its call method, which returns an Iterator
        final JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                // Standard idiom: wrap the split array in an iterator
                return Arrays.asList(line.split(",")).iterator();
            }
        });
        // Any ...ByKey method requires the RDD to hold key-value pairs

        // The commented-out code below is the wrong approach: we ultimately want a reduceByKey(), but map() does not produce a pair RDD
        // With map() you simply construct whatever output type you need
       /* final JavaRDD<HashMap<String, Integer>> wordOneRDD = wordRDD.map(new Function<String, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> call(String word) throws Exception {
                final HashMap<String, Integer> map = new HashMap<>();
                map.put(word, 1);
                return map;
            }
        });*/

        // Whenever a ...ByKey operation is needed (sortByKey, groupByKey, reduceByKey), first convert to a pair RDD with mapToPair
        final JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });
        final JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });
        final JavaPairRDD<Integer, String> count2wordRDD = wordCountRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return new Tuple2<>(tuple._2, tuple._1);
            }
        });
        final JavaPairRDD<Integer, String> sortedRDD = count2wordRDD.sortByKey(false);
        sortedRDD.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            @Override
            public void call(Tuple2<Integer, String> tuple) throws Exception {
                System.out.println("word: " + tuple._2 + " count: " + tuple._1);
            }
        });
        sc.stop();
    }
}
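
A caveat that applies to both listings above: foreach runs on the executors, so once the job is submitted to a real cluster (rather than the local mode used here) the println output lands in the executor logs and the sorted order is not observed in one place. A minimal Scala sketch of the usual workaround, assuming the result is small enough to fit in driver memory (sortedRDD stands for the sorted (word, count) RDD from the Scala example):

// collect() ships the sorted result back to the driver, so ordering and output location are predictable;
// for large results prefer take(n) or saveAsTextFile instead
sortedRDD.collect().foreach { case (word, count) =>
  println("word " + word + " appears " + count + " times")
}
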
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;

public class WordCount_java8 {
    /**
     * Word count program using Java 8 lambdas
     */
    public static void main(String[] args) {
        // Configuration
        final SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCount_java8");
        // Initialize the entry point
        final JavaSparkContext sc = new JavaSparkContext(conf);
        final JavaRDD<String> fileRDD = sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt");
        final JavaRDD<String> wordRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
        final JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(word -> new Tuple2<>(word, 1));
        final JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey((m, n) -> m + n);
        final JavaPairRDD<Integer, String> count2wordRDD = wordCountRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        final JavaPairRDD<Integer, String> sortedRDD = count2wordRDD.sortByKey(false);
        sortedRDD.foreach(tuple -> {
            System.out.println("word: " + tuple._2 + " count: " + tuple._1);
        });

        // Fluent-style top-N: take(n) returns the first n elements
        final List<Tuple2<Integer, String>> result = sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt")
                .flatMap(line -> Arrays.asList(line.split(",")).iterator())
                .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
                .reduceByKey((m, n) -> m + n)
                .mapToPair(tuple -> new Tuple2<Integer, String>(tuple._2, tuple._1))
                .sortByKey(false)
                .take(2);
        result.forEach(t -> {
            System.out.println(t._2() + " " + t._1());
        });

        // Stop the context only after all jobs have finished
        sc.stop();
    }
}
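
The chained Java 8 version sorts the entire RDD only to keep the two most frequent words. As a purely illustrative alternative in Scala (wordCountRDD stands for the (word, count) RDD built in the Scala example at the top), top(n) asks Spark for the n largest elements under a given ordering without materializing a fully sorted result:

// top(2) under an ordering by count returns the two most frequent words
val top2: Array[(String, Int)] = wordCountRDD.top(2)(Ordering.by[(String, Int), Int](_._2))
top2.foreach { case (word, count) => println(word + " " + count) }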