Stream数据流的探讨
当我们遇到一个很大的数据文件时,会怎么处理呢?
我们会将其分成很多段,其中每一个都在不同的线程中处理,然后输出结果。
Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次。
Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。
Collection改进
1.forEach()输出支持: default void forEach(Consumer<? super T> action)
2.取得Stream数据流对象: default Stream stream
代码实现:
public static void main(String[] args) {
List<String> list=new ArrayList<>();
Collections.addAll(list,"Zhangsan","LisI","Wangwu","Like","zhangyi","want");
list.forEach(s-> System.out.println(s)); //lambda表达式
System.out.println("===>");
list.forEach(System.out::println); //方法引用
System.out.println("===>");
// System.out.println(list.size());
System.out.println(list.stream().count());
List<String> newlist=new ArrayList<>();
for (String item : list) {
if (item.contains("Li")) { //foreach循环
newlist.add(item);
}
}
System.out.println(newlist);
}
Stream操作:
1.count()方法:针对数据量做一个统计操作
2.filter()方法:进行数据的过滤
3.希望在数据过滤后得到具体数据,就可以使用收集器来完成。
收集器:public <R, A> R collect(Collector<? super T, A, R> collector)
收集完的数据依然属于List集合,所以可以直接使用List进行接收
在Stream接口中重点有两个操作方法:
- 设置取出最大内容: public Stream limit(long maxSize);
- 跳过的数据量: public Stream skip(long n);
Stream 支持 skip(n) 方法,返回一个扔掉了前n个元素的流。如果流中元素不足n个,则返回一 个空流。limit(n) 和 skip(n) 是互补的
Stream 支持 map 方法,它会接受一个函数作为参数。这个函数会被应用到每个元素上,并将其映 射成一个新的元素 。
List list=list.skip(0).limit(2).map(s>s.toUpperCase()).collect(Collectors.toList());
代码实现:
import com.sun.org.apache.xpath.internal.SourceTree;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
public class TestStream {
public static void main(String[] args) {
List<String> list=new ArrayList<>();
Collections.addAll(list,"Zhangsan","LisI","Wangwu","Like","zhangyi","want");
list.forEach(s-> System.out.println(s));
System.out.println("===>");
list.forEach(System.out::println); //方法引用
System.out.println("===>");
// System.out.println(list.size());
System.out.println(list.stream().count());
List<String> newlist=new ArrayList<>();
for (String item : list) {
if (item.contains("Li")) { //foreach循环
newlist.add(item);
}
}
System.out.println(newlist);
System.out.println(list.stream().filter(s->s.contains("Li")).count()); //过滤出包含li的然后计算
//将字符串转换为小写,然后过滤出含有l的并且长度大于2 的字符串
System.out.println(list.stream().map(String::toLowerCase).filter(s -> s.contains("l")).filter(s -> s.length() > 2).collect(Collectors.toList()));
//跳跃两个值,取跳跃后的三个,并且包含i的
System.out.println(list.stream().skip(2).limit(3).filter(s->s.contains("i")).collect(Collectors.toList()));
}
}
运行结果:
parallelstream()方法:多核下,并行执行,可提高多线程任务的速度。
单核 cpu 环境,不推荐使用 parallel stream,在多核 cpu 且有大数据量的条件下,推荐使用 parallestream
MapReduce模型:
MapReduce是整个Stream的核心所在。MapReduce的操作也是由两个阶段所组成:
- map():指的是针对于数据进行先期的操作处理。例如:简单的数学运算等
- reduce():进行数据的统计分析
统计分析: public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
此时的方法返回的是一个DoubleStream接口对象,这里面就可以完成统计操作,这个统计使用的方法如下:
统计方法:DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
例:数据统计操作:
代码实现:
package Price;
public class Order {
private String title;
private double price;
private int amount;
public Order(String title, double price, int amount) {
this.title = title;
this.price = price;
this.amount = amount;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
}
package Price;
import java.util.ArrayList;
import java.util.DoubleSummaryStatistics;
import java.util.List;
public class MapReduce {
public static void main(String[] args) {
List<Order> oderlist = new ArrayList<>();
oderlist.add(new Order("Iphone", 8999.99, 10));
oderlist.add(new Order("外星人笔记本", 12999.99, 5));
oderlist.add(new Order("MacBookPro", 18999.99, 5));
oderlist.add(new Order("Java从入门到放弃.txt", 9.99, 20000));
oderlist.add(new Order("中性笔", 1.99, 200000));
double totalPrice=0.0D;
for(Order od:oderlist){
totalPrice+= od.getPrice()*od.getAmount();
}
System.out.println("总钱数为:"+totalPrice);
System.out.println("===>方法");
Double totalPrice1=oderlist.stream().map(order -> order.getAmount() * order.getPrice()).reduce((sum, x) -> sum + x).get(); //lambda表达式
Double totalPrice2 = oderlist.stream().map(order -> order.getAmount() * order.getPrice()).reduce(Double::sum).orElseGet(()->0.0D);//方法引用
Double totalPrice3 = oderlist.stream().mapToDouble(order -> order.getAmount() * order.getPrice()).reduce(Double::sum).orElseGet(()->0.0D);
System.out.println(totalPrice1);
System.out.println(totalPrice2);
System.out.println(totalPrice3);
System.out.println("====>");
DoubleSummaryStatistics statistics = oderlist.stream().mapToDouble(order -> order.getPrice() * order.getAmount()).summaryStatistics();
System.out.println("总订单数:" + statistics.getCount());
System.out.println("总金额:" + statistics.getSum());
System.out.println("最大金额:" + statistics.getMax());
System.out.println("最小金额:" + statistics.getMin());
System.out.println("平均金额:" + statistics.getAverage());
}
}
stream 的特点
1.只能遍历一次:
数据流的从一头获取数据源,在流水线上依次对元素进行操作,当元素通过流水线,便无法再对其进行操作,可以重新在数据源获取一个新的数据流进行操作;
2.采用内部迭代的方式:
对Collection进行处理,一般会使用 Iterator 遍历器的遍历方式,这是一种外部迭代;
而对于处理Stream,只要申明处理方式,处理过程由流对象自行完成,这是一种内部迭代,对于大量数据的迭代处理中,内部迭代比外部迭代要更加高效。