欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spark流处理入门实践,初体验跟flink差不多

程序员文章站 2024-02-21 21:05:53
...

1.spark简介

弥补了hoodop流处理不足,性能优于hoodop,活跃度高于flink,提供了一站式大数据处理方案:

支持批处理(Spark Core)。Spark Core 是 Spark 的核心功能实现,包括:SparkContext 的初始化(DriverApplication 通过 SparkContext 提交)、部署模式、存储体系、任务提交与执行、计算引擎等。

支持交互式查询(Spark SQL)。Spark SQL 是 Spark 来操作结构化数据的程序包,可以让我们使用 SQL 语句的方式来查询数据,Spark 支持多种数据源,包含 Hive 表,parquet 以及 JSON等内容。

支持流式计算(Spark Streaming)。与 MapReduce 只能处理离线数据相比,Spark 还支持实时的流计算。Spark 依赖 Spark Streaming 对数据进行实时的处理。

支持机器学习(Spark MLlib)。提供机器学习相关的统计、分类、回归等领域的多种算法实现。其一致的 API 接口大大降低了用户的学习成本。

支持图计算(Spark GraghX)。提供图计算处理能力,支持分布式, Pregel 提供的 API 可以解决图计算中的常见问题。

支持 Python 操作--PySpark

支持 R 语言--SparkR 
————————————————
原文链接:https://blog.csdn.net/qq_1018944104/article/details/85629580

2.单机版安装部署

官网http://spark.apache.org/downloads.html清华镜像下载,wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz  解压,启动: ./sbin/start-all.sh ,浏览器访问dashboard:http://192.168.203.132:8081(8080占用port自动修改为8081)

3.启动官方demo 及交互终端,查看dashboard

进入bin目录
方式1  ./run-example SparkPi 10
方式2:./spark-submit  --class org.apache.spark.examples.SparkPi --master spark://192.168.203.132:7077 --executor-memory 512m  --total-executor-cores 1 ../examples/jars/spark-examples_2.11-2.4.5.jar

spark-shell交互终端:可以直接使用sc 和spark两个对象

1.启动终端
./spark-shell  --master spark://192.168.203.132:7077 --executor-memory 512m 
--total-executor-cores 1 

2.修改日志级别
sc.setLogLevel(“info”)  

3.运行wordcount标准输出
sc.textFile("./test.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(print)

4.运行wordcount输出到hdfs
sc.textFile("hdfs://myha01/spark/wc/input/words.txt").flatMap(_.split(" 
")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.203.132/spark/wc/output")

,控制台看不到word count结果,需要去dashboard看,可以看已完成及正在进行的spark-shell,再点进入每个job,即可看到job的

stdout ,跟flink玩法基本一致。

spark流处理入门实践,初体验跟flink差不多

 

spark流处理入门实践,初体验跟flink差不多

4.自定义spark job,套路完全雷同flink

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test-spark</groupId>
    <artifactId>test-spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <spark.version>2.4.5</spark.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

</project>

main方法定义job,api比flink丰富,且更像jdk8的api.如下为本地启动的wordcount,linux启动注意修改输入源,setMaster改为dashboard的“spark://192.168.203.132:7077”即可被监控。启动命令:

./spark-submit  --class test.SparkWordCount --master spark://192.168.203.132:7077 --executor-memory 512m     --total-executor-cores 1 ../examples/jars/spark.jar /root/test.txt /root/result

  最后两个参数分别为main方法的args[0] args[1] 。spark没有flink的dashboard那样的上传jar启动job操作

package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.mortbay.util.ajax.JSON;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class SparkWordCount {
    public static void main(String[] args) {
        //输入文件与输出文件可以通过启动命令传入:args[0]
        String input = args.length >= 2 ? args[0] : "/root/test.txt";
        String output = args.length >= 2 ?args[1] : "/root/result";
        SparkConf conf = new SparkConf()
                .setMaster("local")  //linux设置为dashboard的master地址即可被监控,设置错也不会报错
                .setAppName(SparkWordCount.class.getSimpleName());
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //读取文件wordcount后输出
        List<Tuple2<Integer, String>> topK = sparkContext.textFile(input)
                .flatMap(str -> Arrays.asList(str.split("\n| ")).iterator())
                .mapToPair(word -> new Tuple2<String, Integer>(word, 1)) 
                .reduceByKey((integer1, integer2) -> integer1 + integer2)
                .filter(tuple2 -> tuple2._1.length() > 0)
                .mapToPair(tuple2 -> new Tuple2<>(tuple2._2, tuple2._1))  //单词与频数倒过来为新二元组,按频数倒排序取途topK
                .sortByKey(false)
                .take(10);
        for (Tuple2<Integer, String> tuple2 : topK) {
            System.out.println(JSON.toString(tuple2));
        };
        sparkContext.parallelize(topK).coalesce(1).saveAsTextFile(output);
        //关闭资源
        sparkContext.close();
    }
}

 

相关标签: bigdata