欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Lambda表达式之并行和并发

程序员文章站 2022-05-18 12:25:08
...

并行和并发是个非常大的概念,也不是一篇文章能够完全的包含,所以这边只能简要概括如何通过Lambda表达式来实现。

 

首先并行和并发是两个不同的概念:

并行英文叫Parallelism,理想情况下,每个线程拥有自己独立的cpu核心,像平行线一样的各自执行自己的任务,多用于CPU密集型的任务;

并发英文叫Concurrency,每个线程不一定拥有独立的cpu核心,每次的执行时间点和执行时间长度由系统调度决定,多用于处理io阻塞型的任务。

 

不恰当的并行或并发编程会降低系统的处理效率。

 

Lambda实现并行编程很容易:

1. 集合可以通过parallelStream()方法获取拥有并行处理能力的Stream;

2. Stream可以通过parallel()方法标记你希望以并行的方式处理。

 

double sum = IntStream.range(1, 1000000)
    .asDoubleStream()
    .parallel()
    .map(x -> Math.sin(x) * Math.sin(x)  + Math.cos(x) * Math.cos(x))
    .sum();

 

并行处理的Stream会自行处理锁的问题,所以不要在没有把握的时候自行加锁,尤其要注意有些方法自带synchronized,并行的效果反而没有顺序执行(sequential)好。

refer Java 8 Stream parallel performance and CPU resource consumption seems really poor compared to serial

 

关于并发,可以参阅最近比较火的响应式编程

http://reactivex.io/

 

这边简要的写个示例:根据iteye用户的rss,来输出博客的标题和链接

class Item {
    public String title;
    public String link;
}

 解析rss

static List<Item> retrieveTitleNLink() {
    List<Item> list = new LinkedList<>();
    try {
        URL url = new URL("http://xuanzhui.iteye.com/rss");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.addRequestProperty("user-agent",
                "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.93 Safari/537.36");
        BufferedInputStream inputStream = new BufferedInputStream(connection.getInputStream());

        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        DocumentBuilder documentBuilder = factory.newDocumentBuilder();
        Document document = documentBuilder.parse(inputStream);
        NodeList itemList = document.getDocumentElement().getElementsByTagName("item");

        for (int i = 0; i < itemList.getLength(); i++) {
            Node node = itemList.item(i);
            NodeList itemChildren = node.getChildNodes();

            Item item = new Item();
            for (int j = 0; j < itemChildren.getLength(); j++) {
                Node child = itemChildren.item(j);

                if (child.getNodeName().equalsIgnoreCase("title"))
                    item.title = child.getTextContent();
                if (child.getNodeName().equalsIgnoreCase("link"))
                    item.link = child.getTextContent();
            }
            list.add(item);
        }

    } catch (IOException | ParserConfigurationException | SAXException e) {
        e.printStackTrace();
    }

    return list;
}

 接下来是比较核心的代码,先看一下直接用匿名类的写法

public static void main(String[] args) throws IOException {
    Observable.create(new Observable.OnSubscribe<Item>() {
        @Override
        public void call(Subscriber<? super Item> subscriber) {
            // 要做的就是不断向注册者发送数据
            List<Item> list = retrieveTitleNLink();
            for (int i = 0; i < list.size(); i++)
                subscriber.onNext(list.get(i));
        }
    }).subscribeOn(Schedulers.io()).subscribe(new Action1<Item>() {
        @Override
        public void call(Item item) {
            // 要做的就是不断处理上面发送的数据
            System.out.println("title: " + item.title);
            System.out.println("link: " + item.link);
            System.out.println("*********************");
        }
    });

    System.out.println("should print immediately without being blocked ☺");

    // 这边只是为了主进程等待io进程结束
    System.in.read();
}

 要重点关注subscribeOn(Schedulers.io()),表示新开一个io的进程处理网络请求。

 

然后再看Lambda表达式的写法

public static void main(String[] args) throws IOException {
    Observable
            .create(subscriber -> retrieveTitleNLink().forEach(subscriber::onNext))
            .subscribeOn(Schedulers.io())
            .map(item -> {
                Item tmp = (Item) item;
                return tmp.title + " ====>>> " + tmp.link;
            })
            .subscribe(System.out::println);

    System.out.println("should print immediately without being blocked ☺");

    // 这边只是为了主进程等待io进程结束
    System.in.read();
}

 更详细的用法还得看相关文档。

 

 

 

 

相关标签: 并行 并发