理解DolphinDB流数据处理框架
-
专业术语解释实时流处理
实时流处理是指将业务系统产生的持续增长的动态数据进行实时的收集、清洗、统计、入库,并对结果进行实时的展示。在金融交易、物联网、互联网/移动互联网等应用场景中,复杂的业务需求对大数据处理的实时性提出了极高的要求。面向静态数据表的传统计算引擎无法胜任流数据领域的分析和计算任务。
-
DolphinDB的流数据框架
DolphinDB
内置的流数据框架支持流数据的发布、订阅、预处理、实时内存计算、复杂指标的滚动窗口计算等,是一个运行高效,使用便捷的流数据处理框架。当前我也只是字面理解,后续清楚之后再用白话补充
-
流数据处理框架及概念准备
DolphinDB流数据模块采用发布-订阅-消费的模式。流数据首先注入流数据表中,通过流数据表来发布数据,数据节点或者第三方的应用可以通过DolphinDB脚本或API来订阅及消费**流数据。
-
流数据表
流数据表是用以存储流数据、支持同时读写的一种内存表。
发布一条消息等价于向流数据表插入一条记录。可使用SQL语句对流数据表进行查询和分析。
-
发布和订阅
采用经典的订阅发布模式。每当有新的流数据写入时,发布方会通知所有的订阅方处理新的流数据。数据节点使用
subscribeTable
函数来订阅流数据。 -
实时聚合引擎
实时聚合引擎指的是专门用于处理流数据实时计算和分析的模块。DolphinDB提供
createTimeSeriesAggregator
与createCrossSectionalAggregator
函数创建聚合引擎对流数据做实时聚合计算,并且将计算结果持续输出到指定的数据表中。关于如何使用聚合引擎请参考流数据聚合引擎。
-
配置以开启流数据功能
-
配置发布节点
maxPubConnections:发布信息节点最多可连接几个节点。若maxPubConnections>0,则该节点可作为发布节点。默认值为0。 persistenceDir:保存共享的流数据表的文件夹路径。若需要保存流数据表,必须指定该参数。 persistenceWorkerNum:负责以异步模式保存流数据表的工作线程数。默认值为0。 maxPersistenceQueueDepth:以异步模式保存流数据表时消息队列的最大深度(记录数)。默认值为10,000,000。 maxMsgNumPerBlock:发布消息时,每个消息块中最多可容纳多少条记录。默认值为1024。 maxPubQueueDepthPerSite:发布节点消息队列的最大深度(记录数)。默认值为10,000,000。
-
配置订阅节点
subPort:订阅线程监听的端口号。当节点作为订阅节点时,该参数必须指定。默认值为0。 subExecutors:订阅节点中消息处理线程的数量。默认值为0,表示解析消息线程也处理消息。 maxSubConnections:服务器能够接收的最大的订阅连接数。默认值是64。 subExecutorPooling: 表示执行流计算的线程是否处于pooling模式的布尔值。默认值是false。 maxSubQueueDepth:订阅节点消息队列的最大深度(记录数)。默认值为10,000,000。
-
流数据发布
向流数据表(
streamTable
函数创建)写入数据,即意味着发布数据。streamTable只是创建流数据表,并不是创建发布表,向
将流数据表共享(通过
share
命令),因为流数据表需要被不同会话访问,不被共享的流数据表无法发布数据。# 定义并共享流数据表 share streamTable(10000:0, `timestamp`temperature, [TIMESTAMP, DOUBLE]) as pubTable
-
流数据订阅
订阅数据(通过
subscribeTable
函数实现)subscribeTable([server], tableName, [actionName], [offset=-1], handler, [msgAsTable=false], [batchSize=0], [throttle=1], [hash=-1], [reconnect=false], [filter], [persistOffset=false])
只有tableName和handler两个参数是必须的,其他均为可选参数
subscribeTable函数的返回值是订阅主题,它是订阅表所在节点的别名、流数据表名称和订阅任务名称(如果指定了actionName)的组合,使用"/"分隔。如果订阅主题已经存在,函数将会抛出异常。
>> share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable >> topic1 = subscribeTable(, "pubTable", "actionName_realtimeAnalytics", 0, subTable, true) >> topic1 NODE1/pubTable/actionName_realtimeAnalytics
参数 意义 备注 tableName
被订阅的流数据表名 handler
用于处理订阅数据,是一元函数或表。 若是函数:其唯一的参数是订阅到的数据;
若是数据表:订阅数据直接插入到该表中;server 空或空字符串:发布节点在本地;
服务器的别名:发布节点是同一集群的其他节点;
远程连接handle:发布节点不在订阅节点所在集群。actionName 用于区分同一个流数据表被订阅用于不同场景的情况 流数据可以针对各种场景分别订阅消费。同一份流数据,可用于实时聚合运算,同时亦可将其存储到数据仓库供第三方应用做批处理。 offset 订阅任务开始后的第一条消息所在的位置 消息:流数据表中的行;
如果没有指定,或者为-1,或者超过了流数据表的记录行数,订阅将会从流数据表的当前行开始;
如果-2,系统会自动获取持久化到磁盘上的offset,并从该位置开始订阅;
offset的值永远与流数据表创建时的第一行对应,如果某些行因为内存限制被删除,在决定订阅开始的位置时,这些行仍然考虑在内。msgAsTable 表示订阅的数据是否为表的布尔值 false(默认):表示订阅数据是由列组成的元组;
true:表示订阅数据是表batchSize 表示出发批处理的消息的行数,是一个整数。
用于数据缓冲,当流数据的写入频率非常高,以致数据消费能力跟不上数据进入的速度时,需要进行流量控制。
否者订阅端缓冲区很快会堆积数据并耗光内存。可以根据订阅端的消费速度设定throttle参数,定时将数据导入订阅段,保障订阅端的缓冲区数据量稳定。正数:直到消息的数量达到batchSize时,handler才会开始处理消息;
没有指定或非正数:只要有一条消息进入,handler就会马上开始处理消息。throttle 表示handler处理进来的消息之前等待的时间,是一个整数 以秒为单位,默认1。如果没有指定batchSize,则throttle将不会起作用。 hash 指定某个订阅线程处理进来的消息,是一个非负整数 如果没有指定该参数,系统会自动分配一个线程;
当需要在两个或多个订阅的处理过程中保持消息数据的同步,可以将多个订阅的hash值设置成相同,这样就能使用同一个线程来同步处理多个数据源,不会出现数据处理有先后导致结果误差reconnect 布尔值 false(默认):如果网络异常等导致订阅中断,订阅端不会自动重新订阅;
true:订阅端会在网络正常时,自动从终端位置重新订阅。
如果是发不断崩溃导致订阅中断,那么订阅端会在发不断重启后不断尝试重新订阅。
若发布端对流数据表启用了持久化,那么发布端重启后会首先读取硬盘上的数据,直到发布端读取到订阅中断位置的数据,订阅端才能成功重新订阅。
若发布端没有对流数据表启用持久化,那么将重新订阅失败。
如果订阅端崩溃导致订阅中断,即使设置了reconnect=true,订阅端重启后也无法自动重新订阅。filter 过滤数据(不支持BOOL类型数据),是一个向量 该参数需要配合 setStreamTableFilterColumn
函数一起使用。使用setStreamTableFilterColumn
指定流数据表的过滤列,流数据表过滤列在filter中的数据才会发布到订阅端,不在filter中的数据不会发布。persistOffset 表示是否持久化保存本次订阅已经处理的数据的偏移量,是一个布尔值 false(默认)。持久化保存的偏移量用于重订阅,可通过 getTopicProcessedOffset
函数获取。注释:
- handler中订阅到的数据:可以是一个数据表或元组;订阅数据表的每一列是元组的一个元素。
- offset中持久化到磁盘:关于持久化,参见下文;
- offset中当前行:指插入数据后,最新的一行,也就是从这里开始接受新数据。即只有当新数据进入发布表时才能订阅到数据;
- 相关代码在执行之前,需要先对streaming功能进行配置,才能启用streaming;
- share函数创建共享表,与subscribe创建订阅表需要分开、顺序执行,同时执行会出错;
-
发布者与订阅者所在节点关系的三种可能
-
发布者与订阅者是同一节点
# 参数server使用空字符串或者为空 subscribeTable(, 'pubTable', 'actionName', 0, subTable, true)
-
发布者与订阅者是同一集群内的不同节点
# 参数server使用发布节点别名 subscribeTable('NODE2', 'pubTable', 'actionName', 0, subTable, true)
-
发布者与订阅者不再同一个集群内
# 参数server使用发布节点的远程连接handle pubNodeHandler=xdb("192.168.1.13",8891) subscribeTable(pubNodeHandler, "pubTable", "actionName", 0, subTable, true)
-
-
断线重连
DolphinDB
的流数据订阅提供了自动重连的功能。如果要启用自动重连,发布端必须对流数据持久化。当
reconnect
参数设为true
时,订阅端会记录流数据的offset
,连接中断时订阅端会从offset
开始重新订阅。如果订阅端崩溃或者发布端没有对流数据持久化,订阅端无法自动重连。
-
发布端数据过滤
发布端可以过滤数据,只发布符合条件的数据。使用
setStreamTableFilterColumn
指定流数据表的过滤列,过滤列的值在filter指定值中的数据会发布到订阅端,不在filter指定值中的数据不会发布。目前仅支持对一个列进行过滤。 -
取消订阅
每一次订阅都由一个订阅主题topic作为唯一标识。
如果订阅时topic已存在,那么会订阅失败。这时需要通过
unsubscribeTable
命令取消订阅才能再次订阅。取消订阅示例如下: -
流数据持久化
默认情况下,流数据表把所有数据保存在内存中。基于以下三点考量,可将流数据持久化到磁盘:
我们可事先设定一个界限值。若流数据表的行数达到设定的界限值,前面一半的记录行会从内存转移到磁盘。持久化的数据支持重订阅,当订阅指定数据下标时,下标的计算包含持久化的数据。
要启动流数据持久化,首先要在发布节点的配置文件中添加持久化路径:
persisitenceDir = /data/streamCache
-
enableTablePersistence持久化流数据
在脚本中执行
enableTablePersistence
命令设置针对某一个流数据表启用持久化。下面的示例针对pubTable
表启用持久化,其中asyn = true, compress = true, cacheSize=1000000
,即当流数据表达到100万行数据时启用持久化,将其中50%的数据采用异步方式压缩保存到磁盘。enableTablePersistence(pubTable, true, true, 1000000)
若执行
enableTablePersistence
时,磁盘上已经存在pubTable表的持久化数据,那么系统会加载最新的cacheSize=1000000行记录到内存中。对于持久化是否启用异步,需要在持久化数据一致性和性能之间作权衡。当流数据的一致性要求极高时,可以使用同步方式,这样可以保证持久化完成以后,数据才会进入发布队列;若对实时性要求极高,不希望磁盘IO影响到流数据的实时性,那么可以启用异步方式。只有启用异步方式时,持久化工作线程数
persistenceWorkerNum
配置项才会起作用。当有多个发布表需要持久化,增加persistenceWorkerNum
的配置值可以提升异步保存的效率。 -
clearTablePersistence删除持久化数据
当不需要保存在磁盘上的流数据时,通过
clearTablePersistence
命令可以删除持久化数据:clearTablePersistence(pubTable)
-
disableTablePersistence关闭持久化
disableTablePersistence(pubTable)
-
getPersistenceMeta获取流数据表的持久化细节情况
getPersistenceMeta(pubTable)
输出结果是一个字典:
//内存中的数据记录数 sizeInMemory->0 //启用异步持久化 asynWrite->true //流数据表总记录数 totalSize->0 //启用压缩存储 compress->true //当前内存中数据相对总记录数的偏移量,在持久化运行过程中遵循公式 memoryOffset = totalSize - sizeInMemory memoryOffset->0 //已经持久化到磁盘的数据记录数 sizeOnDisk->0 //日志文件的保留时间,默认值是1440分钟,即一天。 retentionMinutes->1440 //持久化路径 persistenceDir->/hdd/persistencePath/pubTable //hashValue是对本表做持久化的工作线程标识。 hashValue->0 //磁盘上第一条数据相对总记录数的偏移量。例如,若diskOffset=10000,表示目前磁盘上的持久化流数据从第10000条记录开始。 diskOffset->0
-
数据回放
replay函数,实现历史数据按照时间顺序“实时”导入流数据表中。
这也是对量化回测价值最大的地方。
-
流数据API
流数据的消费者可能是:
DolphinDB
提供了streaming API
供第三方程序来订阅流数据。当有新数据进入时,API的订阅者能够及时的接收到通知,这使得DolphinDB的流数据框架可与第三方的应用进行深入的整合。 -
Java API
Java API
处理流数据的方式有两种:轮询方式(Polling
)和事件方式(EventHandler
)。 -
C++ API
C++ API
处理流数据的方式有三种:ThreadedClient
,ThreadPooledClient
和PollingClient
。 -
C# API
当流数据到达客户端时,
C# API
有两种处理数据的方式:- 客户端应用定期检查是否有新数据,当客户端发现有新数据,会获取并消费流数据。
- 通过预先定义的
MessageHandler
直接消费新数据。
-
Python API
《 DolphinDB使用案例16:Python实现流数据订阅》
-
状态监控
当通过订阅方式对流数据进行实时处理时,所有的计算都在后台进行,用户无法直观的看到运行的情况。
DolphinDB
提供getStreamingStat
函数,可以全方位监控流数据处理过程。该函数返回的是一个dictionary
,包含pubConns
,subConns
,persistWorkers
,subWorkers
四个表。 -
pubConns表
pubConns
表监控本地发布节点和它的所有订阅节点之间的连接状态。每一行表示本地发布节点的一个订阅节点。它包含以下列:列名称 说明 client 订阅节点的IP和端口信息 queueDepthLimit 发布节点消息队列允许的最大深度(消息数)。每个发布节点只有一个发布消息队列。 queueDepth 发布节点消息队列深度(消息数) tables 该节点上的所有共享的流数据表。若多表,彼此通过逗号分隔。 # GUI界面中运行 getStreamingStat().pubConns
-
subConns表
subConns
表监控本地订阅节点与其订阅的发布节点之间的连接状态。每个订阅的发布节点为表中一行。列名称 说明 publisher 发布节点别名 cumMsgCount 累计接收消息数 cumMsgLatency 累计接收消息的平均延迟时间(毫秒)。延迟时间指的是消息从进入发布队列到进入订阅队列的耗时。 lastMsgLatency 最后一次接收数据延迟时间(毫秒) lastUpdate 最后一次接收数据时刻 # GUI界面中运行 getStreamingStat().subConns
-
persistWorkers表
persistWorkers
表监控流数据表持久化工作线程,每个工作线程为一行。列名称 说明 workerId 工作线程编号 queueDepthLimit 持久化消息队列深度限制 queueDepth 持久化消息队列深度 tables 持久化表名。若多表,彼此通过逗号分隔。 只有持久化启用后,才能通过
getStreamingStat
获取persistWorkers
表。这张表的记录数等于persistenceWorkerNum
配置值。 -
subWorkers表
subWorkers
表监控流数据订阅工作线程,每条记录代表一个订阅工作线程。列名称 说明 workerId 工作线程编号 queueDepthLimit 订阅消息队列最大限制 queueDepth 订阅消息队列深度 processedMsgCount handler已处理的消息数量 failedMsgCount handler处理异常的消息数量 lastErrMsg 上次handler处理异常的信息 topics 已订阅主题。若多个,彼此通过逗号分隔。 配置项
subExecutors
与subExecutorPooling
这两个配置项的对流数据处理的影响,在这张表上可以得到充分的展现。# 在GUI界面中运行 getStreamingStat().subWorkers
-
pubTable表
pubTables表监控流数据表被订阅情况,每条记录代表流数据表一个订阅连接。
列名称 说明 tableName 发布表名称 subscriber 订阅方的host和port msgOffset 订阅线程当前订阅消息的offset actions 订阅的action。若有多个action,此处用逗号分割 # 在GUI界面中运行 getStreamingStat().pubTables
-
性能调优与可视化
这部分作为高阶,后续再说。
上一篇: 前端即时通讯
下一篇: DolphinDB使用案例9:批处理依赖