欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

使用Java 8 Streams处理大量数据

程序员文章站 2022-04-08 19:41:17
...

  今天,我们开放采购内部的Java 8流实用程序库,可以以高效的方式聚合,合并或连接流。要跳过代码,请查看 github回购 。

  在指导下,我们构建了Searchlight,一个内容智能平台,帮助用户从大量的原始数据中获得洞察。过去我们已经写了一堆帖子,用于使我们的数据处理更加高效,从 大量操作到卡夫卡流 , 将快速框架压缩纳入hadoop ,甚至 大幅缓存数据请求前端使用Backbone.Hoard 。我今天在这里谈谈一些我们用来做高效数据流的技术,并分享一些可能使你的生活更美好的代码!

  问题

  我们的客户依靠我们收集大量数据(每周接近20TB,并且不断增长),并将其处理成可在我们的应用程序中查看的可操作的见解。该处理的一个步骤是将数据转化为需要提供具体洞察力的数据。然而,即使在解决了最小可能的相关信息之后,我们仍然留下了数百MB的磁盘上的报告。

  我们需要在小量,固定的内存中聚合,加入和总结这些潜在的大型报告。

  输入Java 8流。

  Java 8流 描述了将元素从源到目的地的操作流水线。更具体地说,流可以让您在一组数据中定义一组操作,这些数据在功能范例中与数据来源不同。

  虽然大多数文献在线利用Java 8流主要用于 列表推导 ,但流也可用于描述具有未知大小(潜在无限)的一组数据的操作,这些数据不一定全部保存在内存中。

  在我们的例子中,我们将数据源与一些漂亮的管道连接,将InputStream转换成物化流。这使我们可以利用Java中的Stream和Collection之间的关键区别之一 - 集合表示一组完全存储在内存中的数据,Streams只是与存储无关的一系列数据。

  通过我们的大型数据集流只允许目前被操纵的项目被保存在内存中,而不是完整的集合。这使我们能够获得我们的“小而固定的内存量”的目标。

  Java 8流不足的复杂用例

  我们的UI可以对数据集进行一些高度动态的观察,因此我们可以合理地进行预计算的数量有限。

  例如,我们有一个看法,沿着特定的轴连接两个不同周的数据。这两个星期可以任意选择,这将使预先计算的空间爆炸成一大堆排列。这意味着我们必须尽快加入这两个数据集。

  由于构建了Java 8流来描述针对任意数据源的一般操作,所以有一些操作根本不支持开箱即用,而其他操作无效率地实现。然而,由于我们拥有我们的数据集,并且有能力预处理数据,所以我们可以强制我们的数据进行排序,并使用这些知识构建可以懒惰运行的专用流实用程序。

  我们的流实用程序回购包含一些工具来解决复杂的问题,其中预处理要分类的数据集可以显着提高我们正在执行的操作的内存占用。

  我们来看一些例子。

  懒惰聚集

  假设我在给定客户的网站上有所有内容的流。如果我的数据集没有排序,为了按类别(操作方法,产品评论等)分组内容,并返回一个 Stream> ,我将需要实现整个流 - 未排序的数据流可能有一个项目它属于第一个组作为流中的第一个项目,以及流中的最后一个项目。这意味着在数据集完全处理之前,您不知道该组是完整的。但是让我们按照内容分类对流进行排序。在这种情况下,我可以以懒惰的方式将流分类 。

  Java 8分组运算符实际上实现了完整流,因为运算符是collect 方法的一部分 ,这是 终端操作 。终端操作需要消耗完整的流,因此几乎总是渴望:

  final Map > contentGroupedByCategory=contentStream

  然而,由于我们知道流ContentCategory 已经被排序了 ,所以我们可以利用这些知识建立一个分组运算符,该运算符只需要迭代才能找到内容类别之间的边界,并发出分组的项目。

  OrderedStreamUtilsBy

  为了使用这个实用程序,只需确保你的数据流按照你的键控函数返回(在我们的例子中是内容类别)。

  import com.conductor.stream.utils.OrderedStreamUtils;//我们知道contentStream是按类别排序的。因此,//我们可以做一个流组最终 Stream > contentGroupedByCategory=OrderedStreamUtils By(contentStream,c - > c .getContentCategory());

  与上述的Java流框架版本相反,该流程返回Map,此流实用程序将返回分组数据流。这意味着只有 List 在内存中才是正在处理的内存 。

  懒人加盟

  现在,我们假设我们有两个数据流,每个数据都代表了我们客户的搜索引擎的性能,也许来自不同的年份。我们想要的输出是搜索结果流,两者之间存在的每个结果之前和之后,以及来自个别年份的不重叠的结果。

  为了澄清,当我在这里说“加入”时,表示的操作是从两个流中获取每个匹配项,并组合这些匹配对象以产生一个新对象。

  使用Java 8流框架的本机操作符,无法以懒惰的方式执行此操作。在这种情况下,甚至没有非懒惰的方便操作员。

  再次,我们的流被排序的事实使我们能够有效地加入这两个数据集。根据要求,我们可以从每一边拉出项目,直到找到匹配的集合,并使用排序顺序确定哪一方在后面。所提供的JoinType决定了我们对不匹配的项目所做的工作。

  OrderedStreamUtils.join

  在这种情况下,我们将加入每个搜索结果的ID。这意味着两个流必须按照我们的连接操作工作的结果ID进行排序。

  import com.conductor.stream.utils.OrderedStreamUtils;import com.conductor.stream.utils.join.JoinType;final Function searchResultIdKeyingFunction=sr - > sr.getResultId();//我们知道searchResultStream2021和searchResultStream2021 //按结果ID排序。因此,我们可以进行流式加入。final Stream joinedStream=OrderedStreamUtils。加入(

  请注意,这还会返回懒惰评估的数据流。

  多流合并

  假设你有一堆客户端HTML建议,一个在他们网站上的每个页面。这些流在内部按优先级排序。您想要按照优先级排列整个网站上的建议列表。

  这不是Java 8流框架中的本机操作符之一,因为它要求游戏买号底层流以排序顺序生成项,这并不能保证所有流都可以做到。

  再一次,如果我们知道我们的流被排列在前面,它们相对简单地将它们懒惰地并入。

  OrderedStreamUtils.sortedMerge

  sortedMerge运算符简单地比较每个流中的下一个项目,并根据给定的比较器发出下一个项目。这意味着流必须全部由所述比较器排序。

  import com.conductor.stream.utils.OrderedStreamUtils;//这些流都按优先级排序。因此,我们需要一个优先级比较器//。最终 比较器 comparator=Comparator paring(HtmlRecommendation :: getPriority);final Stream mergedStreams=OrderedStreamUtils .sortedMerge(Arrays .asList(第1页建议书,第2页建议书,第3页建议书),比较器);

  StreamUtils

  除了 OrderedStreamUtils 运营商之外,我们还组合了一小部分实用程序,可以使得在任何流量之上构建处理流水线更容易,无需订购。

  发现自己写了很多 Collector 没有组合器?使用 StreamUtils.noCombiner() 。

  用于RxJava的 switchIfEmpty 运算符?我们把它带入了Java 8流 StreamUtils.switchIfEmpty() 。

  想要将流分成一定大小的集合?也许您希望一次处理1000个项目,而不会实现您的全套数据?尝试 StreamUtils.buffer 。

  结论 - 使用Conductor的stream-utils来增强您的Java 8 Streams

  我希望你发现这组实用程序有用。过去几个月来,我们一直在内部使用它们,使我们最大的一些数据在Searchlight中更容易处理,我们很高兴能够将它们交给社区!

  1、具有1-5工作经验的,面对目前流行的技术不知从何下手,

  需要突破技术瓶颈的可以加。2、在公司待久了,过得很安逸,

  但跳槽时面试碰壁。需要在短时间内进修、跳槽拿高薪的可以加。

  3、如果没有工作经验,但基础非常扎实,对java工作机制,

  常用设计思想,常用java开发框架掌握熟练的,可以加。

  4、觉得自己很牛B,一般需求都能搞定。

  但是所学的知识点没有系统化,很难在技术领域继续突破的可以加。

  5. 群号:高级架构群 478052716备注好信息!

  6.阿里Java高级大牛直播讲解知识点,分享知识,

  多年工作经验的梳理和总结,带着大家全面、

  科学地建立自己的技术体系和技术认知!