欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

DolphinDB使用案例17:历史数据回放replay详解

程序员文章站 2022-03-21 09:06:05
...
  • 实现方式

    本文的回放,是在DolphinDB的GUI界面种操作实现,关于在Python中的实现参见《DolphinDB使用案例18:Python API实现历史数据回放》

  • 回放的价值

    把指定表或数据源的数据以一定的速率注入到流数据表中,这个过程称为数据回放。

    对于量化交易,实盘是事件驱动框架,回测一般是逐行遍历,是两套不同的代码。

    回放,就是将历史数据按原始状态复现,回测&实盘代码可以合二为一。

    DolphinDB提供回放功能,通过replay函数和replayDS函数。

  • replay函数详解

    replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [parallelLevel=1])
    
    参数 意义 备注
    input Tables 一个表或replayDS函数生成的数据源 如果有多个表或数据源,用元组表示
    outputTables 表:共享的流数据表对象;
    字符串:共享流数据表的名称;
    如果有多个输出表,用向量或元组表示,并且output Tables的结构必须与inputTables相同
    dateColumn 一个字符串标量或与inputTables长度相同的字符串向量 如果inputTables的日期列相同,dateColumn可以使用标量表示;
    如果inputTables的日期列不同,dateColumn使用向量表示;
    每个元素表示一个inputTable的日期列
    timeColumn 一个字符串标量或与inputTables长度相同的字符串向量 如果inputTables的日期列相同,timeColumn可以使用标量表示;
    如果inputTables的时间列不同,timeColumn使用向量表示;
    每个元素表示一个inputTable的日期列
    replayRate 表示每秒回放的数据条数,是一个非负整数 NULL或负数,表示以最快的速率回放
    parallelLevel 表示从数据源加载数据导内存时的线程数,是一个正整数 默认1;
    如果inputTables时包含多个表对象的元组,有效的parallelLevel为1

    replay函数的作用时将若干表或数据源同时回放到相应的输出表中。用户需要指定输入的数据表或数据源、输出表、日期列、时间列、回放速度以及并行度。

    使用cancelJobcancelConsoleJob命令可以终止回放。

    inputTables的每个dateColumn(或timeColumn)可以时相同的数据类型,也可以是不同的数据类型,单必须符合以下两种情况之一:

    • 都不包含日期信息,即数据类型可以是SECONDTIMENANOTIME
    • 都包含日期信息,即数据类型可以是DATETIMETIMESTAMPNANOTIMESTAMP
    >> sym = symbol(`IBM`APPL`MSFT`GOOG`GS)
    >> date=2012.06.12 2012.06.14 2012.06.13 2012.06.17 2012.06.26
    >> time=13:30:10.008 13:30:10.006 13:30:10.009 13:30:10.010 13:30:10.015
    >> price = rand(100.0,5)
    >> trades=table(sym,date,time,price)
    >>share streamTable(100:0,`sym`date`time`price,[SYMBOL,DATE,TIME,DOUBLE]) as st
    
    >> replay(trades, st, `date, `time)
    >> select * from st
    
    sym		date		time			price
    IBM		2012.06.12	13:30:10.008	39.0756
    APPL	2012.06.14	13:30:10.006	45.4418
    MSFT	2012.06.13	13:30:10.009	86.8915
    GOOG	2012.06.17	13:30:10.010	39.7776
    GS		2012.06.26	13:30:10.015	7.3941
    
  • replayDS函数详解

    replayDS(sqlObj, [dateColumn], [timeColumn], [timeRepartitionSchema])
    
    参数 意义 备注
    sqlObj SQL元代码,表示回放的数据 比如<select * from sourceTable>
    dateColumn 表示日期列,是一个字符串 不指定:默认第一列为日期列;
    replayDS函数默认日期列时数据源的一个分区列,并根据分区信息将原始SQL查询拆分成多个查询
    timeColumn 表示时间列,是一个字符串 配合timeRepatitionSchema使用
    timeRepartitionSchema 时间类型向量,是一个TIME或NANOTIME类型的向量 比如08:00:00…18:00:00
    若同时指定了timeColumn,则对SQL查询在时间维度上进一步拆分

    replayDS函数基于dateColumntimeColumn这两列来划分数据源。

    生成一组数据源。replayDS函数生成的数据源可以作为replay函数的输入。

    n=5000000
    sym = rand(symbol(`IBM`APPL`MSFT`GOOG`GS),n)
    date=take(2012.06.12..2012.06.16,n)
    time=rand(13:30:11.008..13:30:11.012,n)
    price = rand(100.0,n)
    t=table(sym,date,time,price)
    if(existsDatabase("dfs://test_stock")){
           dropDatabase("dfs://test_stock")
           }
    db=database("dfs://test_stock",VALUE,2012.06.12..2012.06.16)
    trades=db.createPartitionedTable(t,`trades,`date)
    trades.append!(t)
    replayDS(<select * from trades where date=2012.06.12>,`date,`time,[13:30:11.008,13:30:11.010,13:30:11.013])
    

    DolphinDB使用案例17:历史数据回放replay详解

  • 回放使用场景

  • 使用场景一:单个内存表回放

    当内存表回放只需要设置输入表、输出表、日期列、回访速度即可。

    replay(inputTable, outputTable, `date, `time, 10)
    
  • 使用场景二:使用data source的单表回放

    当单表行数过多时,可以配合使用replayDS进行回放。首先使用replayDS生成data source,本例中指定了日期列和timeRepartitionColumn。回放调用与单个内存表回放相似,但是可以指定回放的并行度。replay内部实现使用了pipeline框架,取数据和输出分开执行。当输入为data source时,多块数据可以并行读取,以避免输出线程等待的情况。此例中并行度设置为2,表示有两个线程同时执行取数据的操作。

    inputDS = replayDS(<select * from inputTable>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
    replay(inputDS, outputTable, `date, `time, 1000, 2)
    
  • 使用场景三:使用date source的多表回放

    replay也支持多张表的同时回放,只需要将多张输入表以元组的方式传给replay,并且分别指定输出表即可。这里输出表和输入表应该一一对应,每一对都必须有相同的表结构。如果指定了日期列或时间列,那么所有表中都应当有存在相应的列。

    ds1 = replayDS(<select * from input1>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
    ds2 = replayDS(<select * from input2>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
    ds3 = replayDS(<select * from input3>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
    replay([ds1, ds2, ds3], [out1, out2, out3], `date, `time, 1000, 2)
    
  • 取消回放

    1. cancelJob(jobId)

      如果replay函数是通过submitJob调用,可以使用getRecentJob获取jobId,然后用cancelJob取消回放。

      getRecentJobs()
      cancelJob(jobid)
      
    2. cancelConsoleJob(jobId)

      如果是直接调用,可在另外一个GUI session中使用getConsoleJobs获取jobId,然后使用cancelConsoleJob取消回放任务。

      getConsoleJobs()
      cancelConsoleJob(jobId)
      
  • 回放使用方式

    回放的数据以流数据形式存在,我们可以使用以下三种方式来订阅消费这些数据:

    1. DolphinDB中订阅,使用DolphinDB脚本自定义回调函数来消费流数据;
    2. 在DolphinDB中订阅,使用内置的流计算引擎来处理流数据,譬如时间序列聚合引擎、横截面聚合引擎、异常检测引擎等。DolphinDB内置的聚合引擎可以对流数据进行实时聚合计算;
    3. 第三方客户端通过DolphinDB的流数据API来订阅和消费数据;
  • 使用GUI回放huibi交易所一天的历史数据

  • 数据准备

    需要用到tick数据和orderBook数据,分别存放在D:\DolphinDB\Replay\tickD:\DolphinDB\Replay\orderBook文件夹下。

    tick数据:

    DolphinDB使用案例17:历史数据回放replay详解

    orderBook数据:

    DolphinDB使用案例17:历史数据回放replay详解

  • 创建需要的DolphinDB数据库

    可根据交易标的代码、业务时间进行组合分区。创建名为dfs://huobiDB的分布式分区数据库。后续会用到web可视化文件replay.html,如果需要修改dfs://huobiDB这个数据库名,则replay.html也需要同步修改。

    # 创建数据库dfs://huobiDB
    def createDB(){
    	if(existsDatabase("dfs://huobiDB"))
    		dropDatabase("dfs://huobiDB")
        //按照数据集的时间跨度,请自行调整VALUE分区日期范围
    	db1 = database(, VALUE, 2018.09.01..2018.09.30)
    	db2 = database(, HASH, [SYMBOL,20])
    	db = database("dfs://huobiDB", COMPO, [db1,db2])
    }
    # 在dfs://huobiDB数据库下创建tick分区表
    def createTick(){
    	tick = table(100:0, `aggregate_ID`server_time`price`amount`buy_or_sell`first_trade_ID`last_trade_ID`product , [INT,TIMESTAMP,DOUBLE,DOUBLE,CHAR,INT,INT,SYMBOL])
    	db = database("dfs://huobiDB")
    	return db.createPartitionedTable(tick, `tick, `server_time`product)
    }
    # 在dfs://huobiDB数据库下创建orderBook分区表
    def createOrderBook(){
    	orderData = table(100:0, `lastUpdateId`server_time`buy_1_price`buy_2_price`buy_3_price`buy_4_price`buy_5_price`buy_6_price`buy_7_price`buy_8_price`buy_9_price`buy_10_price`buy_11_price`buy_12_price`buy_13_price`buy_14_price`buy_15_price`buy_16_price`buy_17_price`buy_18_price`buy_19_price`buy_20_price`sell_1_price`sell_2_price`sell_3_price`sell_4_price`sell_5_price`sell_6_price`sell_7_price`sell_8_price`sell_9_price`sell_10_price`sell_11_price`sell_12_price`sell_13_price`sell_14_price`sell_15_price`sell_16_price`sell_17_price`sell_18_price`sell_19_price`sell_20_price`buy_1_amount`buy_2_amount`buy_3_amount`buy_4_amount`buy_5_amount`buy_6_amount`buy_7_amount`buy_8_amount`buy_9_amount`buy_10_amount`buy_11_amount`buy_12_amount`buy_13_amount`buy_14_amount`buy_15_amount`buy_16_amount`buy_17_amount`buy_18_amount`buy_19_amount`buy_20_amount`sell_1_amount`sell_2_amount`sell_3_amount`sell_4_amount`sell_5_amount`sell_6_amount`sell_7_amount`sell_8_amount`sell_9_amount`sell_10_amount`sell_11_amount`sell_12_amount`sell_13_amount`sell_14_amount`sell_15_amount`sell_16_amount`sell_17_amount`sell_18_amount`sell_19_amount`sell_20_amount`product,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,SYMBOL])
    	db = database("dfs://huobiDB")
    	return db.createPartitionedTable(orderData, `orderBook, `server_time`product)
    }
    
  • 创建文本数据导入函数

    创建将数据导入到数据库的函数。

    # tick数据加载函数
    def loadTick(path, filename, mutable tb){
    	tmp = filename.split("_")
    	product = tmp[1]
    	file = path + "/" + filename
    	t = loadText(file)
    	t[`product]=product
    	tb.append!(t)
    }
    # 遍历加载文件夹下的所有tick数据
    def loopLoadTick(mutable tb, path){
    	fileList = exec filename from files(path,"%.csv")
    	for(filename in fileList){
    		print filename
    		loadTick(path, filename, tb)
    	}
    }
    
    # orderBook数据加载函数
    def loadOrderBook(path, filename, mutable tb){
    	tmp = filename.split("_")
    	product = tmp[1]
    	file = path + "/" + filename
    	t = loadText(file)
    	t[`product] = product
    	tb.append!(t)
    }
    # 遍历加载文件夹下的orderBook数据文件
    def loopLoadOrderBook(mutable tb, path){
    	fileList = exec filename from files(path, "%.csv")
    	for(filename in fileList){
    		print filename
    		loadOrderBook(path, filename, tb)
    	}
    }
    
  • 执行创建、加载操作

    上述工作,已经准备好了数据文件csv创建数据库的函数创建分区表的函数将csv数据载入数据库的函数,现在就依此执行上述函数,实现数据库创建并载入函数。

    # 执行创建并载入的动作
    # 登录
    login("admin", "123456")
    # 创建dfs://huobiDB数据库
    createDB()
    # 创建dfs://huobiDB数据库下的分区表
    to = createOrderBook() 
    # 加载经处理的orderBook数据到数据库
    loopLoadOrderBook(to, "D:/DolphinDB/Replay/orderBook")
    # 创建dfs://huobiDB数据库下的分区表
    tt = createTick()
    # 加载经处理的tick数据到数据库
    loopLoadTick(tt, "D:/DolphinDB/Replay/tick")
    
  • 定义数据回放函数

    //定义数据回放函数
    def replayData(productCode, startTime, length, rate){
        login('admin', '123456');
        tick = loadTable('dfs://huobiDB', 'tick');
        orderbook = loadTable('dfs://huobiDB', 'orderBook');
    
        schTick = select name,typeString as type from  tick.schema().colDefs;
        schOrderBook = select name,typeString as type from  orderbook.schema().colDefs;
    	
        share(streamTable(100:0, schOrderBook.name, schOrderBook.type), `outOrder);
        share(streamTable(100:0, schTick.name, schTick.type), `outTick);
        enableTablePersistence(objByName(`outOrder), true,true, 100000);
        enableTablePersistence(objByName(`outTick), true,true, 100000);
        clearTablePersistence(objByName(`outOrder));
        clearTablePersistence(objByName(`outTick));
                                                    
        share(streamTable(100:0, schOrderBook.name, schOrderBook.type), `outOrder);
        share(streamTable(100:0, schTick.name, schTick.type), `outTick);
        enableTablePersistence(objByName(`outOrder), true,true, 100000);
        enableTablePersistence(objByName(`outTick), true,true, 100000);
    
        endTime = temporalAdd(startTime, length, "m")
        sqlTick = sql(sqlCol("*"), tick,  [<product=productCode>, <server_time between timestamp(pair(startTime, endTime))>]);
        sqlOrder = sql(sqlCol("*"), orderbook,  [<product=productCode>, <server_time between timestamp(pair(startTime, endTime))>]);
        cutCount = length * 60 / 20
        trs = cutPoints(timestamp(startTime..endTime), cutCount);
        rds = replayDS(sqlTick, `server_time , , trs);
        rds2 = replayDS(sqlOrder, `server_time , , trs);
        return submitJob('replay_huobi','replay_huobi',  replay,  [rds,rds2],  [`outTick,`outOrder],`server_time ,, rate);
    }
    
    addFunctionView(replayData);
    
  • 网页查看回放可视化

    下载回放界面的html压缩包,将replay.zip解压到DolphinDB程序包的web目录。

    在浏览器地址栏中输入http://[host]:[port]/replay.html打开数据回放界面。这里的hostport是指数据节点的IP地址和端口号,如http://192.168.1.135:8921/replay.html

    如果是集群配置,port采用数据节点。

    DolphinDB使用案例17:历史数据回放replay详解

  • References

  1. DolphinDB历史数据回放教程