欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark开发-WordCount详细讲解

程序员文章站 2022-05-04 16:23:44
...

核心
详细讲解spark中wordcount的执行
环境
idea 2017.1.5
java 1.7
scala 2.10
spark 1.6

程序编写

package com.xlucas

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by xlucas on 2017/9/14.
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    /**
      第一步:创建spark的配置对象sparkconf,设置spark程序的运行时的配置信息,例如说通过setMaster来设置程序
      链接spark集群的master的URL,如果设置为local,则代表spark程序在本地运行,
     */
    val conf=new SparkConf();//创建SparkConf对象
    conf.setAppName("WordCount")//设置应用程序的名称,在程序运行的监控界面可以看到这个名字
    //conf.setMaster("local")//此时,程序在本地执行,不需要安装spark集群
    //conf.setMaster("spark://192.168.18.140:7077")//指定spark运行是集群模式 一般我们不在代码中指定,我们在提交的时候指定
    /*
    第二步:创建SparkContext对象,
    SparkContext是spark程序所有功能的唯一入口,无论是采用Scala,Java,Python,R等都必须有一个SparkContext
    SparkContext核心作用:初始化spark应用程序运行时候所需要的核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend
    同时还会负责Spark程序往Master注册程序等
    SparkContext是整个spark应用程序中最为至关重要的一个对象
     */
    val sc=new SparkContext(conf)//创建SparkContext对象,通过传入SparkContext实例来定制Spark运行的具体参数和配置信息
    /*
    第3步:根据具体的数据来源 (HDFS,HBase,Local等)通过SparkContext来创建RDD
    RDD的创建有3种方式,外部的数据来源,根据scala集合,由其他的RDD操作
    数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴
     */
    //val line=sc.textFile("E://data//LICENSE.txt",1) //读取本地的一个文件并且设置为1个partition
      val line =sc.textFile("hdfs://192.168.18.140:9000/input/LICENSE.txt") //指定HDFS的路径,这个也可以到时候在参数传入
    /*
    第4步:对初始的RDD进行Transformation级别的处理,例如Map、filter等高阶函数等的编程来进行具体的数据计算
     在对每一行的字符串拆分成单个单词
     在单词的拆分的基础上对每个单词实例计算为1,也就是word=>(word,1)
     在对每个单词实例计数为1基础上统计每个单词在文件中出现的总次数
     */
    val words=line.flatMap(_.split(" "))
    val pairs=words.map(word=>(word,1))
    val wordcounts=pairs.reduceByKey(_+_)
    //wordcounts.foreach(wordNum=>println(wordNum._1+":"+wordNum._2)) 本地模式用这个打印,
    wordcounts.collect().foreach(wordNum=>println(wordNum._1+":"+wordNum._2))
    sc.stop()
  }
}

在idea里面运行的结果, 这个是local模式
Spark开发-WordCount详细讲解

在spark集群运行的结果,这个集群模式
通过下面命令提交

./spark-submit --class com.xlucas.WordCount --master spark://192.168.18.140:7077 /root/data/ScalaSpark.jar 

Spark开发-WordCount详细讲解

运行过程遇到的错误和解决方法

17/09/15 23:08:22 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
    at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:86)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248)
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2214)
	at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2214)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2214)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:322)
    at com.xlucas.WordCount$.main(WordCount.scala:24)
    at com.xlucas.WordCount.main(WordCount.scala)

由于运行的spark是基于Hadoop编译的,这个需要依赖Hadoop的信息,如果是基于local模式,这个错误可以忽略了

Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
    at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
    at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
    at akka.actor.RootActorPath.$div(ActorPath.scala:185)
    at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:465)
    at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:124)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
	at scala.util.Try$.apply(Try.scala:191)
	at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
	at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    at scala.util.Success.flatMap(Try.scala:230)
    at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
    at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
    at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)

这个是Scala的版本不匹配的问题,我这边刚开始用的是Scala2.11后来修改成Scala2.10就可以了

相关标签: spark idea 编程