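A Detailed Guide to Writing and Running Spark Applications in Java
We start with a simple requirement:
We want to analyze a website's access log and count the number of visits from each IP address, so that GeoIP information can then be used to see which countries and regions the visitors come from. Here are some sample lines from my own site's log: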
121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.241.229 - - [21/Feb/2014:00:00:13 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.241.229 - - [21/Feb/2014:00:00:15 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
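Each sample line follows the common "combined" access log layout: client IP, two identity fields, a bracketed timestamp, the quoted request, status code, response size, quoted referer and quoted user agent. As a minimal sketch of that layout (the class name AccessLogLine and its fields are illustrative and not part of the original program, which only needs the first field), a line could be taken apart with a regular expression like this:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative holder for a few fields of one access log line (hypothetical helper). */
final class AccessLogLine {
    // ip, identity, user, [timestamp], "request", status, bytes, "referer", "user agent"
    private static final Pattern COMBINED = Pattern.compile(
            "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"([^\"]*)\" (\\d{3}) (\\S+) \"([^\"]*)\" \"([^\"]*)\"$");

    final String ip;
    final String timestamp;
    final String request;
    final int status;

    private AccessLogLine(String ip, String timestamp, String request, int status) {
        this.ip = ip;
        this.timestamp = timestamp;
        this.request = request;
        this.status = status;
    }

    /** Returns the parsed line, or null when the line does not match the expected layout. */
    static AccessLogLine parse(String line) {
        Matcher m = COMBINED.matcher(line.trim());
        if (!m.matches()) {
            return null;
        }
        return new AccessLogLine(m.group(1), m.group(2), m.group(3), Integer.parseInt(m.group(4)));
    }
}
```

Returning null for lines that do not match keeps malformed records from aborting a whole run; a real job might count or log them instead.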
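Implementing the Spark Application in Java
The statistics program we implement does the following:
Read the log data file from HDFS
Extract the first field (the IP address) from each line
Count the number of occurrences of each IP address
Sort the IP addresses by occurrence count in descending order
Look up the country of each IP address using the GeoIP library
Print the results, one per line, in the format: [country code] IP address frequency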
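Before involving Spark, the extract, count and sort steps listed above can be sketched in plain Java on a small in-memory list. This is only an illustration under simplifying assumptions: LocalIpCounter and countByIp are hypothetical names, everything runs in one JVM, and the HDFS and GeoIP steps are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Single-machine sketch of the extract / count / sort steps (hypothetical helper). */
final class LocalIpCounter {

    /** Counts how often each first field (the IP address) occurs, most frequent first. */
    static List<Map.Entry<String, Integer>> countByIp(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank lines
            }
            String ip = trimmed.split(" ")[0];   // first space-separated field
            counts.merge(ip, 1, Integer::sum);   // add 1 to the running count
        }
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts.entrySet());
        // Descending by count; ties are broken by IP so the order is deterministic.
        sorted.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        return sorted;
    }
}
```

The Spark version follows the same shape, with the counting done by reduceByKey across the cluster and only the final sort performed on the collected result.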
Here is the statistics application implemented in Java:
package org.shirdrn.spark.job;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.shirdrn.spark.job.maxmind.Country;
import org.shirdrn.spark.job.maxmind.LookupService;

import scala.Tuple2;

public class IPAddressStats implements Serializable {

    private static final long serialVersionUID = 8533489548835413763L;
    private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
    private static final Pattern SPACE = Pattern.compile(" ");
    // Not serializable, so it is excluded from serialization; only used on the driver.
    private transient LookupService lookupService;
    private transient final String geoIPFile;

    public IPAddressStats(String geoIPFile) {
        this.geoIPFile = geoIPFile;
        try {
            // lookupService: get the country code for an IP address
            File file = new File(this.geoIPFile);
            LOG.info("GeoIP file: " + file.getAbsolutePath());
            // AdvancedLookupService is not imported in the original listing,
            // so it is presumably in this package.
            lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @SuppressWarnings("serial")
    public void stat(String[] args) {
        // args[0]: master URL, args[1]: input file (Spark 0.9.0 Java API)
        JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
                System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

        // split each line and extract the IP address field
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
                // the first field is the IP address
                return Arrays.asList(SPACE.split(s)[0]);
            }
        });

        // map: emit (ip, 1) for every line
        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // reduce: sum the counts per IP address
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // bring the per-IP totals back to the driver
        List<Tuple2<String, Integer>> output = counts.collect();

        // sort the statistics by count, in descending order
        Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
            @Override
            public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
                if (t1._2 < t2._2) {
                    return 1;
                } else if (t1._2 > t2._2) {
                    return -1;
                }
                return 0;
            }
        });

        writeTo(args, output);
    }

    // Looks up the country of each IP address and logs "[country code] ip <TAB> count".
    private void writeTo(String[] args, List<Tuple2<String, Integer>> output) {
        for (Tuple2<?, ?> tuple : output) {
            Country country = lookupService.getCountry((String) tuple._1);
            LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2);
        }
    }

    public static void main(String[] args) {
        // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/geoip_database.dat
        if (args.length < 3) {
            System.err.println("Usage: IPAddressStats <master> <inFile> <geoIPFile>");
            System.err.println("    Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/geoip_database.dat");
            System.exit(1);
        }

        String geoIPFile = args[2];
        IPAddressStats stats = new IPAddressStats(geoIPFile);
        stats.stat(args);

        System.exit(0);
    }
}
The comments in the code explain the implementation logic. We use Maven to manage and build the Java program. First, here are the dependencies in my POM:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>0.9.0-incubating</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
    <dependency>
        <groupId>dnsjava</groupId>
        <artifactId>dnsjava</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>commons-net</groupId>
        <artifactId>commons-net</artifactId>
        <version>3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>1.2.1</version>
    </dependency>
</dependencies>
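Note that when the program runs on a Spark cluster, the job we write must be serializable. A field that does not need to be serialized, or cannot be, can simply be marked transient. In the code above, for example, the type of the lookupService field does not implement the serialization interface, so the field is declared transient to keep it out of serialization. Otherwise an error like the following may occur (the exception message names the class that could not be serialized, which in this trace is the job class IPAddressStats itself):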
14/03/10 22:34:06 INFO scheduler.DAGScheduler: Failed to run collect at IPAddressStats.java:76
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.shirdrn.spark.job.IPAddressStats
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:741)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:740)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:740)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
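The effect of transient can be reproduced without Spark, using only standard Java serialization. The sketch below is an illustration, not the author's code: Lookup, GoodJob and BadJob are hypothetical stand-ins for a non-serializable helper and for job classes that hold it with and without transient.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class TransientDemo {

    /** Hypothetical helper that does NOT implement Serializable. */
    static final class Lookup {
        String countryOf(String ip) { return "??"; }
    }

    /** The helper is transient, so serialization skips it. */
    static final class GoodJob implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        transient Lookup lookup = new Lookup();
        GoodJob(String name) { this.name = name; }
    }

    /** Same shape, but the helper is an ordinary field. */
    static final class BadJob implements Serializable {
        private static final long serialVersionUID = 1L;
        Lookup lookup = new Lookup();
    }

    /** Serializes and immediately deserializes a value, returning the copy. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true when serializing the value fails with NotSerializableException. */
    static boolean failsToSerialize(Serializable value) {
        try {
            roundTrip(value);
            return false;
        } catch (UncheckedIOException e) {
            if (e.getCause() instanceof NotSerializableException) {
                return true;
            }
            throw e;
        }
    }
}
```

One consequence worth keeping in mind: a transient field is null in the deserialized copy, because field initializers and constructors of a Serializable class are not run during deserialization. That is harmless in the program above, where lookupService is only used in the driver-side writeTo method, but code running on executors would have to recreate such a helper itself.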
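Running the Java Program on a Spark Cluster
As mentioned, I use Maven to manage and build the Java program. With the code above in place, I package it with Maven's maven-assembly-plugin, configured as follows: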
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <!-- The launch script passes the main class on the command line,
                     so this manifest entry is not used when running IPAddressStats. -->
                <mainClass>org.shirdrn.spark.job.UserAgentStats</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <excludes>
            <exclude>*.properties</exclude>
            <exclude>*.xml</exclude>
        </excludes>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
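This bundles the required dependency libraries into the application JAR. Finally, copy the JAR file to a Linux machine (it does not have to be the master node of the Spark cluster) and make sure Spark's environment variables are configured correctly on that node. After unpacking the Spark distribution you will find the script bin/run-example. We can modify this script directly so that the relevant paths point to our own Java application JAR (change the EXAMPLES_DIR variable and the parts that refer to where our JAR file is stored), and then use it to run the program. The command later in this article invokes the modified script as bin/run-my-java-example. The script looks like this: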
cygwin=false
case "`uname`" in
    CYGWIN*) cygwin=true;;
esac

SCALA_VERSION=2.10

# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"

# Load environment variables from conf/spark-env.sh, if it exists
if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
  . $FWDIR/conf/spark-env.sh
fi

if [ -z "$1" ]; then
  echo "Usage: run-example <example-class> [<args>]" >&2
  exit 1
fi

# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
EXAMPLES_DIR="$FWDIR"/java-examples
SPARK_EXAMPLES_JAR=""
if [ -e "$EXAMPLES_DIR"/*.jar ]; then
  export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/*.jar`
fi
if [[ -z $SPARK_EXAMPLES_JAR ]]; then
  echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
  echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
  exit 1
fi

# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"

if $cygwin; then
    CLASSPATH=`cygpath -wp $CLASSPATH`
    export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
fi

# Find java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
  JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
  echo -n "Spark Command: "
  echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
  echo "========================================"
  echo
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
To run our Java program on Spark, execute the following commands:
cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/geoip_database.dat
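My program class org.shirdrn.spark.job.IPAddressStats takes three arguments:
Spark cluster master URL: in my case, spark://m1:7077
Input file path: application-specific; here the file is read from HDFS at hdfs://m1:9000/user/shirdrn/wwwlog20140222.log
GeoIP database file: application-specific; an external file used to determine which country an IP address belongs to
If the program has no errors and runs normally, the console prints the run log. Sample output: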
14/03/10 22:17:24 INFO job.IPAddressStats: GeoIP file: /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/geoip_database.dat
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/03/10 22:17:25 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/03/10 22:17:25 INFO Remoting: Starting remoting
14/03/10 22:17:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO spark.SparkEnv: Registering BlockManagerMaster
14/03/10 22:17:25 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140310221725-c1cb
14/03/10 22:17:25 INFO storage.MemoryStore: MemoryStore started with capacity 143.8 MB.
14/03/10 22:17:25 INFO network.ConnectionManager: Bound socket to port 45189 with id = ConnectionManagerId(m1,45189)
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/03/10 22:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager m1:45189 with 143.8 MB RAM
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Registered BlockManager
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49186
14/03/10 22:17:25 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.95.3.56:49186
14/03/10 22:17:25 INFO spark.SparkEnv: Registering MapOutputTracker
14/03/10 22:17:25 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-56c3e30d-a01b-4752-83d1-af1609ab2370
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52073
14/03/10 22:17:26 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/03/10 22:17:26 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/03/10 22:17:26 INFO ui.SparkUI: Started Spark Web UI at http://m1:4040
14/03/10 22:17:26 INFO spark.SparkContext: Added JAR /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://10.95.3.56:52073/jars/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1394515046396
14/03/10 22:17:26 INFO client.AppClient$ClientActor: Connecting to master spark://m1:7077...
14/03/10 22:17:26 INFO storage.MemoryStore: ensureFreeSpace(60341) called with curMem=0, maxMem=150837657
14/03/10 22:17:26 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 58.9 KB, free 143.8 MB)
14/03/10 22:17:26 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140310221726-0000
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor added: app-20140310221726-0000/0 on worker-20140310221648-s1-52544 (s1:52544) with 1 cores
14/03/10 22:17:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140310221726-0000/0 on hostPort s1:52544 with 1 cores, 512.0 MB RAM
14/03/10 22:17:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/03/10 22:17:27 WARN snappy.LoadSnappy: Snappy native library not loaded
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor updated: app-20140310221726-0000/0 is now RUNNING
14/03/10 22:17:27 INFO mapred.FileInputFormat: Total input paths to process : 1
14/03/10 22:17:27 INFO spark.SparkContext: Starting job: collect at IPAddressStats.java:77
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Got job 0 (collect at IPAddressStats.java:77) with 1 output partitions (allowLocal=false)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at IPAddressStats.java:77)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70), which has no missing parents
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/03/10 22:17:28 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@s1:59233/user/Executor#-671170811] with ID 0
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2396 bytes in 5 ms
14/03/10 22:17:29 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:47282 with 297.0 MB RAM
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 0 in 3376 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at IPAddressStats.java:70) finished in 4.420 s
14/03/10 22:17:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/03/10 22:17:32 INFO scheduler.DAGScheduler: running: Set()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: failed: Set()
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70), which is now runnable
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2255 bytes in 1 ms
14/03/10 22:17:32 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@s1:33534
14/03/10 22:17:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 120 bytes
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 1 in 282 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 0 (collect at IPAddressStats.java:77) finished in 0.314 s
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/03/10 22:17:32 INFO spark.SparkContext: Job finished: collect at IPAddressStats.java:77, took 4.870958309 s
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 58.246.49.218 312
14/03/10 22:17:32 INFO job.IPAddressStats: [KR] 1.234.83.77 300
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.16 212
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 110.85.72.254 207
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 27.150.229.134 185
14/03/10 22:17:32 INFO job.IPAddressStats: [HK] 180.178.52.181 181
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.37.210.212 180
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 222.77.226.83 176
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.205 169
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.9.19 165
...
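The program prints one line per IP address, while the stated goal is a distribution by country. One possible follow-up step, sketched here as an assumption rather than as part of the original program, is to sum the frequencies per country code from the result lines. CountryTotals and sumByCountry are hypothetical names; the parser only relies on the "[country code] IP frequency" tail of each line and ignores lines that do not end that way.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sums the per-IP frequencies by country code (hypothetical post-processing helper). */
final class CountryTotals {
    // Matches the end of a result line, e.g. "[CN] 58.246.49.218 312".
    // The separator may be a space or a tab.
    private static final Pattern RESULT =
            Pattern.compile("\\[([^\\]]+)\\]\\s+(\\S+)\\s+(\\d+)\\s*$");

    /** Returns total visits per country code, ordered by country code. */
    static Map<String, Long> sumByCountry(List<String> lines) {
        Map<String, Long> totals = new TreeMap<>();
        for (String line : lines) {
            Matcher m = RESULT.matcher(line);
            if (m.find()) {
                // group(1) = country code, group(3) = frequency
                totals.merge(m.group(1), Long.parseLong(m.group(3)), Long::sum);
            }
        }
        return totals;
    }
}
```

On a cluster the same aggregation could instead be expressed as another map and reduceByKey over (country code, count) pairs, provided the lookup helper is available on the executors.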
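We can also check the status of the running application in the web console: port 8080 on the master node (for example http://m1:8080/) shows the status of the cluster's applications.
It is also worth noting that if you develop Spark applications in Java with Eclipse in a Unix environment, you can connect to the Spark cluster directly from Eclipse and submit the application to the cluster for processing.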