DolphinDB使用案例16:Python实现流数据订阅
程序员文章站
2022-07-13 14:56:07
...
-
流数据对量化交易的意义
做量化,首要目标是回测结果好。常规情况下,回测代码与实盘代码是不同的,因为获取数据的方式不同。
实盘交易策略,目前一般是事件驱动型,就是来一个事件(比如:得到一个新的K线),程序做出一次响应;而回测代码一般是逐行遍历历史数据。两者略有不同,实盘时候使用的数据就是一种流数据。
DolphinDB
提供流数据订阅,也就是实现了一个小型模拟交易所,为回测提供了仿真环境,这也是我用DolphinDB
的初衷。关于
DolphinDB
的流处理,参见《理解DolphinDB流数据处理框架》 -
Python API实现流处理
-
配置DolphinDB
对于单节点,配置server下dolphindb.cfg文件:
# Streaming # pub maxPubConnections=1 persistenceDir=D:\DolphinDB\DolphinDBFlow # sub subPort=8888
其他参数使用默认就可以。
具体参见手册中第10章>>参数配置>>单实例配置。
-
创建DolphinDB连接
import dolphindb as ddb s = ddb.session()
-
指定订阅端口8888
s.enableStreaming(8888) # 原则上这样是正确的,当前版本099会出错,通过下述代码可行 # >>>>>>>>>>>>>>>> import dolphindb as ddb from threading import Event # 创建连接 s = ddb.session() s.enableStreaming(28003) def handler(lst): print(lst) s.subscribe("127.0.0.1", 8921, handler, "st", actionName= "action1", offset=0) Event().wait()
-
创建发布表(在DolphinDB GUI创建)
发布表必须是可共享的流数据表,一共分三步实现:
创建共享的流数据表,指定进行过滤的列,并插入一些随机数据:
# 创建共享的流数据表,此段代码在DolphinDB的GUI中执行 share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades setStreamTableFilterColumn(trades, `sym) insert into trades values(take(now(), 10), rand(`ab`cd`ef`gh`ij, 10), rand(1000,10)/10.0, 1..10)
-
订阅
在
DolphinDB
服务器上使用函数subscribeTable()
实现订阅,在Python API
中,使用subscribe()
函数实现同样功能。s.subscribe(host, port, handler, tableName, actionName="", offset=-1, resub=False, filter=None)
参数 意义 备注 host 发布端节点的ip地址 port 发布端节点的端口号 handler 用户自定义的回调函数,用于处理每次流入的数据 tableName 发布表的名称 actionName 订阅任务的名称 offset 表示订阅任务开始后的第一条消息所在的位置,是一个整数 消息是流数据表中的行。如果没有指定offset,或它为负数或超过了流数据表的记录行数,订阅将会从流数据表的当前行开始。offset与流数据表创建时的第一行对应。如果某些行因为内存限制被删除,在决定订阅开始的位置时,这些行仍然考虑在内。 resub 表示订阅中断后,是否会自动重订阅,是一个布尔值 filter 表示过滤条件,是一个向量 流数据表过滤列在filter中的数据才会发布到订阅端,不在filter中的数据不会发布。 关于其更详细的内容解释,参见《理解DolphinDB流数据处理框架》
-
取消订阅
s.unsubscribe(host,port,tableName,actionName="")
因为订阅是异步执行的,所以订阅完成后需要保持主线程不退出,比如:
from threading import Event # 加在第一行 Event().wait() # 加在最后一行
否则订阅线程会在主线程退出前立刻终止,导致无法收到订阅消息。
-
References
上一篇: View的测量、布局及绘制过程
下一篇: Java 流的高级使用之收集数据