1.11. Flink DataSet API: Data Sources, Transformations, and Sinks in Detail
1.11. Flink DataSet API
1.11.1. DataSet API: Data Sources
1.11.2. DataSet API: Transformations
1.11.3. DataSet Sinks in Detail
1.11. Flink DataSet API
1.11.1. DataSet API: Data Sources
File-based
readTextFile(path): reads a text file line by line and returns each line as a String element.
Collection-based
fromCollection(Collection): builds a DataSet from a Java collection; all elements must have the same type. (A minimal sketch of both source types follows this list.)
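The following is a minimal sketch combining both source types. The class name BatchDemoSource and the file path /data/input.txt are placeholders introduced here for illustration, not part of the original examples.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import java.util.Arrays;
public class BatchDemoSource {
    public static void main(String[] args) throws Exception {
        // obtain the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // collection-based source: the DataSet is built from an in-memory collection
        DataSet<String> collectionText = env.fromCollection(Arrays.asList("hello you", "hello me"));
        collectionText.print();
        // file-based source: every line of the file becomes one String element (placeholder path)
        DataSet<String> fileText = env.readTextFile("/data/input.txt");
        fileText.first(5).print();
    }
}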
1.11.2. DataSet API: Transformations
Map: takes one element as input and returns exactly one element; typically used for cleansing and transforming records (see the Map/FlatMap sketch after this list).
FlatMap: takes one element as input and returns zero, one, or more elements.
MapPartition: similar to Map, but processes one whole partition per function call. [If the processing needs an external resource such as a database connection, MapPartition is recommended, because the connection is then obtained once per partition instead of once per record.]
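Map and FlatMap themselves are not shown in a standalone example below, so here is a minimal sketch of both. The class name BatchDemoMapFlatMap is a placeholder introduced for illustration.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class BatchDemoMapFlatMap {
    public static void main(String[] args) throws Exception {
        // obtain the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.fromCollection(Arrays.asList("hello you", "hello me"));
        // Map: one input element produces exactly one output element (here: upper-casing the line)
        DataSet<String> upper = text.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });
        upper.print();
        // FlatMap: one input element produces zero, one, or more output elements (here: the words of the line)
        DataSet<String> words = text.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : value.split("\\W+")) {
                    out.collect(word);
                }
            }
        });
        words.print();
    }
}
The next example, BatchDemoHashRangePartition, combines mapPartition with partitionByHash and partitionByRange to show which thread (and therefore which partition) processes each record.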
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
/**
* Hash-Partition
*
* Range-Partition
*
* Created by xxxx on 2020/10/09 .
*/
public class BatchDemoHashRangePartition {
public static void main(String[] args) throws Exception{
//obtain the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(1,"hello1"));
data.add(new Tuple2<>(2,"hello2"));
data.add(new Tuple2<>(2,"hello3"));
data.add(new Tuple2<>(3,"hello4"));
data.add(new Tuple2<>(3,"hello5"));
data.add(new Tuple2<>(3,"hello6"));
data.add(new Tuple2<>(4,"hello7"));
data.add(new Tuple2<>(4,"hello8"));
data.add(new Tuple2<>(4,"hello9"));
data.add(new Tuple2<>(4,"hello10"));
data.add(new Tuple2<>(5,"hello11"));
data.add(new Tuple2<>(5,"hello12"));
data.add(new Tuple2<>(5,"hello13"));
data.add(new Tuple2<>(5,"hello14"));
data.add(new Tuple2<>(5,"hello15"));
data.add(new Tuple2<>(6,"hello16"));
data.add(new Tuple2<>(6,"hello17"));
data.add(new Tuple2<>(6,"hello18"));
data.add(new Tuple2<>(6,"hello19"));
data.add(new Tuple2<>(6,"hello20"));
data.add(new Tuple2<>(6,"hello21"));
DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);
/*text.partitionByHash(0).mapPartition(new MapPartitionFunction<Tuple2<Integer,String>, Tuple2<Integer,String>>() {
@Override
public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
Iterator<Tuple2<Integer, String>> it = values.iterator();
while (it.hasNext()){
Tuple2<Integer, String> next = it.next();
System.out.println("当前线程id:"+Thread.currentThread().getId()+","+next);
}
}
}).print();*/
text.partitionByRange(0).mapPartition(new MapPartitionFunction<Tuple2<Integer,String>, Tuple2<Integer,String>>() {
@Override
public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
Iterator<Tuple2<Integer, String>> it = values.iterator();
while (it.hasNext()){
Tuple2<Integer, String> next = it.next();
System.out.println("当前线程id:"+Thread.currentThread().getId()+","+next);
}
}
}).print();
}
}
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
/**
* Created by xxxx on 2020/10/09
*/
object BatchDemoHashRangePartitionScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data1 = ListBuffer[Tuple2[Int,String]]()
data1.append((1,"hello1"))
data1.append((2,"hello2"))
data1.append((2,"hello3"))
data1.append((3,"hello4"))
data1.append((3,"hello5"))
data1.append((3,"hello6"))
data1.append((4,"hello7"))
data1.append((4,"hello8"))
data1.append((4,"hello9"))
data1.append((4,"hello10"))
data1.append((5,"hello11"))
data1.append((5,"hello12"))
data1.append((5,"hello13"))
data1.append((5,"hello14"))
data1.append((5,"hello15"))
data1.append((6,"hello16"))
data1.append((6,"hello17"))
data1.append((6,"hello18"))
data1.append((6,"hello19"))
data1.append((6,"hello20"))
data1.append((6,"hello21"))
val text = env.fromCollection(data1)
/*text.partitionByHash(0).mapPartition(it=>{
while (it.hasNext){
val tu = it.next()
println("当前线程id:"+Thread.currentThread().getId+","+tu)
}
it
}).print()*/
text.partitionByRange(0).mapPartition(it=>{
//note: the loop below consumes the iterator, so the `it` returned at the end is empty and
//.print() emits no records; this demo only relies on the println side effects
while (it.hasNext){
val tu = it.next()
println("current thread id:"+Thread.currentThread().getId+","+tu)
}
it
}).print()
}
}
Another example:
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
/**
* Created by xxxx on 2020/10/09 .
*/
public class BatchDemoMapPartition {
public static void main(String[] args) throws Exception{
//obtain the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> data = new ArrayList<>();
data.add("hello you");
data.add("hello me");
DataSource<String> text = env.fromCollection(data);
/*text.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
//obtain a database connection -- note: with map, a connection is obtained once per incoming record
//process the record
//close the connection
return value;
}
});*/
DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {
@Override
public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
//obtain a database connection -- note: with mapPartition, a connection is obtained once per partition [advantage: one connection per partition]
//values holds all the data of one partition
//process the data
Iterator<String> it = values.iterator();
while (it.hasNext()) {
String next = it.next();
String[] split = next.split("\\W+");
for (String word : split) {
out.collect(word);
}
}
//close the connection
}
});
mapPartitionData.print();
}
}
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
/**
* Created by xxxx on 2020/10/09
*/
object BatchDemoMapPartitionScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data = ListBuffer[String]()
data.append("hello you")
data.append("hello me")
val text = env.fromCollection(data)
text.mapPartition(it=>{
//open the database connection here; it is recommended to put this code in a try-catch/try-finally block
val res = ListBuffer[String]()
while(it.hasNext){
val line = it.next()
val words = line.split("\\W+")
for(word <- words){
res.append(word)
}
}
res
//close the connection
}).print()
}
}
Filter: evaluates each incoming element with a predicate and keeps only the elements that satisfy it (see the Filter/Reduce/Aggregation sketch after this list).
Reduce: aggregates the data by combining the current element with the value returned by the previous reduce call and returning a new value.
Aggregation: built-in aggregations such as sum, max, and min.
Distinct: returns the distinct (deduplicated) elements of a DataSet, e.g. data.distinct().
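Filter, Reduce, and Aggregation have no example of their own below, so the following sketch illustrates all three. The class name BatchDemoFilterReduceAgg and the sample numbers are placeholders introduced for illustration.
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.Arrays;
public class BatchDemoFilterReduceAgg {
    public static void main(String[] args) throws Exception {
        // obtain the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> nums = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6));
        // Filter: keep only the elements that satisfy the predicate (here: even numbers)
        nums.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value % 2 == 0;
            }
        }).print();
        // Reduce: combine the current element with the previously reduced value (here: the sum of all elements)
        nums.reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer a, Integer b) throws Exception {
                return a + b;
            }
        }).print();
        // Aggregation: sum/max/min operate on tuple fields, so wrap each number in a Tuple2 first
        DataSet<Tuple2<Integer, Integer>> tuples = nums.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Integer value) throws Exception {
                return new Tuple2<>(0, value);
            }
        });
        tuples.sum(1).print();
    }
}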
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
/**
* Created by xxxx on 2020/10/09 .
*/
public class BatchDemoDistinct {
public static void main(String[] args) throws Exception {
//obtain the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> data = new ArrayList<>();
data.add("hello you");
data.add("hello me");
DataSource<String> text = env.fromCollection(data);
FlatMapOperator<String, String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] split = value.toLowerCase().split("\\W+");
for (String word : split) {
System.out.println("单词:"+word);
out.collect(word);
}
}
});
flatMapData.distinct()// deduplicate across the whole DataSet
.print();
}
}
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
/**
* Created by xxxx on 2020/10/09
*/
object BatchDemoDistinctScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data = ListBuffer[String]()
data.append("hello you")
data.append("hello me")
val text = env.fromCollection(data)
val flatMapData = text.flatMap(line=>{
val words = line.split("\\W+")
for(word <- words){
println("单词:"+word)
}
words
})
flatMapData.distinct().print()
}
}
Join: inner join of two DataSets on equal keys.
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;
/**
* Created by xxxx on 2020/10/09 .
*/
public class BatchDemoJoin {
public static void main(String[] args) throws Exception{
//obtain the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//Tuple2<user id, user name>
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
data1.add(new Tuple2<>(1,"zs"));
data1.add(new Tuple2<>(2,"ls"));
data1.add(new Tuple2<>(3,"ww"));
//Tuple2<user id, user city>
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
data2.add(new Tuple2<>(1,"beijing"));
data2.add(new Tuple2<>(2,"shanghai"));
data2.add(new Tuple2<>(3,"guangzhou"));
DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);
text1.join(text2).where(0)//index of the key field to compare in the first DataSet
.equalTo(0)//index of the key field to compare in the second DataSet
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
throws Exception {
return new Tuple3<>(first.f0,first.f1,second.f1);
}
}).print();
System.out.println("==================================");
//Note: using map here produces exactly the same result as the with(...) call above.
/*text1.join(text2).where(0)//index of the key field to compare in the first DataSet
.equalTo(0)//index of the key field to compare in the second DataSet
.map(new MapFunction<Tuple2<Tuple2<Integer,String>,Tuple2<Integer,String>>, Tuple3<Integer,String,String>>() {
@Override
public Tuple3<Integer, String, String> map(Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>> value) throws Exception {
return new Tuple3<>(value.f0.f0,value.f0.f1,value.f1.f1);
}
}).print();*/
}
}
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
/**
* Created by xxxx on 2020/10/09
*/
object BatchDemoJoinScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data1 = ListBuffer[Tuple2[Int,String]]()
data1.append((1,"zs"))
data1.append((2,"ls"))
data1.append((3,"ww"))
val data2 = ListBuffer[Tuple2[Int,String]]()
data2.append((1,"beijing"))
data2.append((2,"shanghai"))
data2.append((3,"guangzhou"))
val text1 = env.fromCollection(data1)
val text2 = env.fromCollection(data2)
text1.join(text2).where(0).equalTo(0).apply((first,second)=>{
(first._1,first._2,second._2)
}).print()
}
}
OuterJoin: outer join (left, right, and full outer join).
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;
/**
* Outer joins:
*
* left outer join
* right outer join
* full outer join
*
* Created by xxxx on 2020/10/09 .
*/
public class BatchDemoOuterJoin {
public static void main(String[] args) throws Exception{
//obtain the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//Tuple2<user id, user name>
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
data1.add(new Tuple2<>(1,"zs"));
data1.add(new Tuple2<>(2,"ls"));
data1.add(new Tuple2<>(3,"ww"));
//Tuple2<user id, user city>
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
data2.add(new Tuple2<>(1,"beijing"));
data2.add(new Tuple2<>(2,"shanghai"));
data2.add(new Tuple2<>(4,"guangzhou"));
DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);
/**
* Left outer join
*
* Note: the elements of the second tuple may be null
*
*/
text1.leftOuterJoin(text2)
.where(0)
.equalTo(0)
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
if(second==null){
return new Tuple3<>(first.f0,first.f1,"null");
}else{
return new Tuple3<>(first.f0,first.f1,second.f1);
}
}
}).print();
System.out.println("=============================================================================");
/**
* Right outer join
*
* Note: the data of the first tuple may be null
*
*/
text1.rightOuterJoin(text2)
.where(0)
.equalTo(0)
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
if(first==null){
return new Tuple3<>(second.f0,"null",second.f1);
}
return new Tuple3<>(first.f0,first.f1,second.f1);
}
}).print();
System.out.println("=============================================================================");
/**
* Full outer join
*
* Note: both the first and the second tuple may be null
*
*/
text1.fullOuterJoin(text2)
.where(0)
.equalTo(0)
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
if(first==null){
return new Tuple3<>(second.f0,"null",second.f1);
}else if(second == null){
return new Tuple3<>(first.f0,first.f1,"null");
}else{
return new Tuple3<>(first.f0,first.f1,second.f1);
}
}
}).print();
}
}
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
/**
* Created by xxxx on 2020/10/09
*/
object BatchDemoOuterJoinScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data1 = ListBuffer[Tuple2[Int,String]]()
data1.append((1,"zs"))
data1.append((2,"ls"))
data1.append((3,"ww"))
val data2 = ListBuffer[Tuple2[Int,String]]()
data2.append((1,"beijing"))
data2.append((2,"shanghai"))
data2.append((4,"guangzhou"))
val text1 = env.fromCollection(data1)
val text2 = env.fromCollection(data2)
text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{
if(second==null){
(first._1,first._2,"null")
}else{
(first._1,first._2,second._2)
}
}).print()
println("===============================")
text1.rightOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{
if(first==null){
(second._1,"null",second._2)
}else{
(first._1,first._2,second._2)
}
}).print()
println("===============================")
text1.fullOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{
if(first==null){
(second._1,"null",second._2)
}else if(second==null){
(first._1,first._2,"null")
}else{
(first._1,first._2,second._2)
}
}).print()
}
}
Cross: computes the Cartesian product of two DataSets.
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.operators.DataSource;
import java.util.ArrayList;
/**
* Computes the Cartesian product
*
* Created by xxxx on 2020/10/09 .
*/
public class BatchDemoCross {
public static void main(String[] args) throws Exception{
//obtain the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//first input: user names
ArrayList<String> data1 = new ArrayList<>();
data1.add("zs");
data1.add("ww");
//second input: numbers
ArrayList<Integer> data2 = new ArrayList<>();
data2.add(1);
data2.add(2);
DataSource<String> text1 = env.fromCollection(data1);
DataSource<Integer> text2 = env.fromCollection(data2);
CrossOperator.DefaultCross<String, Integer> cross = text1.cross(text2);
cross.print();
/**
* Output:
* (zs,1)
* (zs,2)
* (ww,1)
* (ww,2)
*/
}
}
import org.apache.flink.api.scala.ExecutionEnvironment
/**
* Created by xxxx on 2020/10/09
*/
object BatchDemoCrossScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data1 = List("zs","ww")
val data2 = List(1,2)
val text1 = env.fromCollection(data1)
val text2 = env.fromCollection(data2)
text1.cross(text2).print()
}
}
Union: returns the union of two DataSets; the element types must be identical.
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
/**
* Created by xxxx on 2020/10/09 .
*/
public class BatchDemoUnion {
public static void main(String[] args) throws Exception{
//obtain the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
data1.add(new Tuple2<>(1,"zs"));
data1.add(new Tuple2<>(2,"ls"));
data1.add(new Tuple2<>(3,"ww"));
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
data2.add(new Tuple2<>(1,"lili"));
data2.add(new Tuple2<>(2,"jack"));
data2.add(new Tuple2<>(3,"jessic"));
DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);
UnionOperator<Tuple2<Integer, String>> union = text1.union(text2);
union.print();
}
}
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
/**
* Created by xxxx on 2020/10/09
*/
object BatchDemoUnionScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data1 = ListBuffer[Tuple2[Int,String]]()
data1.append((1,"zs"))
data1.append((2,"ls"))
data1.append((3,"ww"))
val data2 = ListBuffer[Tuple2[Int,String]]()
data2.append((1,"jack"))
data2.append((2,"lili"))
data2.append((3,"jessic"))
val text1 = env.fromCollection(data1)
val text2 = env.fromCollection(data2)
text1.union(text2).print()
}
}
First-n: returns the first N elements of a DataSet.
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
/**
* Returns the first N elements of the DataSet
* Created by xxxx on 2020/10/09 .
*/
public class BatchDemoFirstN {
public static void main(String[] args) throws Exception{
//obtain the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(2,"zs"));
data.add(new Tuple2<>(4,"ls"));
data.add(new Tuple2<>(3,"ww"));
data.add(new Tuple2<>(1,"xw"));
data.add(new Tuple2<>(1,"aw"));
data.add(new Tuple2<>(1,"mw"));
DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);
//take the first 3 elements, in insertion order
text.first(3).print();
System.out.println("==============================");
//group by the first field and take the first 2 elements of each group
text.groupBy(0).first(2).print();
System.out.println("==============================");
//group by the first field, sort within each group by the second field (ascending), and take the first 2 elements of each group
text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
System.out.println("==============================");
//no grouping: sort each partition by the first field ascending, then by the second field descending, and take the first 3 elements
text.sortPartition(0,Order.ASCENDING).sortPartition(1,Order.DESCENDING).first(3).print();
}
}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
/**
* Created by xxxx on 2020/10/09
*/
object BatchDemoFirstNScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data = ListBuffer[Tuple2[Int,String]]()
data.append((2,"zs"))
data.append((4,"ls"))
data.append((3,"ww"))
data.append((1,"xw"))
data.append((1,"aw"))
data.append((1,"mw"))
val text = env.fromCollection(data)
//take the first 3 elements, in insertion order
text.first(3).print()
println("==============================")
//group by the first field and take the first 2 elements of each group
text.groupBy(0).first(2).print()
println("==============================")
//group by the first field, sort within each group by the second field (ascending), and take the first 2 elements of each group
text.groupBy(0).sortGroup(1,Order.ASCENDING).first(2).print()
println("==============================")
//no grouping: sort each partition by the first field ascending, then by the second field descending, and take the first 3 elements
text.sortPartition(0,Order.ASCENDING).sortPartition(1,Order.DESCENDING).first(3).print()
}
}
Sort Partition: sorts all partitions of the DataSet locally; sorting on multiple fields is done by chaining sortPartition() calls.
Rebalance: rebalances (repartitions) the DataSet evenly to eliminate data skew (see the sketch after this list).
Hash-Partition: partitions the DataSet by the hash of the given key (e.g. partitionByHash()).
Range-Partition: range-partitions the DataSet by the given key (e.g. partitionByRange()).
Custom Partitioning: user-defined partitioning rules (also shown in the sketch after this list):
1. A custom partitioner must implement the Partitioner interface.
2. partitionCustom(partitioner, "someKey")
3. or partitionCustom(partitioner, 0)
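Rebalance and Custom Partitioning have no example above, so the following sketch illustrates both. The class name BatchDemoCustomPartition, the MyPartitioner class, and the even/odd routing rule are placeholders chosen only for illustration.
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.Arrays;
public class BatchDemoCustomPartition {
    public static void main(String[] args) throws Exception {
        // obtain the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, String>> data = env.fromCollection(Arrays.asList(
                new Tuple2<>(1, "hello1"),
                new Tuple2<>(2, "hello2"),
                new Tuple2<>(3, "hello3"),
                new Tuple2<>(4, "hello4")));
        // Rebalance: redistribute the elements round-robin over all partitions to remove skew
        data.rebalance().print();
        // Custom partitioning: route the records by the key in field 0 using MyPartitioner
        data.partitionCustom(new MyPartitioner(), 0).print();
    }
    // a custom partitioner must implement the Partitioner interface
    public static class MyPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            // illustrative rule: even keys go to partition 0, odd keys to partition 1
            int target = (key % 2 == 0) ? 0 : 1;
            return target % numPartitions; // guard against a parallelism of 1
        }
    }
}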
1.11.3. DataSet Sinks in Detail
writeAsText(): writes the elements line by line as strings, obtained by calling toString() on each element.
writeAsCsv(): writes tuples to a file as comma-separated values; the row and field delimiters are configurable. Each field value comes from the object's toString() method.
print(): prints the toString() value of each element to standard output or standard error. (A combined sink sketch follows this list.)
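A minimal sink sketch covering all three sinks, assuming a hypothetical class name BatchDemoSink and placeholder output paths; WriteMode.OVERWRITE is used only so the demo can be rerun.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import java.util.Arrays;
public class BatchDemoSink {
    public static void main(String[] args) throws Exception {
        // obtain the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, String>> data = env.fromCollection(Arrays.asList(
                new Tuple2<>(1, "zs"),
                new Tuple2<>(2, "ls"),
                new Tuple2<>(3, "ww")));
        // writeAsText: one line per element, produced by toString() (placeholder path)
        data.writeAsText("/data/out-text", FileSystem.WriteMode.OVERWRITE);
        // writeAsCsv: only works on tuple DataSets; row and field delimiters are configurable (placeholder path)
        data.writeAsCsv("/data/out-csv", "\n", ",", FileSystem.WriteMode.OVERWRITE);
        // print: writes toString() of each element to stdout and triggers execution of the whole plan,
        // including the two file sinks above; without print()/collect(), env.execute() would be required
        data.print();
    }
}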