欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

DolphinDB使用案例16:Python实现流数据订阅

程序员文章站 2022-07-13 14:56:07
...
  • 流数据对量化交易的意义

    做量化,首要目标是回测结果好。常规情况下,回测代码与实盘代码是不同的,因为获取数据的方式不同。

    实盘交易策略,目前一般是事件驱动型,就是来一个事件(比如:得到一个新的K线),程序做出一次响应;而回测代码一般是逐行遍历历史数据。两者略有不同,实盘时候使用的数据就是一种流数据。

    DolphinDB提供流数据订阅,也就是实现了一个小型模拟交易所,为回测提供了仿真环境,这也是我用DolphinDB的初衷。

    关于DolphinDB的流处理,参见《理解DolphinDB流数据处理框架

  • Python API实现流处理

  • 配置DolphinDB

    对于单节点,配置server下dolphindb.cfg文件:

    # Streaming 
    # pub
    maxPubConnections=1
    persistenceDir=D:\DolphinDB\DolphinDBFlow
    # sub
    subPort=8888
    

    其他参数使用默认就可以。

    具体参见手册中第10章>>参数配置>>单实例配置

  • 创建DolphinDB连接

    import dolphindb as ddb
    s = ddb.session()
    
  • 指定订阅端口8888

    s.enableStreaming(8888) # 原则上这样是正确的,当前版本099会出错,通过下述代码可行
    # >>>>>>>>>>>>>>>>
    import dolphindb as ddb
    from threading import Event
    # 创建连接
    s = ddb.session()
    s.enableStreaming(28003)
    
    def handler(lst):
        print(lst)
    
    s.subscribe("127.0.0.1", 8921, handler, "st", actionName= "action1", offset=0)
    Event().wait()
    
  • 创建发布表(在DolphinDB GUI创建)

    发布表必须是可共享的流数据表,一共分三步实现:

    1. 创建流数据表
    2. 共享流数据表
    3. 向流数据表中写入数据实现发布

    创建共享的流数据表,指定进行过滤的列,并插入一些随机数据:

    # 创建共享的流数据表,此段代码在DolphinDB的GUI中执行
    share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
    setStreamTableFilterColumn(trades, `sym)
    insert into trades values(take(now(), 10), rand(`ab`cd`ef`gh`ij, 10), rand(1000,10)/10.0, 1..10)
    

    DolphinDB使用案例16:Python实现流数据订阅

  • 订阅

    DolphinDB 服务器上使用函数subscribeTable()实现订阅,在Python API中,使用subscribe()函数实现同样功能。

    s.subscribe(host, port, handler, tableName, actionName="", offset=-1, resub=False, filter=None)
    
    参数 意义 备注
    host 发布端节点的ip地址
    port 发布端节点的端口号
    handler 用户自定义的回调函数,用于处理每次流入的数据
    tableName 发布表的名称
    actionName 订阅任务的名称
    offset 表示订阅任务开始后的第一条消息所在的位置,是一个整数 消息是流数据表中的行。如果没有指定offset,或它为负数或超过了流数据表的记录行数,订阅将会从流数据表的当前行开始。offset与流数据表创建时的第一行对应。如果某些行因为内存限制被删除,在决定订阅开始的位置时,这些行仍然考虑在内。
    resub 表示订阅中断后,是否会自动重订阅,是一个布尔值
    filter 表示过滤条件,是一个向量 流数据表过滤列在filter中的数据才会发布到订阅端,不在filter中的数据不会发布。

    关于其更详细的内容解释,参见《理解DolphinDB流数据处理框架

  • 取消订阅

    s.unsubscribe(host,port,tableName,actionName="")
    

    因为订阅是异步执行的,所以订阅完成后需要保持主线程不退出,比如:

    from threading import Event     # 加在第一行
    Event().wait()                  # 加在最后一行
    

    否则订阅线程会在主线程退出前立刻终止,导致无法收到订阅消息。

  • References

  1. DolphinDB Python API (全部Python内容,包括流数据)
  2. DolphinDB流数据教程(GUI)