Developing and Running a Spark WordCount Program on CDH

Translated from: https://www.cloudera.com/documentation/enterprise/latest/topics/spark_develop_run.html
Version: 5.14.2

This tutorial describes how to write, compile, and run a simple Spark word count application in each of the three languages supported by Spark: Scala, Python, and Java. The Scala and Java code was originally developed for a Cloudera tutorial written by Sandy Ryza.


Writing the Application

The example application is an enhanced version of WordCount, the canonical MapReduce example. In this version of WordCount, the goal is to learn the distribution of letters in the most popular words in a corpus. The application:

  1. Creates a SparkConf and SparkContext. A Spark application corresponds to an instance of the SparkContext class.
  2. Gets a word frequency threshold.
  3. Reads an input set of text documents.
  4. Counts the number of times each word appears.
  5. Filters out all words that appear fewer times than the threshold.
  6. For the remaining words, counts the number of times each letter occurs.

In MapReduce, this requires two MapReduce applications, as well as persisting the intermediate data to HDFS between them. In Spark, this application requires about 90 percent fewer lines of code than one developed using the MapReduce API.

Here are three versions of the program:

Scala WordCount

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkWordCount {
  def main(args: Array[String]) {
    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))

    // get threshold
    val threshold = args(1).toInt

    // read in text file and split each document into words
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

    // count the occurrence of each word
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

    // filter out words with fewer than threshold occurrences
    val filtered = wordCounts.filter(_._2 >= threshold)

    // count characters
    val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

    System.out.println(charCounts.collect().mkString(", "))
  }
}

Python WordCount

import sys

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":

  # create Spark context with Spark configuration
  conf = SparkConf().setAppName("Spark Count")
  sc = SparkContext(conf=conf)

  # get threshold
  threshold = int(sys.argv[2])

  # read in text file and split each document into words
  tokenized = sc.textFile(sys.argv[1]).flatMap(lambda line: line.split(" "))

  # count the occurrence of each word
  wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1, v2: v1 + v2)

  # filter out words with fewer than threshold occurrences
  filtered = wordCounts.filter(lambda pair: pair[1] >= threshold)

  # count characters
  charCounts = filtered.flatMap(lambda pair: pair[0]).map(lambda c: (c, 1)).reduceByKey(lambda v1, v2: v1 + v2)

  counts = charCounts.collect()
  print(repr(counts)[1:-1])

Java 7 WordCount

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;

public class JavaWordCount {
  public static void main(String[] args) {

    // create Spark context with Spark configuration
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));

    // get threshold
    final int threshold = Integer.parseInt(args[1]);

    // read in text file and split each document into words
    JavaRDD<String> tokenized = sc.textFile(args[0]).flatMap(
      new FlatMapFunction<String, String>() {
        public Iterable<String> call(String s) {
          return Arrays.asList(s.split(" "));
        }
      }
    );

    // count the occurrence of each word
    JavaPairRDD<String, Integer> counts = tokenized.mapToPair(
      new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }
    ).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );

    // filter out words with fewer than threshold occurrences
    JavaPairRDD<String, Integer> filtered = counts.filter(
      new Function<Tuple2<String, Integer>, Boolean>() {
        public Boolean call(Tuple2<String, Integer> tup) {
          return tup._2() >= threshold;
        }
      }
    );

    // count characters
    JavaPairRDD<Character, Integer> charCounts = filtered.flatMap(
      new FlatMapFunction<Tuple2<String, Integer>, Character>() {
        @Override
        public Iterable<Character> call(Tuple2<String, Integer> s) {
          Collection<Character> chars = new ArrayList<Character>(s._1().length());
          for (char c : s._1().toCharArray()) {
            chars.add(c);
          }
          return chars;
        }
      }
    ).mapToPair(
      new PairFunction<Character, Character, Integer>() {
        @Override
        public Tuple2<Character, Integer> call(Character c) {
          return new Tuple2<Character, Integer>(c, 1);
        }
      }
    ).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );

    System.out.println(charCounts.collect());
  }
}

Because Java 7 does not support anonymous functions, this Java program is considerably more verbose than the Scala and Python versions, but still requires only a fraction of the code needed in an equivalent MapReduce program. Java 8 supports anonymous functions, and their use can further streamline the Java application.
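
As a rough illustration of that point, here is a sketch (not part of the original tutorial) of how the same logic might look with Java 8 lambdas against the Spark 1.6 Java API used above. The class name JavaWordCountJava8 is made up for this example; note that in Spark 1.x the flatMap lambdas return an Iterable (a List here), whereas Spark 2.x expects an Iterator.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

// hypothetical class name, used only for this sketch
public class JavaWordCountJava8 {
  public static void main(String[] args) {

    // create Spark context with Spark configuration
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));

    // get threshold
    final int threshold = Integer.parseInt(args[1]);

    // read in text file, split each document into words, and count each word
    JavaPairRDD<String, Integer> counts = sc.textFile(args[0])
        .flatMap(line -> Arrays.asList(line.split(" ")))
        .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
        .reduceByKey((i1, i2) -> i1 + i2);

    // filter out words below the threshold, then count characters
    JavaPairRDD<Character, Integer> charCounts = counts
        .filter(pair -> pair._2() >= threshold)
        .flatMap(pair -> {
          List<Character> chars = new ArrayList<Character>(pair._1().length());
          for (char c : pair._1().toCharArray()) {
            chars.add(c);
          }
          return chars;
        })
        .mapToPair(c -> new Tuple2<Character, Integer>(c, 1))
        .reduceByKey((i1, i2) -> i1 + i2);

    System.out.println(charCounts.collect());
  }
}

Such a version would be compiled, packaged, and submitted with spark-submit in the same way as the Java 7 version shown above.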

Compiling and Packaging the Scala and Java Applications

This tutorial uses Maven to compile and package the Scala and Java programs. Excerpts from the tutorial pom.xml are included below. For best practices on building Spark applications with Maven, see Building Spark Applications.

To compile Scala code, include the Scala tools plug-in:

<plugin>
  <groupId>org.scala-tools</groupId>
  <artifactId>maven-scala-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>compile</goal>
        <goal>testCompile</goal>
      </goals>
    </execution>
  </executions>
</plugin>

This requires the scala-tools plug-in repository:

<pluginRepositories>
  <pluginRepository>
    <id>scala-tools.org</id>
    <name>Scala-tools Maven2 Repository</name>
    <url>http://scala-tools.org/repo-releases</url>
  </pluginRepository>
</pluginRepositories>

Also, include Scala and Spark as dependencies:

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0-cdh5.7.0</version>
    <scope>provided</scope>
  </dependency>
</dependencies>

To generate the application JAR, run:

$ mvn package

This creates sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar in the target directory. (The -jar-with-dependencies suffix presumably comes from an assembly or shade plug-in configuration in the tutorial pom.xml that is not shown in the excerpts above.)

Running the Application

  1. The input to the application is a large text file in which each line contains all the words in a document, with punctuation removed. Put an input file in a directory on HDFS. You can use the tutorial example input file:
$ wget --no-check-certificate .../inputfile.txt
$ hdfs dfs -put inputfile.txt
  2. Run one of the applications using spark-submit:
    • Scala - Run in a local process with a threshold of 2:
$ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount \
--master local --deploy-mode client --executor-memory 1g \
--name wordcount --conf "spark.app.id=wordcount" \
sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://namenode_host:8020/path/to/inputfile.txt 2

If you use the example input file, the output should look something like:

(e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), (o,2), (n,4), (f,1), (v,1), (r,2), (l,1), (c,1)
    • Java - Run in a local process with a threshold of 2:
$ spark-submit --class com.cloudera.sparkwordcount.JavaWordCount \
--master local --deploy-mode client --executor-memory 1g \
--name wordcount --conf "spark.app.id=wordcount" \
sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://namenode_host:8020/path/to/inputfile.txt 2

If you use the example input file, the output should look something like:

(e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), (o,2), (n,4), (f,1), (v,1), (r,2), (l,1), (c,1)
    • Python - Run on YARN with a threshold of 2:
$ spark-submit --master yarn --deploy-mode client --executor-memory 1g \
--name wordcount --conf "spark.app.id=wordcount" wordcount.py hdfs://namenode_host:8020/path/to/inputfile.txt 2

In this case, the output should look something like:

[(u'a', 4), (u'c', 1), (u'e', 6), (u'i', 1), (u'o', 2), (u'u', 1), (u'b', 1), (u'f', 1), (u'h', 1), (u'l', 1), (u'n', 4), (u'p', 2), (u'r', 2), (u't', 2), (u'v', 1)]