欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

sparkCore Api常用算子使用

程序员文章站 2024-03-12 17:13:08
...

package sparkjava;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * 熟悉api
 */
public class TestSparkApi {
    //  API介绍
  /*  1.1 transform
    l  map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集

    l  filter(func) : 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD

    l  flatMap(func):和map差不多,但是flatMap生成的是多个结果

    l  mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition

    l  mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index

    l  sample(withReplacement,faction,seed):抽样

    l  union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合

    l  distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element

    l  groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist

    l  reduceByKey(func,[numTasks]):就是用一个给定的reduce func再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数

    l  sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型

1.2 action
    l  reduce(func):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的

    l  collect():一般在filter或者足够小的结果的时候,再用collect封装返回一个数组

    l  count():返回的是dataset中的element的个数

    l  first():返回的是dataset中的第一个元素

    l  take(n):返回前n个elements

    l  takeSample(withReplacement,num,seed):抽样返回一个dataset中的num个元素,随机种子seed

    l  saveAsTextFile(path):把dataset写到一个text file中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中

    l  saveAsSequenceFile(path):只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统

    l  countByKey():返回的是key对应的个数的一个map,作用于一个RDD

    l  foreach(func):对dataset中的每个元素都使用func*/
   /*数据情况
 a 1
b 2
c 3
d 4
e 5*/
    public static void main(String[] args) {
//        String filepath=args[0];
        SparkConf conf = new SparkConf();
        conf.setAppName("transformAndAction");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.textFile("D://sparkAction.txt");

//        --transform

//        testSparkCoreApiMap(rdd); //本案例实现,打印所有数据
//        testSparkCoreApiFilter(rdd);
//        testSparkCoreApiFlatMap(rdd); //本案例实现:打印所有的字符

//        testSparkCoreApiUnion(rdd);//合并两个RDD
        // testSparkCoreApiDistinct(rdd);
//         testSparkCoreApiMaptoPair(rdd);//把RDD映射为键值对类型的数据
//        testSparkCoreApiGroupByKey(rdd);//对键值对类型的数据进行按键值合并
//        testSparkCoreApiReduceByKey(rdd);//对键值对进行按键相同的对值进行操作
//                --action
        testSparkCoreApiReduce(rdd);//对RDD进行递归调用
    }

    /**
     * Map主要是对数据进行处理,不进行数据集的增减
     * <p>
     * 本案例实现,打印所有数据
     *
     * @param rdd
     */

    private static void testSparkCoreApiMap(JavaRDD<String> rdd) {
        JavaRDD<String> logData1 = rdd.map(new Function<String, String>() {
            public String call(String s) {
                return s;
            }
        });
        List list = logData1.collect();

        list.forEach(x->System.out.println(x));
        /*for (int i = 0; i < list.size(); i++) {
            System.out.println(list.get(i));


        }*/
    }
    /*
     * filter主要是过滤数据的功能
     * 本案例实现:过滤含有a的那行数据
     */
    private static void testSparkCoreApiFilter(JavaRDD<String> rdd){
        JavaRDD<String> logData1 = rdd.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return (s.split(" "))[0].equals("a");
            }
        });

//        JavaRDD<String> logData1=rdd.filter(new Function<String,Boolean>(){
//            public Boolean call(String s){
//
//                return (s.split(" "))[0].equals("a");
//            }
//
//        });
        List list = logData1.collect();

        list.forEach(System.out::println);
//        for (int i = 0; i < list.size(); i++) {
//            System.out.println(list.get(i));
//
//
//        }

    }

    /*
     * flatMap  用户行转列
     * 本案例实现:打印所有的字符
     */
    private static void testSparkCoreApiFlatMap(JavaRDD<String> rdd){
        JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
//        JavaRDD<String> words=rdd.flatMap(
//                new FlatMapFunction<String, String>() {
//                    public Iterable<String> call(String s) throws Exception {
//                        return Arrays.asList(s.split(" "));
//                    }
//                }
//        );
        List list = words.collect();
        list.forEach(x-> System.out.println(x));
//        for (int i = 0; i < list.size(); i++) {
//            System.out.println(list.get(i));
//        }

    }

    /**
     * testSparkCoreApiUnion
     * 合并两个RDD
     * @param rdd
     */
    private static void testSparkCoreApiUnion(JavaRDD<String> rdd){
        JavaRDD<String> unionRdd=rdd.union(rdd);
//        unionRdd.foreach(new VoidFunction<String>(){
//            public void call(String lines){
//                System.out.println(lines);
//            }
//        });

        unionRdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String lines) throws Exception {
                System.out.println(lines);
            }
        });
    }


    /**
     * testSparkCoreApiDistinct Test
     * 对RDD去重
     * @param rdd
     */
    private static void testSparkCoreApiDistinct(JavaRDD<String> rdd){
        JavaRDD<String> unionRdd=rdd.union(rdd).distinct();
        unionRdd.foreach(new VoidFunction<String>(){
            public void call(String lines){
                System.out.println(lines);
            }
        });
    }

    /**
     * testSparkCoreApiMaptoPair Test
     * 把RDD映射为键值对类型的数据
     * @param rdd
     */
    private static void testSparkCoreApiMaptoPair(JavaRDD<String> rdd){

        JavaPairRDD<String,Integer> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                String[] st = s.split(" ");
                return new Tuple2(st[0],st[1]); //自动推导类型
            }
        });
//        JavaPairRDD<String, Integer> pairRDD=rdd.mapToPair(new PairFunction<String,String,Integer>(){
//            @Override
//            public Tuple2<String, Integer> call(String t) throws Exception {
//                String[] st=t.split(" ");
//                return new Tuple2(st[0], st[1]);
//            }
//
//        });
        pairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>(){
            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._2()+"=="+t._1());

            }
        });
    }
    /**
     * testSparkCoreApiGroupByKey Test
     * 对键值对类型的数据进行按键值合并
     * @param rdd
     */

    private static void testSparkCoreApiGroupByKey(JavaRDD<String> rdd){
        JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
            @Override
            public Tuple2<String, Integer> call(String t) throws Exception {
                String[] st=t.split(" ");
                return new Tuple2(st[0], Integer.valueOf(st[1]));
            }

        });

        JavaPairRDD<String, Iterable<Integer>> pairrdd2= pairRdd.union(pairRdd).groupByKey().sortByKey();//合并后并按照大小排序
        pairrdd2.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>(){
            @Override
            public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                Iterable<Integer> iter = t._2();
                for (Integer integer : iter) {
                    System.out.println(integer);
                }
            }
        });
    }

    /**
     * testSparkCoreApiReduceByKey
     * 对键值对进行按键相同的对值进行操作
     * @param rdd
     */
    private static void testSparkCoreApiReduceByKey(JavaRDD<String> rdd){

        //转换成键值对
        JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
            @Override
            public Tuple2<String, Integer> call(String t) throws Exception {
                String[] st=t.split(" ");
                return new Tuple2(st[0], Integer.valueOf(st[1]));
            }

        });
        //相同键位 相加 并排序
        JavaPairRDD<String, Integer> pairrdd2 =pairRdd.union(pairRdd).reduceByKey(
                new Function2<Integer,Integer,Integer>(){
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1+v2;
                    }
                }
        ).sortByKey() ;

        //rdd输出
        pairrdd2.foreach(new VoidFunction<Tuple2<String, Integer>>(){
            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._2());

            }
        });
    }


    /**
     * testSparkCoreApiReduce
     * 对RDD进行递归调用
     * @param rdd
     */
    private static void testSparkCoreApiReduce(JavaRDD<String> rdd){
        //由于原数据是String,需要转为Integer才能进行reduce递归
        JavaRDD<Integer> rdd1=rdd.map(new Function<String,Integer>(){
            @Override
            public Integer call(String v1) throws Exception {
                // TODO Auto-generated method stub
                return Integer.valueOf(v1.split(" ")[1]);
            }
        });

        Integer a= rdd1.reduce(new Function2<Integer,Integer,Integer>(){
            @Override
            public Integer call(Integer v1,Integer v2) throws Exception {
                return v1+v2;
            }
        });
        System.out.println(a);

    }

}


相关标签: sparkcore 算子