初识Java8中的Stream
lambda表达式是stream的基础,初学者建议先学习lambda表达式,
1.初识stream
先来一个总纲:
东西就是这么多啦,stream是java8中加入的一个非常实用的功能,最初看时以为是io中的流(其实一点关系都没有),让我们先来看一个小例子感受一下:
@before public void init() { random = new random(); stulist = new arraylist<student>() { { for (int i = 0; i < 100; i++) { add(new student("student" + i, random.nextint(50) + 50)); } } }; } public class student { private string name; private integer score; //-----getters and setters----- } //1列出班上超过85分的学生姓名,并按照分数降序输出用户名字 @test public void test1() { list<string> studentlist = stulist.stream() .filter(x->x.getscore()>85) .sorted(comparator.comparing(student::getscore).reversed()) .map(student::getname) .collect(collectors.tolist()); system.out.println(studentlist); }
列出班上分数超过85分的学生姓名,并按照分数降序输出用户名字,在java8之前我们需要三个步骤:
1)新建一个list<student> newlist,在for循环中遍历stulist,将分数超过85分的学生装入新的集合中
2)对于新的集合newlist进行排序操作
3)遍历打印newlist
这三个步骤在java8中只需要两条语句,如果紧紧需要打印,不需要保存新生产list的话实际上只需要一条,是不是非常方便。
2.stream的特性
我们首先列出stream的如下三点特性,在之后我们会对照着详细说明
1.stream不存储数据
2.stream不改变源数据
3.stream的延迟执行特性
通常我们在数组或集合的基础上创建stream,stream不会专门存储数据,对stream的操作也不会影响到创建它的数组和集合,对于stream的聚合、消费或收集操作只能进行一次,再次操作会报错,如下代码:
@test public void test1(){ stream<string> stream = stream.generate(()->"user").limit(20); stream.foreach(system.out::println); stream.foreach(system.out::println); }
程序在正常完成一次打印工作后报错。
stream的操作是延迟执行的,在列出班上超过85分的学生姓名例子中,在collect方法执行之前,filter、sorted、map方法还未执行,只有当collect方法执行时才会触发之前转换操作
看如下代码:
public boolean filter(student s) { system.out.println("begin compare"); return s.getscore() > 85; } @test public void test() { stream<student> stream = stream.of(stuarr).filter(this::filter); system.out.println("split-------------------------------------"); list<student> studentlist = stream.collect(tolist()); }
我们将filter中的逻辑抽象成方法,在方法中加入打印逻辑,如果stream的转换操作是延迟执行的,那么split会先打印,否则后打印,代码运行结果为
可见stream的操作是延迟执行的。
tip:
当我们操作一个流的时候,并不会修改流底层的集合(即使集合是线程安全的),如果想要修改原有的集合,就无法定义流操作的输出。
由于stream的延迟执行特性,在聚合操作执行前修改数据源是允许的。
list<string> wordlist; @before public void init() { wordlist = new arraylist<string>() { { add("a"); add("b"); add("c"); add("d"); add("e"); add("f"); add("g"); } }; } /** * 延迟执行特性,在聚合操作之前都可以添加相应元素 */ @test public void test() { stream<string> words = wordlist.stream(); wordlist.add("end"); long n = words.distinct().count(); system.out.println(n); }
最后打印的结果是8
如下代码是错误的
/** * 延迟执行特性,会产生干扰 * nullpointexception */ @test public void test2(){ stream<string> words1 = wordlist.stream(); words1.foreach(s -> { system.out.println("s->"+s); if (s.length() < 4) { system.out.println("select->"+s); wordlist.remove(s); system.out.println(wordlist); } }); }
结果报空指针异常
3.创建stream
1)通过数组创建
/** * 通过数组创建流 */ @test public void testarraystream(){ //1.通过arrays.stream //1.1基本类型 int[] arr = new int[]{1,2,34,5}; intstream intstream = arrays.stream(arr); //1.2引用类型 student[] studentarr = new student[]{new student("s1",29),new student("s2",27)}; stream<student> studentstream = arrays.stream(studentarr); //2.通过stream.of stream<integer> stream1 = stream.of(1,2,34,5,65); //注意生成的是int[]的流 stream<int[]> stream2 = stream.of(arr,arr); stream2.foreach(system.out::println); }
2)通过集合创建流
/** * 通过集合创建流 */ @test public void testcollectionstream(){ list<string> strs = arrays.aslist("11212","dfd","2323","dfhgf"); //创建普通流 stream<string> stream = strs.stream(); //创建并行流 stream<string> stream1 = strs.parallelstream(); }
3)创建空的流
@test public void testemptystream(){ //创建一个空的stream stream<integer> stream = stream.empty(); } 4)创建无限流 @test public void testunlimitstream(){ //创建无限流,通过limit提取指定大小 stream.generate(()->"number"+new random().nextint()).limit(100).foreach(system.out::println); stream.generate(()->new student("name",10)).limit(20).foreach(system.out::println); }
5)创建规律的无限流
/** * 产生规律的数据 */ @test public void testunlimitstream1(){ stream.iterate(0,x->x+1).limit(10).foreach(system.out::println); stream.iterate(0,x->x).limit(10).foreach(system.out::println); //stream.iterate(0,x->x).limit(10).foreach(system.out::println);与如下代码意思是一样的 stream.iterate(0, unaryoperator.identity()).limit(10).foreach(system.out::println); }
4.对stream的操作
1)最常使用
map:转换流,将一种类型的流转换为另外一种流
/** * map把一种类型的流转换为另外一种类型的流 * 将string数组中字母转换为大写 */ @test public void testmap() { string[] arr = new string[]{"yes", "yes", "no", "no"}; arrays.stream(arr).map(x -> x.tolowercase()).foreach(system.out::println); }
filter:过滤流,过滤流中的元素
@test public void testfilter(){ integer[] arr = new integer[]{1,2,3,4,5,6,7,8,9,10}; arrays.stream(arr).filter(x->x>3&&x<8).foreach(system.out::println); }
flapmap:拆解流,将流中每一个元素拆解成一个流
/** * flapmap:拆解流 */ @test public void testflapmap1() { string[] arr1 = {"a", "b", "c", "d"}; string[] arr2 = {"e", "f", "c", "d"}; string[] arr3 = {"h", "j", "c", "d"}; // stream.of(arr1, arr2, arr3).flatmap(x -> arrays.stream(x)).foreach(system.out::println); stream.of(arr1, arr2, arr3).flatmap(arrays::stream).foreach(system.out::println); }
sorted:对流进行排序
string[] arr1 = {"abc","a","bc","abcd"}; /** * comparator.comparing是一个键提取的功能 * 以下两个语句表示相同意义 */ @test public void testsorted1_(){ /** * 按照字符长度排序 */ arrays.stream(arr1).sorted((x,y)->{ if (x.length()>y.length()) return 1; else if (x.length()<y.length()) return -1; else return 0; }).foreach(system.out::println); arrays.stream(arr1).sorted(comparator.comparing(string::length)).foreach(system.out::println); } /** * 倒序 * reversed(),java8泛型推导的问题,所以如果comparing里面是非方法引用的lambda表达式就没办法直接使用reversed() * comparator.reverseorder():也是用于翻转顺序,用于比较对象(stream里面的类型必须是可比较的) * comparator. naturalorder():返回一个自然排序比较器,用于比较对象(stream里面的类型必须是可比较的) */ @test public void testsorted2_(){ arrays.stream(arr1).sorted(comparator.comparing(string::length).reversed()).foreach(system.out::println); arrays.stream(arr1).sorted(comparator.reverseorder()).foreach(system.out::println); arrays.stream(arr1).sorted(comparator.naturalorder()).foreach(system.out::println); } /** * thencomparing * 先按照首字母排序 * 之后按照string的长度排序 */ @test public void testsorted3_(){ arrays.stream(arr1).sorted(comparator.comparing(this::com1).thencomparing(string::length)).foreach(system.out::println); } public char com1(string x){ return x.charat(0); }
2)提取流和组合流
@before public void init(){ arr1 = new string[]{"a","b","c","d"}; arr2 = new string[]{"d","e","f","g"}; arr3 = new string[]{"i","j","k","l"}; } /** * limit,限制从流中获得前n个数据 */ @test public void testlimit(){ stream.iterate(1,x->x+2).limit(10).foreach(system.out::println); } /** * skip,跳过前n个数据 */ @test public void testskip(){ // stream.of(arr1).skip(2).limit(2).foreach(system.out::println); stream.iterate(1,x->x+2).skip(1).limit(5).foreach(system.out::println); } /** * 可以把两个stream合并成一个stream(合并的stream类型必须相同) * 只能两两合并 */ @test public void testconcat(){ stream<string> stream1 = stream.of(arr1); stream<string> stream2 = stream.of(arr2); stream.concat(stream1,stream2).distinct().foreach(system.out::println); }
3)聚合操作
@before public void init(){ arr = new string[]{"b","ab","abc","abcd","abcde"}; } /** * max、min * 最大最小值 */ @test public void testmaxandmin(){ stream.of(arr).max(comparator.comparing(string::length)).ifpresent(system.out::println); stream.of(arr).min(comparator.comparing(string::length)).ifpresent(system.out::println); } /** * count * 计算数量 */ @test public void testcount(){ long count = stream.of(arr).count(); system.out.println(count); } /** * findfirst * 查找第一个 */ @test public void testfindfirst(){ string str = stream.of(arr).parallel().filter(x->x.length()>3).findfirst().orelse("noghing"); system.out.println(str); } /** * findany * 找到所有匹配的元素 * 对并行流十分有效 * 只要在任何片段发现了第一个匹配元素就会结束整个运算 */ @test public void testfindany(){ optional<string> optional = stream.of(arr).parallel().filter(x->x.length()>3).findany(); optional.ifpresent(system.out::println); } /** * anymatch * 是否含有匹配元素 */ @test public void testanymatch(){ boolean aboolean = stream.of(arr).anymatch(x->x.startswith("a")); system.out.println(aboolean); } @test public void teststream1() { optional<integer> optional = stream.of(1,2,3).filter(x->x>1).reduce((x,y)->x+y); system.out.println(optional.get()); }
4)optional类型
通常聚合操作会返回一个optional类型,optional表示一个安全的指定结果类型,所谓的安全指的是避免直接调用返回类型的null值而造成空指针异常,调用optional.ifpresent()可以判断返回值是否为空,或者直接调用ifpresent(consumer<? super t> consumer)在结果部位空时进行消费操作;调用optional.get()获取返回值。通常的使用方式如下:
@test public void testoptional() { list<string> list = new arraylist<string>() { { add("user1"); add("user2"); } }; optional<string> opt = optional.of("andy with u"); opt.ifpresent(list::add); list.foreach(system.out::println); }
使用optional可以在没有值时指定一个返回值,例如
@test public void testoptional2() { integer[] arr = new integer[]{4,5,6,7,8,9}; integer result = stream.of(arr).filter(x->x>9).max(comparator.naturalorder()).orelse(-1); system.out.println(result); integer result1 = stream.of(arr).filter(x->x>9).max(comparator.naturalorder()).orelseget(()->-1); system.out.println(result1); integer result2 = stream.of(arr).filter(x->x>9).max(comparator.naturalorder()).orelsethrow(runtimeexception::new); system.out.println(result2); }
optional的创建
采用optional.empty()创建一个空的optional,使用optional.of()创建指定值的optional。同样也可以调用optional对象的map方法进行optional的转换,调用flatmap方法进行optional的迭代
@test public void teststream1() { optional<student> studentoptional = optional.of(new student("user1",21)); optional<string> optionalstr = studentoptional.map(student::getname); system.out.println(optionalstr.get()); } public static optional<double> inverse(double x) { return x == 0 ? optional.empty() : optional.of(1 / x); } public static optional<double> squareroot(double x) { return x < 0 ? optional.empty() : optional.of(math.sqrt(x)); } /** * optional的迭代 */ @test public void teststream2() { double x = 4d; optional<double> result1 = inverse(x).flatmap(streamtest7::squareroot); result1.ifpresent(system.out::println); optional<double> result2 = optional.of(4.0).flatmap(streamtest7::inverse).flatmap(streamtest7::squareroot); result2.ifpresent(system.out::println); }
5)收集结果
student[] students; @before public void init(){ students = new student[100]; for (int i=0;i<30;i++){ student student = new student("user",i); students[i] = student; } for (int i=30;i<60;i++){ student student = new student("user"+i,i); students[i] = student; } for (int i=60;i<100;i++){ student student = new student("user"+i,i); students[i] = student; } } @test public void testcollect1(){ /** * 生成list */ list<student> list = arrays.stream(students).collect(tolist()); list.foreach((x)-> system.out.println(x)); /** * 生成set */ set<student> set = arrays.stream(students).collect(toset()); set.foreach((x)-> system.out.println(x)); /** * 如果包含相同的key,则需要提供第三个参数,否则报错 */ map<string,integer> map = arrays.stream(students).collect(tomap(student::getname,student::getscore,(s,a)->s+a)); map.foreach((x,y)-> system.out.println(x+"->"+y)); } /** * 生成数组 */ @test public void testcollect2(){ student[] s = arrays.stream(students).toarray(student[]::new); for (int i=0;i<s.length;i++) system.out.println(s[i]); } /** * 指定生成的类型 */ @test public void testcollect3(){ hashset<student> s = arrays.stream(students).collect(tocollection(hashset::new)); s.foreach(system.out::println); } /** * 统计 */ @test public void testcollect4(){ intsummarystatistics summarystatistics = arrays.stream(students).collect(collectors.summarizingint(student::getscore)); system.out.println("getaverage->"+summarystatistics.getaverage()); system.out.println("getmax->"+summarystatistics.getmax()); system.out.println("getmin->"+summarystatistics.getmin()); system.out.println("getcount->"+summarystatistics.getcount()); system.out.println("getsum->"+summarystatistics.getsum()); }
6)分组和分片
分组和分片的意义是,将collect的结果集展示位map<key,val>的形式,通常的用法如下:
student[] students; @before public void init(){ students = new student[100]; for (int i=0;i<30;i++){ student student = new student("user1",i); students[i] = student; } for (int i=30;i<60;i++){ student student = new student("user2",i); students[i] = student; } for (int i=60;i<100;i++){ student student = new student("user3",i); students[i] = student; } } @test public void testgroupby1(){ map<string,list<student>> map = arrays.stream(students).collect(groupingby(student::getname)); map.foreach((x,y)-> system.out.println(x+"->"+y)); } /** * 如果只有两类,使用partitioningby会比groupingby更有效率 */ @test public void testpartitioningby(){ map<boolean,list<student>> map = arrays.stream(students).collect(partitioningby(x->x.getscore()>50)); map.foreach((x,y)-> system.out.println(x+"->"+y)); } /** * downstream指定类型 */ @test public void testgroupby2(){ map<string,set<student>> map = arrays.stream(students).collect(groupingby(student::getname,toset())); map.foreach((x,y)-> system.out.println(x+"->"+y)); } /** * downstream 聚合操作 */ @test public void testgroupby3(){ /** * counting */ map<string,long> map1 = arrays.stream(students).collect(groupingby(student::getname,counting())); map1.foreach((x,y)-> system.out.println(x+"->"+y)); /** * summingint */ map<string,integer> map2 = arrays.stream(students).collect(groupingby(student::getname,summingint(student::getscore))); map2.foreach((x,y)-> system.out.println(x+"->"+y)); /** * maxby */ map<string,optional<student>> map3 = arrays.stream(students).collect(groupingby(student::getname,maxby(comparator.comparing(student::getscore)))); map3.foreach((x,y)-> system.out.println(x+"->"+y)); /** * mapping */ map<string,set<integer>> map4 = arrays.stream(students).collect(groupingby(student::getname,mapping(student::getscore,toset()))); map4.foreach((x,y)-> system.out.println(x+"->"+y)); }
5.原始类型流
在数据量比较大的情况下,将基本数据类型(int,double...)包装成相应对象流的做法是低效的,因此,我们也可以直接将数据初始化为原始类型流,在原始类型流上的操作与对象流类似,我们只需要记住两点
1.原始类型流的初始化
2.原始类型流与流对象的转换
doublestream doublestream; intstream intstream; /** * 原始类型流的初始化 */ @before public void teststream1(){ doublestream = doublestream.of(0.1,0.2,0.3,0.8); intstream = intstream.of(1,3,5,7,9); intstream stream1 = intstream.rangeclosed(0,100); intstream stream2 = intstream.range(0,100); } /** * 流与原始类型流的转换 */ @test public void teststream2(){ stream<double> stream = doublestream.boxed(); doublestream = stream.maptodouble(double::new); }
6.并行流
可以将普通顺序执行的流转变为并行流,只需要调用顺序流的parallel() 方法即可,如stream.iterate(1, x -> x + 1).limit(10).parallel()。
1) 并行流的执行顺序
我们调用peek方法来瞧瞧并行流和串行流的执行顺序,peek方法顾名思义,就是偷窥流内的数据,peek方法声明为stream<t> peek(consumer<? super t> action);加入打印程序可以观察到通过流内数据,见如下代码:
public void peek1(int x) { system.out.println(thread.currentthread().getname() + ":->peek1->" + x); } public void peek2(int x) { system.out.println(thread.currentthread().getname() + ":->peek2->" + x); } public void peek3(int x) { system.out.println(thread.currentthread().getname() + ":->final result->" + x); } /** * peek,监控方法 * 串行流和并行流的执行顺序 */ @org.junit.test public void testpeek() { stream<integer> stream = stream.iterate(1, x -> x + 1).limit(10); stream.peek(this::peek1).filter(x -> x > 5) .peek(this::peek2).filter(x -> x < 8) .peek(this::peek3) .foreach(system.out::println); } @test public void testpeekpal() { stream<integer> stream = stream.iterate(1, x -> x + 1).limit(10).parallel(); stream.peek(this::peek1).filter(x -> x > 5) .peek(this::peek2).filter(x -> x < 8) .peek(this::peek3) .foreach(system.out::println); }
串行流打印结果如下:
并行流打印结果如下:
咋看不一定能看懂,我们用如下的图来解释
我们将stream.filter(x -> x > 5).filter(x -> x < 8).foreach(system.out::println)的过程想象成上图的管道,我们在管道上加入的peek相当于一个阀门,透过这个阀门查看流经的数据,
1)当我们使用顺序流时,数据按照源数据的顺序依次通过管道,当一个数据被filter过滤,或者经过整个管道而输出后,第二个数据才会开始重复这一过程
2)当我们使用并行流时,系统除了主线程外启动了七个线程(我的电脑是4核八线程)来执行处理任务,因此执行是无序的,但同一个线程内处理的数据是按顺序进行的。
2) sorted()、distinct()等对并行流的影响
sorted()、distinct()是元素相关方法,和整体的数据是有关系的,map,filter等方法和已经通过的元素是不相关的,不需要知道流里面有哪些元素 ,并行执行和sorted会不会产生冲突呢?
结论:1.并行流和排序是不冲突的,2.一个流是否是有序的,对于一些api可能会提高执行效率,对于另一些api可能会降低执行效率
3.如果想要输出的结果是有序的,对于并行的流需要使用foreachordered(foreach的输出效率更高)
我们做如下实验:
/** * 生成一亿条0-100之间的记录 */ @before public void init() { random random = new random(); list = stream.generate(() -> random.nextint(100)).limit(100000000).collect(tolist()); } /** * tip */ @org.junit.test public void test1() { long begin1 = system.currenttimemillis(); list.stream().filter(x->(x > 10)).filter(x->x<80).count(); long end1 = system.currenttimemillis(); system.out.println(end1-begin1); list.stream().parallel().filter(x->(x > 10)).filter(x->x<80).count(); long end2 = system.currenttimemillis(); system.out.println(end2-end1); long begin1_ = system.currenttimemillis(); list.stream().filter(x->(x > 10)).filter(x->x<80).distinct().sorted().count(); long end1_ = system.currenttimemillis(); system.out.println(end1-begin1); list.stream().parallel().filter(x->(x > 10)).filter(x->x<80).distinct().sorted().count(); long end2_ = system.currenttimemillis(); system.out.println(end2_-end1_); }
可见,对于串行流.distinct().sorted()方法对于运行时间没有影响,但是对于串行流,会使得运行时间大大增加,因此对于包含sorted、distinct()等与全局数据相关的操作,不推荐使用并行流。
7.stream vs spark rdd
最初看到stream的一个直观感受是和spark像,真的像
val count = sc.parallelize(1 to num_samples).filter { _ => val x = math.random val y = math.random x*x + y*y < 1}.count()println(s"pi is roughly ${4.0 * count / num_samples}")
以上代码摘自spark官网,使用的是scala语言,一个最基础的word count代码,这里我们简单介绍一下spark,spark是当今最流行的基于内存的大数据处理框架,spark中的一个核心概念是rdd(弹性分布式数据集),将分布于不同处理器上的数据抽象成rdd,rdd上支持两种类型的操作1) transformation(变换)2) action(行动),对于rdd的transformation算子并不会立即执行,只有当使用了action算子后,才会触发。
总结
以上所示是小编给大家介绍的java8中的stream相关知识,希望对大家有所帮助