Spark Study Notes
Spark Core
1.1 RDD
Concept: The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
Creating RDDs:
- parallelizing an existing collection in your driver program
- referencing a dataset in an external storage system, such as a shared filesystem
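// Method 1: parallelize an existing collection (sc is an existing JavaSparkContext)
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
// Method 2: reference a dataset in external storage
JavaRDD<String> distFile = sc.textFile("data.txt");
RDD operations: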
- transformations, which create a new dataset from an existing one
- actions, which return a value to the driver program after running a computation on the dataset
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program.
RDD operation performance:
All transformations in Spark are lazy: they do not compute their results right away, but only when an action requires a result to be returned to the driver program.
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.
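The two points above (lazy transformations, recomputation unless persisted) can be modeled in a few lines of plain Java. This is a conceptual sketch only, not Spark code: the class LazySketch, its methods, and the evaluations counter are hypothetical names chosen for illustration, and a real RDD is distributed and partitioned, which this model ignores.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class LazySketch<T> {
    private final Supplier<List<T>> compute; // recipe for producing the elements
    private List<T> cached;                  // filled by the first action after persist()
    private boolean persisted;
    static int evaluations = 0;              // how many times a map step actually ran

    LazySketch(Supplier<List<T>> compute) {
        this.compute = compute;
    }

    static <T> LazySketch<T> of(List<T> data) {
        return new LazySketch<>(() -> data);
    }

    // "Transformation": only records the recipe; nothing is computed here.
    <R> LazySketch<R> map(Function<T, R> f) {
        return new LazySketch<>(() -> {
            evaluations++;
            return materialize().stream().map(f).collect(Collectors.toList());
        });
    }

    // Mark this dataset so that its elements are kept after the first computation.
    LazySketch<T> persist() {
        persisted = true;
        return this;
    }

    private List<T> materialize() {
        if (persisted && cached != null) {
            return cached;
        }
        List<T> result = compute.get();
        if (persisted) {
            cached = result;
        }
        return result;
    }

    // "Action": triggers the computation and returns a value to the caller.
    int count() {
        return materialize().size();
    }

    public static void main(String[] args) {
        LazySketch<Integer> lengths = LazySketch.of(List.of("a", "bb", "ccc")).map(String::length);
        System.out.println(evaluations); // 0: map alone computed nothing
        lengths.count();
        lengths.count();
        System.out.println(evaluations); // 2: each action recomputed the map

        LazySketch<Integer> kept =
                LazySketch.of(List.of("a", "bb", "ccc")).map(String::length).persist();
        kept.count();
        kept.count();
        System.out.println(evaluations); // 3: the persisted result was reused
    }
}
```

In this model, persist() trades memory for time, which is the same trade-off the notes describe for RDDs that are queried more than once.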
Shuffle operations: The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. Operations such as reduceByKey trigger a shuffle.
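To make the regrouping concrete, here is a minimal plain-Java sketch (not Spark code) of what a shuffle for a reduceByKey-style sum does conceptually: records from several input partitions are routed to output partitions by key, so that all values for one key end up in the same place and can be combined. The class ShuffleSketch, its methods, and the sample data are hypothetical. The routing rule (non-negative hashCode modulo the partition count) is an assumption modeled on hash partitioning; a real shuffle also involves serialization, disk I/O, and network transfer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSketch {
    // Route a key to an output partition: non-negative hashCode modulo partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Conceptual reduceByKey with addition: move each record to the partition
    // chosen by its key, summing values that share a key.
    static List<Map<String, Integer>> sumByKey(
            List<List<Map.Entry<String, Integer>>> inputPartitions, int numPartitions) {
        List<Map<String, Integer>> output = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            output.add(new TreeMap<>());
        }
        for (List<Map.Entry<String, Integer>> partition : inputPartitions) {
            for (Map.Entry<String, Integer> record : partition) {
                int target = partitionFor(record.getKey(), numPartitions);
                output.get(target).merge(record.getKey(), record.getValue(), Integer::sum);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Two input partitions; key "a" appears in both of them.
        List<List<Map.Entry<String, Integer>>> input = List.of(
                List.of(Map.entry("a", 1), Map.entry("b", 1)),
                List.of(Map.entry("a", 1), Map.entry("c", 1)));
        List<Map<String, Integer>> result = sumByKey(input, 2);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("partition " + i + ": " + result.get(i));
        }
        // prints:
        // partition 0: {b=1}
        // partition 1: {a=2, c=1}
    }
}
```

The two occurrences of "a" start in different input partitions and meet in output partition 1, which is exactly the data movement that makes a shuffle expensive.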
1.2 Shared Variables
Broadcast variables: Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
Accumulators: Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. A numeric accumulator can be created by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it using the add method. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.
// sc is a JavaSparkContext; sc.sc() returns the underlying SparkContext
LongAccumulator accum = sc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
accum.value();
// returns 10
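Why the operation must be associative and commutative can be seen in a small plain-Java sketch (not Spark code): each task sums only its own partition, and the driver merges the partial results in whatever order they arrive. The class AccumulatorSketch, its methods, and the split of the data into partitions are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class AccumulatorSketch {
    // Each "task" adds up only the values of its own partition.
    static long partialSum(List<Integer> partition) {
        long sum = 0;
        for (int x : partition) {
            sum += x;
        }
        return sum;
    }

    // The "driver" merges the per-task results in the order they arrive.
    static long merge(List<Long> partials) {
        long total = 0;
        for (long p : partials) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // The values 1..4 from the example above, split across three partitions.
        List<List<Integer>> partitions = List.of(List.of(1, 2), List.of(3), List.of(4));
        List<Long> partials = new ArrayList<>();
        for (List<Integer> p : partitions) {
            partials.add(partialSum(p));
        }
        long inOrder = merge(partials);
        Collections.reverse(partials); // simulate tasks finishing in a different order
        long reversed = merge(partials);
        System.out.println(inOrder + " " + reversed); // prints "10 10"
    }
}
```

Because addition gives the same total for any grouping and any arrival order, the driver does not need to coordinate the tasks, which is what makes accumulators cheap to support in parallel.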
Spark SQL
Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL, including SQL and the Dataset API.
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. In the Java API, users need to use Dataset<Row> to represent a DataFrame.
// DataFrame example
// people.json
// {"name":"Michael"}
// {"name":"Andy", "age":30}
// {"name":"Justin", "age":19}
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// col() is a static helper in org.apache.spark.sql.functions
import static org.apache.spark.sql.functions.col;
// Create the session first; it is the entry point used to read data
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+
// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
// SQL example
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
// Dataset example
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
// A Java bean: private fields with public getters and setters
public static class Person implements Serializable {
  private String name;
  private int age;
  public String getName() {
    return name;
  }
  public void setName(String name) {
    this.name = name;
  }
  public int getAge() {
    return age;
  }
  public void setAge(int age) {
    this.age = age;
  }
}
// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// DataFrames can be converted to a Dataset by providing a class. Mapping is based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Datasets are similar to RDDs; however, instead of using Java serialization or Kryo, they use a specialized Encoder to serialize the objects for processing or transmitting over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are code generated dynamically and use a format that allows Spark to perform many operations like filtering, sorting and hashing without deserializing the bytes back into an object.