
Executing Spark Jobs: The Client

1. The spark-submit tool: submits a Spark job (a jar file)
        (*) The tool that Spark ships for submitting Spark jobs
        (*) Example jar: /root/training/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar
        (*) The SparkPi.scala example: estimates Pi with the Monte Carlo method

             bin/spark-submit --master spark://bigdata11:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 100

             Pi is roughly 3.1419547141954713

             bin/spark-submit --master spark://bigdata11:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 300

            Pi is roughly 3.141877971395932
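
             spark-submit also takes resource and deployment options. A hedged example with a few commonly used standalone-mode flags (the values are illustrative, not taken from the runs above):

             bin/spark-submit --master spark://bigdata11:7077 \
               --deploy-mode client \
               --executor-memory 1g \
               --total-executor-cores 2 \
               --class org.apache.spark.examples.SparkPi \
               examples/jars/spark-examples_2.11-2.1.0.jar 100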

[Figure: Monte Carlo estimation of Pi]
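
The idea behind SparkPi, condensed into a minimal sketch (an approximation of the bundled example, not its exact source): sample n random points in the square [-1,1] x [-1,1]; the fraction that lands inside the unit circle approximates Pi/4.

    import scala.math.random
    import org.apache.spark.{SparkConf, SparkContext}

    object MiniPi {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("MiniPi"))
        val n = 1000000
        //count the sampled points that fall inside the unit circle
        val inside = sc.parallelize(1 to n).map { _ =>
          val x = random * 2 - 1
          val y = random * 2 - 1
          if (x * x + y * y <= 1) 1 else 0
        }.reduce(_ + _)
        println("Pi is roughly " + 4.0 * inside / n)
        sc.stop()
      }
    }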


2. The spark-shell tool: an interactive command-line shell that runs as a Spark application
        Two modes: (1) Local mode
                        bin/spark-shell
                        Log: Spark context available as 'sc' (master = local[*], app id = local-1518181597235).


                  (2) Cluster mode
                        bin/spark-shell --master spark://bigdata11:7077
                        Log: Spark context available as 'sc' (master = spark://bigdata11:7077, app id = app-20180209210815-0002).

            Objects: Spark context available as 'sc'
                  Spark session available as 'spark' ---> new since Spark 2.0;
                                                          a unified entry point covering Spark Core, Spark SQL and Spark Streaming
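
            For example, the 'spark' session object can read the same file through the DataFrame API. A small sketch to type into spark-shell (the HDFS path is the one used in the word count below):

                spark.read.text("hdfs://bigdata11:9000/input/data.txt").show(5)   // first 5 rows as a DataFrame
                spark.sparkContext                                                // the classic SparkContext is still reachable from the session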


            sc.textFile("hdfs://bigdata11:9000/input/data.txt")   // read the file from HDFS through the sc object
              .flatMap(_.split(" "))                              // split each line into words and flatten
              .map((_,1))                                         // emit (word, 1) for every word
              .reduceByKey(_+_)                                   // group by key and sum the values
              .saveAsTextFile("hdfs://bigdata11:9000/output/spark/day0209/wc")

            A side note:
            .reduceByKey(_+_)
            is shorthand for the full form
            .reduceByKey((a,b) => a+b)

            Given Array((Tom,1),(Tom,2),(Mary,3),(Tom,6)), the values are first grouped by key:

                  (Tom,(1,2,6))
                       1+2 = 3
                       3+6 = 9   ---> (Tom,9)
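
            The trace above can be reproduced directly in spark-shell (a quick sketch; the order of the collected pairs may vary):

                val pairs = sc.parallelize(Array(("Tom",1), ("Tom",2), ("Mary",3), ("Tom",6)))
                pairs.reduceByKey(_ + _).collect()
                // e.g. Array((Tom,9), (Mary,3))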

3. Developing the WordCount program
        http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.package
        (1) Scala version: developed in IDEA
        (2) Java version (more verbose): developed in Eclipse
package mydemo
/*
Submit with:
bin/spark-submit --master spark://bigdata11:7077 --class mydemo.MyWordCount /root/temp/MyWordCount.jar hdfs://bigdata11:9000/input/data.txt hdfs://bigdata11:9000/output/spark/day0209/wc1
 */

import org.apache.spark.{SparkConf, SparkContext}

//A Scala version of WordCount
object MyWordCount {
  def main(args: Array[String]): Unit = {
    //create a SparkConf: the configuration parameters
    val conf = new SparkConf().setAppName("MyScalaWordCount")

    //core step: create the SparkContext object
    val sc = new SparkContext(conf)

    //run the operators (functions) on the sc object
    sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .saveAsTextFile(args(1))

    //stop the SparkContext object
    sc.stop()

  }
}

package demo;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/*
 * Submit with:
 * bin/spark-submit --master spark://bigdata11:7077 --class demo.JavaWordCount /root/temp/MyJavaWordCount.jar hdfs://bigdata11:9000/input/data.txt
 */
public class JavaWordCount {

    public static void main(String[] args) {
        //create a SparkConf object: the configuration parameters
        SparkConf conf = new SparkConf().setAppName("MyJavaWordCount");

        //create the SparkContext object: a JavaSparkContext
        JavaSparkContext context = new JavaSparkContext(conf);

        //read the input data
        JavaRDD<String> lines = context.textFile(args[0]);

        //split into words
        /*
         * FlatMapFunction<String, String>
         * the first String: one input line
         * the second String: the type of the elements returned
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            @Override
            public Iterator<String> call(String line) throws Exception {
                //sample line: I love Beijing
                //split the line into individual words
                return Arrays.asList(line.split(" ")).iterator();
            }

        });

        //count each word once
        //  Beijing  ---> (Beijing,1)
        /*
         * PairFunction<String, String, Integer>
         * String: one word
         * String, Integer ---> the (K2, V2) output pair, like a Mapper's output
         */
        JavaPairRDD<String, Integer> wordOne = words.mapToPair(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // Beijing  ---> (Beijing,1)
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        //perform the reduce: sum the values of each key
        JavaPairRDD<String, Integer> count = wordOne.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });

        //trigger the computation with an action: print the result to the screen
        //           K4      V4
        List<Tuple2<String, Integer>> result = count.collect();

        //print to the screen
        for(Tuple2<String, Integer> tuple: result){
            System.out.println(tuple._1+"\t"+tuple._2);
        }

        //stop the SparkContext object
        context.stop();
    }
}