Lambda表达式之并行和并发
并行和并发是个非常大的概念,也不是一篇文章能够完全的包含,所以这边只能简要概括如何通过Lambda表达式来实现。
首先并行和并发是两个不同的概念:
并行英文叫Parallelism,理想情况下,每个线程拥有自己独立的cpu核心,像平行线一样的各自执行自己的任务,多用于CPU密集型的任务;
并发英文叫Concurrency,每个线程不一定拥有独立的cpu核心,每次的执行时间点和执行时间长度由系统调度决定,多用于处理io阻塞型的任务。
不恰当的并行或并发编程会降低系统的处理效率。
Lambda实现并行编程很容易:
1. 集合可以通过parallelStream()方法获取拥有并行处理能力的Stream;
2. Stream可以通过parallel()方法标记你希望以并行的方式处理。
double sum = IntStream.range(1, 1000000) .asDoubleStream() .parallel() .map(x -> Math.sin(x) * Math.sin(x) + Math.cos(x) * Math.cos(x)) .sum();
并行处理的Stream会自行处理锁的问题,所以不要在没有把握的时候自行加锁,尤其要注意有些方法自带synchronized,并行的效果反而没有顺序执行(sequential)好。
refer Java 8 Stream parallel performance and CPU resource consumption seems really poor compared to serial
关于并发,可以参阅最近比较火的响应式编程
这边简要的写个示例:根据iteye用户的rss,来输出博客的标题和链接
class Item { public String title; public String link; }
解析rss
static List<Item> retrieveTitleNLink() { List<Item> list = new LinkedList<>(); try { URL url = new URL("http://xuanzhui.iteye.com/rss"); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.addRequestProperty("user-agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.93 Safari/537.36"); BufferedInputStream inputStream = new BufferedInputStream(connection.getInputStream()); DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); DocumentBuilder documentBuilder = factory.newDocumentBuilder(); Document document = documentBuilder.parse(inputStream); NodeList itemList = document.getDocumentElement().getElementsByTagName("item"); for (int i = 0; i < itemList.getLength(); i++) { Node node = itemList.item(i); NodeList itemChildren = node.getChildNodes(); Item item = new Item(); for (int j = 0; j < itemChildren.getLength(); j++) { Node child = itemChildren.item(j); if (child.getNodeName().equalsIgnoreCase("title")) item.title = child.getTextContent(); if (child.getNodeName().equalsIgnoreCase("link")) item.link = child.getTextContent(); } list.add(item); } } catch (IOException | ParserConfigurationException | SAXException e) { e.printStackTrace(); } return list; }
接下来是比较核心的代码,先看一下直接用匿名类的写法
public static void main(String[] args) throws IOException { Observable.create(new Observable.OnSubscribe<Item>() { @Override public void call(Subscriber<? super Item> subscriber) { // 要做的就是不断向注册者发送数据 List<Item> list = retrieveTitleNLink(); for (int i = 0; i < list.size(); i++) subscriber.onNext(list.get(i)); } }).subscribeOn(Schedulers.io()).subscribe(new Action1<Item>() { @Override public void call(Item item) { // 要做的就是不断处理上面发送的数据 System.out.println("title: " + item.title); System.out.println("link: " + item.link); System.out.println("*********************"); } }); System.out.println("should print immediately without being blocked ☺"); // 这边只是为了主进程等待io进程结束 System.in.read(); }
要重点关注subscribeOn(Schedulers.io()),表示新开一个io的进程处理网络请求。
然后再看Lambda表达式的写法
public static void main(String[] args) throws IOException { Observable .create(subscriber -> retrieveTitleNLink().forEach(subscriber::onNext)) .subscribeOn(Schedulers.io()) .map(item -> { Item tmp = (Item) item; return tmp.title + " ====>>> " + tmp.link; }) .subscribe(System.out::println); System.out.println("should print immediately without being blocked ☺"); // 这边只是为了主进程等待io进程结束 System.in.read(); }
更详细的用法还得看相关文档。
上一篇: 文件计算的并行查找与过滤
下一篇: 电饭煲煮鸡蛋都有哪些步骤?多久能熟?