Mastering Spark for Data Science:数据集成
作为数据科学家,最重要的任务之一就是将数据加载到数据科学平台中。 本章将说明如何构建Spark中的常规数据提取管道,该管道可作为跨许多输入数据的可重用组件,而不是使用不受控制的临时过程。 我们将逐步进行配置,并演示如何在各种运行条件下提供重要的Feed管理信息。
读者将学习如何构造内容寄存器,并使用它来跟踪加载到系统的所有输入并在摄取管道上传递度量,以便可以可靠地将这些流作为自动的熄灯过程运行。 在本章中,我们将涵盖以下主题:
介绍事件,语言和声调的全球数据库(GDELT)数据集
数据管道
通用摄取框架
实时监控新数据
通过Kafka接收流数据
注册新内容并存储以进行跟踪
可视化Kibana中的内容指标以监控摄取
流程和数据健康
数据管道
即使使用最基本的分析,我们也总是需要一些数据。 实际上,找到正确的数据可能是数据科学中最难解决的问题之一(但这是另一本书的全部主题!)。 在上一章中我们已经看到,获取数据的方式可以根据需要简单或复杂。 在实践中,我们可以将该决定分为两个不同的区域:即席和计划的。
临时数据采集:是原型设计和小规模分析中最常用的方法,因为它通常不需要任何其他软件即可实现。 用户获取一些数据,并仅在需要时从源中简单下载它们。 这个方法 通常,只需单击Web链接并将数据存储在方便的位置,尽管数据可能仍需要版本控制和安全。
预定的数据采集:用于更受控的环境中以进行大规模和生产分析; 将数据集提取到数据湖中以备将来使用也是一个极好的案例。 随着物联网(IoT)的不断增长,在许多情况下,如果不立即提取数据,则将永远丢失大量数据。 这些数据中的大部分今天可能没有明显用途,但将来可能会有用。 因此,我们的思路是在需要时收集所有数据,并在以后确定不需要时将其删除。
显然,我们需要一种灵活的数据采集方法,以支持多种采集选项。
通用摄取框架
有多种方法来获取数据,从本地的bash脚本到高端商业工具。 本部分的目的是介绍一个高度灵活的框架,我们可以将其用于小规模数据提取,然后随着需求的变化一直扩展到完整的,由企业管理的工作流。 该框架将使用Apache NiFi构建。 NiFi使我们能够建立大规模的集成数据管道,从而在整个地球上移动数据。 此外,它甚至还比使用bash或任何其他传统脚本方法更快,更灵活,更容易建立简单的管道。
注意
如果在许多情况下采用临时方法来获取相同的数据集,则应认真考虑是否属于计划类别,或者至少应引入更强大的存储和版本设置。
我们选择使用Apache NiFi是因为它提供了一种解决方案,该解决方案能够创建许多复杂程度各异的管道,这些管道可以扩展到真正的大数据和IoT级别,并且还提供了出色的拖放界面(使用所谓的 基于流的编程https://en.wikipedia.org/wiki/Flow-based_programming)。 通过用于工作流生产的模式,模板和模块,它可以自动处理传统上困扰开发人员的许多复杂功能,例如多线程,连接管理和可伸缩处理。 出于我们的目的,这将使我们能够快速构建用于原型制作的简单管道,并在需要时将其扩展到完整生产。
遵循https://nifi.apache.org/download.html上的信息,该文档已被很好地记录并且易于运行。 它运行在浏览器中,如下所示:
我们将NiFi的安装留给读者练习,我们鼓励您这样做,因为我们将在下一部分中使用它。
介绍GDELT新闻流
希望我们现在已启动并运行NiFi,并且可以开始提取一些数据。 因此,让我们从GDELT的一些全球新闻媒体数据开始。 这是我们的摘要,摘自GDELT网站http://blog.gdeltproject.org/gdelt-2-0-our-global-world-in-realtime/:
“在GDELT监视的新闻报道在世界各地发生的15分钟之内,它已对其进行翻译,处理,以识别所有事件,计数,报价,人物,组织,位置,主题,情感,相关图像,视频和嵌入式社交媒体 帖子,将其置于全球环境中,并通过实时开放的元数据firehose使所有这些都可用,从而实现对地球本身的开放研究。
[作为情感分析领域中规模最大的部署,我们希望通过将跨越多种语言和学科的众多情感和主题维度整合在一起,并将其全部实时应用于来自全球的突发新闻, 将会激发我们思考情感的方式及其可以帮助我们更好地理解我们如何情境化,解释,响应和理解全球事件的方式的一个全新时代。”
我认为您同意这一点非常具有挑战性! 因此,让我们直接开始,而不是延迟,在这里暂停指定详细信息。 在接下来的章节中,我们将介绍GDELT的各个方面。
为了开始使用这些开放数据,我们需要加入元数据流水线,并将新闻流接收到我们的平台上。 我们如何做到这一点? 让我们从找出可用数据开始。
实时发现GDELT在其网站上发布最新文件列表。 此列表每15分钟更新一次。 在NiFi中,我们可以设置一个数据流,该数据流将轮询GDELT网站,从该列表中获取文件,然后将其保存到HDFS,以便以后使用。
在NiFi数据流设计器内部,通过将处理器拖到画布上并选择GetHTTP函数来创建HTTP连接器。
另外,为您要下载的文件列表提供一个临时文件名。 在下面的示例中,我们使用了NiFi的表达语言来生成通用的唯一**,以便文件不会被覆盖(UUID())。
值得注意的是,使用这种类型的处理器(GetHTTP方法),NiFi支持许多用于轮询和检索的调度和定时选项。 现在,我们只使用默认选项,然后让NiFi为我们管理轮询间隔。
来自GDELT的最新文件列表的示例如下所示:
接下来,我们将解析GKG新闻流的URL,以便稍后获取。 通过将处理器拖到画布上并选择ExtractText来创建正则表达式解析器。 现在,将新处理器放置在现有处理器的下方,并将一条线从顶部处理器拖到底部处理器。 通过在弹出的连接对话框中选择成功关系来完成操作。 在以下示例中显示:
接下来,让我们将ExtractText处理器配置为使用仅与文件列表的相关文本匹配的正则表达式,例如:
([^ ] *gkg. csv. *)
NiFi将根据此正则表达式创建与流程设计关联的新属性(在本例中称为url),当每个特定实例通过流程时,该属性将采用新值。 它甚至可以配置为支持多个线程。 同样,此示例如下所示:
在这里值得注意的是,尽管这是一个相当具体的示例,但该技术是有意通用的,可以在许多情况下使用。
我们的第一份预付款
现在我们有了GKG提要的URL,我们通过配置InvokeHTTP处理器以使用之前创建的url属性作为其远程端点来获取它,并像以前一样拖动该行。
剩下的一切就是使用UnpackContent处理器(使用基本的.zip格式)解压缩压缩后的内容,并使用PutHDFS处理器将其保存到HDFS中,如下所示:
改进出版物和订阅
到目前为止,该流程看起来非常点对点,这意味着,如果我们要引入新的数据使用者,例如Spark流作业,则必须更改该流程。 例如,流程设计可能必须更改为如下所示:
如果再添加一个,流程必须再次更改。 实际上,每次我们添加一个新的使用者时,流程都会变得有些复杂,尤其是在添加了所有错误处理后。 显然,这并不总是理想的,因为引入或删除数据的使用者(或生产者)可能是我们经常甚至经常要做的事情。 另外,尝试保持流尽可能简单和可重用也是一个好主意。
因此,对于更灵活的模式,我们可以将其发布到Apache Kafka,而不是直接写入HDFS。 这使我们能够随时添加和删除使用者,而无需更改数据提取管道。 如果需要,我们仍然可以从Kafka写入HDFS,甚至可以通过设计单独的NiFi流程,或使用Sparkstreaming直接连接到Kafka。
为此,我们通过将处理器拖到画布上并选择PutKafka来创建Kafka编写器。
内容注册表
我们在本章中已经看到,数据摄取是一个经常被忽视的领域,其重要性不可低估。 至此,我们有了一个管道,使我们能够从源中摄取数据,安排摄取时间并将数据定向到我们选择的存储库。 但是故事还没有结束。 现在我们有了数据,我们需要履行我们的数据管理职责。 输入内容注册表。 我们将建立与该数据相关的元数据索引
我们已经摄取了。 数据本身仍将定向到存储(在我们的示例中为HDFS),但是此外,我们将存储有关数据的元数据,以便我们可以跟踪收到的数据并了解有关数据的基本信息,例如何时 我们收到了它,它来自哪里,它多大,它是什么类型,等等。
选择和更多选择
如我们所见,选择用于存储该元数据的技术是基于知识和经验的选择。 对于元数据索引,我们至少需要具有以下属性:易于搜索的可伸缩并行写入能力冗余有很多方法可以满足这些要求,例如,我们可以将元数据写入Parquet,存储在HDFS中并使用Spark SQL进行搜索。 但是,这里我们将使用Elasticsearch,因为它可以更好地满足需求,最显着的原因是它可以通过REST API促进对元数据的低延迟查询,这对于创建仪表板非常有用。 实际上,Elasticsearch具有直接与Kibana集成的优势,这意味着它可以快速产生我们的内容注册表的丰富可视化效果。 因此,我们将着眼于Elasticsearch。
顺其自然
使用当前的NiFi管道流,让我们分叉来自
“从URL提取GKG文件”以添加其他步骤
允许我们捕获此元数据并将其存储在Elasticsearch中。
这些是:
1.用我们的元数据模型替换流内容。
2.捕获元数据。
3.直接存储在Elasticsearch中。
这是NiFi中的样子:
因此,这里的第一步是定义我们的元数据模型。 我们可以考虑许多领域,但让我们选择一个有助于解决先前讨论中的几个关键点的问题。 这将为将来可以根据需要添加更多数据提供良好的基础。 因此,让我们保持简单,并使用以下三个属性:
文件大小
摄取日期
文件名
这些将提供接收文件的基本注册。 接下来,在 NiFi流程,我们需要用此新的元数据模型替换实际的数据内容。 一种简单的方法是根据我们的模型创建一个JSON模板文件。 我们将其保存到本地磁盘,并在FetchFile处理器中使用它,以此骨架对象替换流的内容。 该模板如下所示:
{
"FileSize": SIZE,
"FileName": "FILENAME",
"IngestedDate": "DATE"
}
注意使用占位符名称(SIZE,FILENAME,DATE)代替属性值。 这些将被一系列ReplaceText处理器一一替换,这些处理器使用NiFi表达式语言提供的正则表达式将占位符名称交换为适当的流属性,例如DATE变为$ {now()}。
最后一步是将新的元数据有效负载输出到Elasticsearch。 NiFi再次为此配备了处理器。 PutElasticsearch处理器。
Elasticsearch中的示例元数据条目:
{
"_index": "gkg",
"_type": "files",
"_id": "AVZHCvGIV6x-JwdgvCzW",
"_score": 1,
"source": {
"FileSize": 11279827,
"FileName":
"20150218233000. gkg. csv. zip",
"IngestedDate": "2016-08-
01T17: 43: 00+01: 00"
}
现在,我们已经添加了收集和查询元数据的功能,现在,我们可以访问更多可用于分析的统计信息。 这包括:基于时间的分析,例如,随时间变化的文件大小数据丢失,例如,时间轴中是否存在数据漏洞? 如果需要特定的分析,则可以调整NIFI元数据组件以提供相关的数据点。 实际上,如果元数据不存在于当前数据中,则可以构建一个分析来查看历史数据并相应地更新索引。
Kibana仪表板:
在本章中,我们已经多次提到过Kibana。 现在我们在Elasticsearch中有了元数据索引,我们可以使用该工具来可视化一些分析。 本简短部分的目的是证明我们可以立即开始对数据进行建模和可视化。 要查看在更复杂的场景中使用的Kibana,请查看第9章,新闻词典和实时标记系统。 在这个简单的示例中,我们完成了以下步骤:
1.将“ GDELT”元数据的Elasticsearch索引添加到“设置”选项卡。
2.在“发现”选项卡下选择文件大小。
3.选择显示文件大小。
4.将“聚合”字段更改为“范围”。
5.输入范围值。
结果图显示文件大小分布:
从这里,我们可以*地创建新的可视化文件,甚至可以创建功能齐全的仪表板,以用于监视文件提取的状态。 通过增加从NiFi写入Elasticsearch的元数据的种类,我们可以在Kibana中提供更多字段,甚至可以从这里基于一些基于摄取的可操作见解开始我们的数据科学之旅。 既然我们拥有一个功能齐全的数据管道,可以为我们提供实时的数据提要,那么我们如何确保接收到的有效载荷的数据质量? 让我们看一下这些选项。
质量保证
实施初始数据提取功能并将数据流传输到平台后,您将需要确定“前门”需要多少质量保证。从没有初始质量控制开始并逐步建立起来(在时间和资源允许的情况下回顾性地扫描历史数据),这是完全可行的。但是,一开始安装基本级别的验证可能是明智的。例如,基本检查,例如文件完整性,奇偶校验,完整性,校验和,类型检查,字段计数,过期文件,安全字段预填充,反规范化等等。您应注意,前期检查不会花费太长时间。根据检查的强度和数据的大小,遇到下一个数据集到达之前没有足够的时间执行所有处理的情况并不少见。您将始终需要监视集群资源并计算最有效的时间使用方式。
以下是您可以执行的粗略产能计划计算类型的一些示例:
示例1-基本质量检查,没有竞争的用户每15分钟就会提取一次数据,并且要从源中提取数据需要1分钟。质量检查(完整性,字段计数,字段预填充)需要4分钟。计算群集上没有其他用户 有10分钟的资源可用于其他任务。 由于群集上没有其他用户,因此很令人满意-无需采取任何措施。
示例2-高级质量检查,没有竞争用户 :
每15分钟提取一次数据,并从源中提取数据需要1分钟。质量检查(完整性,字段计数,字段预填充,反规范化,子数据集构建)需要13分钟。计算集群上没有其他用户 分钟的资源可用于其他任务。 您可能需要考虑以下任一情况:
配置资源调度策略
减少摄取的数据量
减少我们承担的加工量
向集群添加其他计算资源
示例3-基本的质量检查,由于竞争的用户而产生的效用为50%
每15分钟提取一次数据,并且需要1分钟才能从源中提取数据
质量检查(完整性,字段数,字段预填充)需要4分钟(100%的效用)
计算群集上还有其他用户
有6分钟的资源可用于其他任务(15 -1-(4 *(100/50)))。 由于还有其他用户,因此存在危险,至少在某些情况下,我们将无法完成处理,并且会积压工作。
当您遇到时间问题时,为了避免积压,您可以使用多种选择:
在特定时间协商仅使用资源
配置资源调度策略,包括:
YARN Fair Scheduler:允许您定义优先级不同的队列,并通过设置Spark来定位Spark作业。 纱。 启动时使用队列属性,因此您的工作始终优先
动态资源分配:允许同时运行的作业自动扩展以匹配其利用率
Spark调度程序池:允许您在使用多线程模型共享SparkContext时定义队列,并通过设置spark来定位Spark作业。 调度程序。 每个执行线程的池属性,因此您的线程
优先集群安静时在一夜之间运行处理作业
无论如何,您最终都将很好地了解工作中各个部分的性能,然后可以计算出可以进行哪些更改以提高效率。 总是可以选择投入更多的资源来解决问题,尤其是在使用云提供商时,但是我们当然会鼓励对现有资源的智能使用-这具有更大的可扩展性,更便宜,并且可以建立数据专业知识。
摘要
在本章中,我们介绍了Apache NiFi GDELT接收管道的完整设置,包括元数据派生和可视化结果数据的简要介绍。 由于在整本书中广泛使用了GDELT,而NiFi方法是一种以可扩展和模块化的方式来获取数据的高效方法,因此这一部分特别重要。 在下一章中,我们将通过查看模式和格式来掌握数据一旦到达就该如何处理。