DolphinDB基础概念理解:流数据聚合引擎
-
流数据
流数据是指随时间延续而增长的动态数据集合。金融机构的交易数据、物联网的传感器数据和互联网的运营数据都属于流数据的范畴。流数据的特性决定了它的数据集一直是动态变化的。传统的面向静态数据表的计算引擎无法胜任流数据领域的分析和计算任务,所以流数据场景需要一套针对性的计算引擎。
-
计算引擎
搜索引擎,是一种信息检索系统,目的是协助搜索存储计算机系统中的信息。更确切地说,是根据一定的策略、运用特定的计算机程序从互联网上搜索信息,在对信息进行组织和处理后,为用户提供检索服务,将用户检索相关的信息展示给用户的系统。搜索引擎包括全文索引、目录索引、元搜索引擎、垂直搜索引擎、集合式搜索引擎、门户搜索引擎与免费链接列表等。
对比理解计算引擎,一个整的动力核心,负责计算中数据的来源,数据的操作,数据的管理 并将合适的计算结果根据要求给予返回。处理数据处理的目的不同会有很多不同的类别。
-
大数据计算引擎历史
大数据计算引擎的发展历程主要分为四个阶段,目前主流的计算引擎是第三代Spark以及最近比较火的Flink:
-
DolphinDB的聚合引擎应用框架
DolphinDB中提聚合引擎,主要是指针对流数据消费时的聚合计算,因此聚合引擎常与流数据订阅功能配合使用。
更详细的理解参见《DolphinDB基础概念理解:流数据处理框架》
聚合引擎中的一些基本概念:
-
流数据表
DolphinDB
为流式数据提供的一种特定的表对象,提供流式数据的发布功能。通过
subscribeTable
函数,其他的节点或应用可以订阅和消费流数据。 -
聚合引擎数据源
为聚合引擎提供"原料"的通道。
createTimeSeriesAggregator
函数返回一个抽象表,向这个抽象表写入数据,就意味着数据进入聚合引擎进行计算。 -
聚合表达式
以元代码的格式提供一组处理流数据的聚合函数,例如
<[sum(qty)]>
或<[sum(qty),max(qty),avg(price)]>
。聚合引擎支持使用系统内所有的聚合函数,也支持使用表达式来满足更复杂的场景,比如
<[avg(price1)-avg(price2)]>
,<[std(price1-price2)]>
这样的组合表达式。 -
数据窗口
每次计算时截取的流数据窗口长度。
-
数据窗口详解
流数据聚合计算,是每隔一段时间(step),在固定长度的移动窗口(windowSize)中进行。
参数
windowSize
和step
的单位由参数useSystemTime
(default
值为false
)设定。当参数useSystemTime=true
时,windowSize
和step
的单位为系统时间(毫秒)精度,否则以数据生成时间精度为单位。流数据聚合计算场景有两种时间概念:
一是数据的生成时间;二是数据进入聚合引擎的时间
时间概念 格式 精度 备注 使用条件 数据的生成时间 时间戳 天,分钟,秒,毫秒,纳秒 useSystemTime = false(默认) 数据进入聚合引擎的时间 是 毫秒 由聚合引擎给数据打上的时间戳,为聚合引擎所在服务器的系统时间 use System Time = true 在有多组数据的情况下,若每组都根据各自第一条数据进入系统的时间来构造数据窗口的边界,则一般无法将各组的计算结果在相同数据窗口中进行对比。考虑到这一点,系统按照参数
step
值确定一个整型的规整尺度alignmentSize
,以对各组第一个数据窗口的边界值进行规整处理。(数据生成时间)数据时间精度为秒时,如DATETIME或SECOND类型,alignmentSize取值规则如下表:
step alignmentSize 0~2 2 3~5 5 6~10 10 11~15 15 16~20 20 21~30 30 31~60 60 (系统时间)数据时间精度为毫秒时,如TIMESTAMP或TIME类型,alignmentSize取值规则如下表:
step alignmentSize 0~2 2 3~5 5 6~10 10 11~20 20 21~25 25 26~50 50 51~100 100 101~200 200 201~250 250 251~500 500 501~1000 1000 流数据计算案例,参见《DolphinDB基础函数解析:createTimeSeriesAggregator()》
-
聚合表达式
实际的应用中,通常要对流数据进行比较复杂的聚合计算。
这种复杂度要丢聚合引擎的表达式足够灵活。
-
纵向聚合计算(按时间序列聚合)
tradesAggregator = createTimeSeriesAggregator("streamAggr1", 6, 3, <sum(ofr)>, trades, outputTable, `time)
-
横向聚合计算(按维度聚合)
tradesAggregator = createTimeSeriesAggregator("streamAggr1", 6, 3, <max(ofr)-min(ofr)>, trades, outputTable, `time) tradesAggregator = createTimeSeriesAggregator("streamAggr1", 6, 3, <max(ofr-bid)>, trades, outputTable, `time)
-
输出多个聚合结果
有些聚合函数会使用多个参数,例如
corr
,percentile
等函数。tradesAggregator = createTimeSeriesAggregator("streamAggr1", 6, 3, <[max((ofr-bid)/(ofr+bid)*2), min((ofr-bid)/(ofr+bid)*2)]>, trades, outputTable, `time)
-
多参数聚合函数的调用
tradesAggregator = createTimeSeriesAggregator("streamAggr1", 6, 3, <corr(ofr,bid)>, trades, outputTable, `time) tradesAggregator = createTimeSeriesAggregator("streamAggr1", 6, 3, <percentile(ofr-bid,99)/sum(ofr)>, trades, outputTable, `time)
-
调用自定义函数
def spread(x,y){ return abs(x-y)/(x+y)*2 } tradesAggregator = createTimeSeriesAggregator("streamAggr1", 6, 3, <spread(ofr, bid)>, trades, outputTable, `time)
不支持流数据聚合函数嵌套调用,例如若要在流数据引擎中计算sum(spread(ofr,bid)),系统会给出异常提示:
Nested aggregated function is not allowed
。 -
流数据源
共享的流数据表(即发布表),可以作为流数据源。
通过
subscribeTable
函数不仅仅可以订阅数据源,而且可以过滤数据源。 -
聚合引擎输出
聚合结果可以输出到:
-
createTimeSeriesAggregator函数
createTimeSeriesAggregator
函数关联了流数据聚合应用的3个主要信息:函数返回一个抽象的表对象,作为聚合引擎的入口,向这个表写入数据,意味着数据进入聚合引擎进行计算。
更多详细解释,参见《DolphinDB基础函数解析:createTimeSeriesAggregator()》
-
聚合引擎管理函数
系统提供聚合引擎的管理函数,方便查询和管理系统中已经存在的集合引擎:
-
getAggregatorStat
说明: 获取已定义的聚合引擎清单。
-
getAggregator
说明: 获取聚合引擎的句柄。
-
removeAggregator
说明: 移除聚合引擎对象。
-
总结
DolphinDB提供了轻量且使用方便的流数据聚合引擎,它通过与流数据表一同使用来完成流数据的实时计算任务,可支持纵向聚合和横向聚合以及组合计算,支持自定义函数计算,分组聚合,数据清洗,多级计算等功能,满足流数据实时计算各方面需求。
-
Reference
- Github >> Tutorial_CN >> DolphinDB流数据聚合引擎
- References
上一篇: 前端性能优化---1.了解前端性能优化点
下一篇: 独占式超时获取同步状态