欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Stream数据流的探讨

程序员文章站 2022-07-14 21:43:24
...

当我们遇到一个很大的数据文件时,会怎么处理呢?
我们会将其分成很多段,其中每一个都在不同的线程中处理,然后输出结果。

Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次。
Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。
Collection改进
1.forEach()输出支持: default void forEach(Consumer<? super T> action)
2.取得Stream数据流对象: default Stream stream
代码实现:

 public static void main(String[] args) {
        List<String> list=new ArrayList<>();
        Collections.addAll(list,"Zhangsan","LisI","Wangwu","Like","zhangyi","want");
        list.forEach(s-> System.out.println(s));  //lambda表达式
        System.out.println("===>");
        list.forEach(System.out::println);  //方法引用
        System.out.println("===>");
       // System.out.println(list.size());
    System.out.println(list.stream().count());
    List<String> newlist=new ArrayList<>();
        for (String item : list) {
            if (item.contains("Li")) {    //foreach循环
                newlist.add(item);
            }
        }
        System.out.println(newlist);
        }

Stream操作:
Stream数据流的探讨

1.count()方法:针对数据量做一个统计操作
2.filter()方法:进行数据的过滤
3.希望在数据过滤后得到具体数据,就可以使用收集器来完成。
收集器:public <R, A> R collect(Collector<? super T, A, R> collector)
收集完的数据依然属于List集合,所以可以直接使用List进行接收
在Stream接口中重点有两个操作方法:

  1. 设置取出最大内容: public Stream limit(long maxSize);
  2. 跳过的数据量: public Stream skip(long n);
    Stream 支持 skip(n) 方法,返回一个扔掉了前n个元素的流。如果流中元素不足n个,则返回一 个空流。limit(n) 和 skip(n) 是互补的
    Stream 支持 map 方法,它会接受一个函数作为参数。这个函数会被应用到每个元素上,并将其映 射成一个新的元素 。
    List list=list.skip(0).limit(2).map(s>s.toUpperCase()).collect(Collectors.toList());
    代码实现:
import com.sun.org.apache.xpath.internal.SourceTree;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class TestStream {
    public static void main(String[] args) {
        List<String> list=new ArrayList<>();
        Collections.addAll(list,"Zhangsan","LisI","Wangwu","Like","zhangyi","want");
        list.forEach(s-> System.out.println(s));
        System.out.println("===>");
        list.forEach(System.out::println);  //方法引用
        System.out.println("===>");
       // System.out.println(list.size());
    System.out.println(list.stream().count());
    List<String> newlist=new ArrayList<>();
        for (String item : list) {
            if (item.contains("Li")) {    //foreach循环
                newlist.add(item);
            }
        }
        System.out.println(newlist);
        System.out.println(list.stream().filter(s->s.contains("Li")).count());  //过滤出包含li的然后计算
//将字符串转换为小写,然后过滤出含有l的并且长度大于2 的字符串
        System.out.println(list.stream().map(String::toLowerCase).filter(s -> s.contains("l")).filter(s -> s.length() > 2).collect(Collectors.toList()));
        //跳跃两个值,取跳跃后的三个,并且包含i的
        System.out.println(list.stream().skip(2).limit(3).filter(s->s.contains("i")).collect(Collectors.toList()));
    }
}

运行结果:
Stream数据流的探讨
parallelstream()方法:多核下,并行执行,可提高多线程任务的速度。
单核 cpu 环境,不推荐使用 parallel stream,在多核 cpu 且有大数据量的条件下,推荐使用 parallestream

MapReduce模型:
MapReduce是整个Stream的核心所在。MapReduce的操作也是由两个阶段所组成:

  1. map():指的是针对于数据进行先期的操作处理。例如:简单的数学运算等
  2. reduce():进行数据的统计分析
    统计分析: public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
    此时的方法返回的是一个DoubleStream接口对象,这里面就可以完成统计操作,这个统计使用的方法如下:
    统计方法:DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
    例:数据统计操作:
    代码实现:
package Price;

public class Order {
        private String title;
        private double price;
        private int amount;

        public Order(String title, double price, int amount) {
            this.title = title;
            this.price = price;
            this.amount = amount;
        }

        public String getTitle() {
            return title;
        }

        public void setTitle(String title) {
            this.title = title;
        }

        public double getPrice() {
            return price;
        }

        public void setPrice(double price) {
            this.price = price;
        }

        public int getAmount() {
            return amount;
        }

        public void setAmount(int amount) {
            this.amount = amount;
        }
    }



package Price;

import java.util.ArrayList;
import java.util.DoubleSummaryStatistics;
import java.util.List;

public class MapReduce {
    public static void main(String[] args) {
        List<Order> oderlist = new ArrayList<>();
        oderlist.add(new Order("Iphone", 8999.99, 10));
        oderlist.add(new Order("外星人笔记本", 12999.99, 5));
        oderlist.add(new Order("MacBookPro", 18999.99, 5));
        oderlist.add(new Order("Java从入门到放弃.txt", 9.99, 20000));
        oderlist.add(new Order("中性笔", 1.99, 200000));
         double totalPrice=0.0D;
         for(Order od:oderlist){
             totalPrice+= od.getPrice()*od.getAmount();
         }
        System.out.println("总钱数为:"+totalPrice);
        System.out.println("===>方法");
        Double totalPrice1=oderlist.stream().map(order -> order.getAmount() * order.getPrice()).reduce((sum, x) -> sum + x).get();      //lambda表达式
        Double totalPrice2 = oderlist.stream().map(order -> order.getAmount() * order.getPrice()).reduce(Double::sum).orElseGet(()->0.0D);//方法引用
        Double totalPrice3 = oderlist.stream().mapToDouble(order -> order.getAmount() * order.getPrice()).reduce(Double::sum).orElseGet(()->0.0D);
        System.out.println(totalPrice1);
        System.out.println(totalPrice2);
        System.out.println(totalPrice3);
        System.out.println("====>");
        DoubleSummaryStatistics statistics = oderlist.stream().mapToDouble(order -> order.getPrice() * order.getAmount()).summaryStatistics();
        System.out.println("总订单数:" + statistics.getCount());
        System.out.println("总金额:" + statistics.getSum());
        System.out.println("最大金额:" + statistics.getMax());
        System.out.println("最小金额:" + statistics.getMin());
        System.out.println("平均金额:" + statistics.getAverage());
    }
}

stream 的特点
1.只能遍历一次:
数据流的从一头获取数据源,在流水线上依次对元素进行操作,当元素通过流水线,便无法再对其进行操作,可以重新在数据源获取一个新的数据流进行操作;
2.采用内部迭代的方式:
对Collection进行处理,一般会使用 Iterator 遍历器的遍历方式,这是一种外部迭代;
而对于处理Stream,只要申明处理方式,处理过程由流对象自行完成,这是一种内部迭代,对于大量数据的迭代处理中,内部迭代比外部迭代要更加高效。