DolphinDB使用案例17:历史数据回放replay详解
-
实现方式
本文的回放,是在DolphinDB的GUI界面种操作实现,关于在Python中的实现参见《DolphinDB使用案例18:Python API实现历史数据回放》
-
回放的价值
把指定表或数据源的数据以一定的速率注入到流数据表中,这个过程称为数据回放。
对于量化交易,实盘是事件驱动框架,回测一般是逐行遍历,是两套不同的代码。
回放,就是将历史数据按原始状态复现,回测&实盘代码可以合二为一。
DolphinDB
提供回放功能,通过replay
函数和replayDS
函数。 -
replay函数详解
replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [parallelLevel=1])
参数 意义 备注 input Tables 一个表或replayDS函数生成的数据源 如果有多个表或数据源,用元组表示 outputTables 表:共享的流数据表对象;
字符串:共享流数据表的名称;
如果有多个输出表,用向量或元组表示,并且output Tables的结构必须与inputTables相同 dateColumn 一个字符串标量或与inputTables长度相同的字符串向量 如果inputTables的日期列相同,dateColumn可以使用标量表示;
如果inputTables的日期列不同,dateColumn使用向量表示;
每个元素表示一个inputTable的日期列timeColumn 一个字符串标量或与inputTables长度相同的字符串向量 如果inputTables的日期列相同,timeColumn可以使用标量表示;
如果inputTables的时间列不同,timeColumn使用向量表示;
每个元素表示一个inputTable的日期列replayRate 表示每秒回放的数据条数,是一个非负整数 NULL或负数,表示以最快的速率回放 parallelLevel 表示从数据源加载数据导内存时的线程数,是一个正整数 默认1;
如果inputTables时包含多个表对象的元组,有效的parallelLevel为1replay
函数的作用时将若干表或数据源同时回放到相应的输出表中。用户需要指定输入的数据表或数据源、输出表、日期列、时间列、回放速度以及并行度。使用cancelJob和cancelConsoleJob命令可以终止回放。
inputTables
的每个dateColumn
(或timeColumn
)可以时相同的数据类型,也可以是不同的数据类型,单必须符合以下两种情况之一:- 都不包含日期信息,即数据类型可以是
SECOND
、TIME
、NANOTIME
- 都包含日期信息,即数据类型可以是
DATETIME
、TIMESTAMP
、NANOTIMESTAMP
>> sym = symbol(`IBM`APPL`MSFT`GOOG`GS) >> date=2012.06.12 2012.06.14 2012.06.13 2012.06.17 2012.06.26 >> time=13:30:10.008 13:30:10.006 13:30:10.009 13:30:10.010 13:30:10.015 >> price = rand(100.0,5) >> trades=table(sym,date,time,price) >>share streamTable(100:0,`sym`date`time`price,[SYMBOL,DATE,TIME,DOUBLE]) as st >> replay(trades, st, `date, `time) >> select * from st sym date time price IBM 2012.06.12 13:30:10.008 39.0756 APPL 2012.06.14 13:30:10.006 45.4418 MSFT 2012.06.13 13:30:10.009 86.8915 GOOG 2012.06.17 13:30:10.010 39.7776 GS 2012.06.26 13:30:10.015 7.3941
- 都不包含日期信息,即数据类型可以是
-
replayDS函数详解
replayDS(sqlObj, [dateColumn], [timeColumn], [timeRepartitionSchema])
参数 意义 备注 sqlObj SQL元代码,表示回放的数据 比如<select * from sourceTable> dateColumn 表示日期列,是一个字符串 不指定:默认第一列为日期列;
replayDS函数默认日期列时数据源的一个分区列,并根据分区信息将原始SQL查询拆分成多个查询timeColumn 表示时间列,是一个字符串 配合timeRepatitionSchema使用 timeRepartitionSchema 时间类型向量,是一个TIME或NANOTIME类型的向量 比如08:00:00…18:00:00
若同时指定了timeColumn,则对SQL查询在时间维度上进一步拆分replayDS
函数基于dateColumn
和timeColumn
这两列来划分数据源。生成一组数据源。
replayDS
函数生成的数据源可以作为replay
函数的输入。n=5000000 sym = rand(symbol(`IBM`APPL`MSFT`GOOG`GS),n) date=take(2012.06.12..2012.06.16,n) time=rand(13:30:11.008..13:30:11.012,n) price = rand(100.0,n) t=table(sym,date,time,price) if(existsDatabase("dfs://test_stock")){ dropDatabase("dfs://test_stock") } db=database("dfs://test_stock",VALUE,2012.06.12..2012.06.16) trades=db.createPartitionedTable(t,`trades,`date) trades.append!(t) replayDS(<select * from trades where date=2012.06.12>,`date,`time,[13:30:11.008,13:30:11.010,13:30:11.013])
-
回放使用场景
-
使用场景一:单个内存表回放
当内存表回放只需要设置输入表、输出表、日期列、回访速度即可。
replay(inputTable, outputTable, `date, `time, 10)
-
使用场景二:使用data source的单表回放
当单表行数过多时,可以配合使用
replayDS
进行回放。首先使用replayDS
生成data source
,本例中指定了日期列和timeRepartitionColumn
。回放调用与单个内存表回放相似,但是可以指定回放的并行度。replay
内部实现使用了pipeline框架,取数据和输出分开执行。当输入为data source
时,多块数据可以并行读取,以避免输出线程等待的情况。此例中并行度设置为2,表示有两个线程同时执行取数据的操作。inputDS = replayDS(<select * from inputTable>, `date, `time, 08:00:00.000 + (1..10) * 3600000) replay(inputDS, outputTable, `date, `time, 1000, 2)
-
使用场景三:使用date source的多表回放
replay
也支持多张表的同时回放,只需要将多张输入表以元组的方式传给replay
,并且分别指定输出表即可。这里输出表和输入表应该一一对应,每一对都必须有相同的表结构。如果指定了日期列或时间列,那么所有表中都应当有存在相应的列。ds1 = replayDS(<select * from input1>, `date, `time, 08:00:00.000 + (1..10) * 3600000) ds2 = replayDS(<select * from input2>, `date, `time, 08:00:00.000 + (1..10) * 3600000) ds3 = replayDS(<select * from input3>, `date, `time, 08:00:00.000 + (1..10) * 3600000) replay([ds1, ds2, ds3], [out1, out2, out3], `date, `time, 1000, 2)
-
取消回放
-
回放使用方式
回放的数据以流数据形式存在,我们可以使用以下三种方式来订阅消费这些数据:
-
在
DolphinDB
中订阅,使用DolphinDB
脚本自定义回调函数来消费流数据; -
在DolphinDB中订阅,使用内置的流计算引擎来处理流数据,譬如时间序列聚合引擎、横截面聚合引擎、异常检测引擎等。
DolphinDB
内置的聚合引擎可以对流数据进行实时聚合计算; -
第三方客户端通过DolphinDB的流数据API来订阅和消费数据;
-
-
使用GUI回放huibi交易所一天的历史数据
-
数据准备
需要用到
tick
数据和orderBook
数据,分别存放在D:\DolphinDB\Replay\tick
和D:\DolphinDB\Replay\orderBook
文件夹下。tick
数据:orderBook
数据: -
创建需要的DolphinDB数据库
可根据交易标的代码、业务时间进行组合分区。创建名为
dfs://huobiDB
的分布式分区数据库。后续会用到web可视化文件replay.html
,如果需要修改dfs://huobiDB
这个数据库名,则replay.html
也需要同步修改。# 创建数据库dfs://huobiDB def createDB(){ if(existsDatabase("dfs://huobiDB")) dropDatabase("dfs://huobiDB") //按照数据集的时间跨度,请自行调整VALUE分区日期范围 db1 = database(, VALUE, 2018.09.01..2018.09.30) db2 = database(, HASH, [SYMBOL,20]) db = database("dfs://huobiDB", COMPO, [db1,db2]) } # 在dfs://huobiDB数据库下创建tick分区表 def createTick(){ tick = table(100:0, `aggregate_ID`server_time`price`amount`buy_or_sell`first_trade_ID`last_trade_ID`product , [INT,TIMESTAMP,DOUBLE,DOUBLE,CHAR,INT,INT,SYMBOL]) db = database("dfs://huobiDB") return db.createPartitionedTable(tick, `tick, `server_time`product) } # 在dfs://huobiDB数据库下创建orderBook分区表 def createOrderBook(){ orderData = table(100:0, `lastUpdateId`server_time`buy_1_price`buy_2_price`buy_3_price`buy_4_price`buy_5_price`buy_6_price`buy_7_price`buy_8_price`buy_9_price`buy_10_price`buy_11_price`buy_12_price`buy_13_price`buy_14_price`buy_15_price`buy_16_price`buy_17_price`buy_18_price`buy_19_price`buy_20_price`sell_1_price`sell_2_price`sell_3_price`sell_4_price`sell_5_price`sell_6_price`sell_7_price`sell_8_price`sell_9_price`sell_10_price`sell_11_price`sell_12_price`sell_13_price`sell_14_price`sell_15_price`sell_16_price`sell_17_price`sell_18_price`sell_19_price`sell_20_price`buy_1_amount`buy_2_amount`buy_3_amount`buy_4_amount`buy_5_amount`buy_6_amount`buy_7_amount`buy_8_amount`buy_9_amount`buy_10_amount`buy_11_amount`buy_12_amount`buy_13_amount`buy_14_amount`buy_15_amount`buy_16_amount`buy_17_amount`buy_18_amount`buy_19_amount`buy_20_amount`sell_1_amount`sell_2_amount`sell_3_amount`sell_4_amount`sell_5_amount`sell_6_amount`sell_7_amount`sell_8_amount`sell_9_amount`sell_10_amount`sell_11_amount`sell_12_amount`sell_13_amount`sell_14_amount`sell_15_amount`sell_16_amount`sell_17_amount`sell_18_amount`sell_19_amount`sell_20_amount`product,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,SYMBOL]) db = database("dfs://huobiDB") return db.createPartitionedTable(orderData, `orderBook, `server_time`product) }
-
创建文本数据导入函数
创建将数据导入到数据库的函数。
# tick数据加载函数 def loadTick(path, filename, mutable tb){ tmp = filename.split("_") product = tmp[1] file = path + "/" + filename t = loadText(file) t[`product]=product tb.append!(t) } # 遍历加载文件夹下的所有tick数据 def loopLoadTick(mutable tb, path){ fileList = exec filename from files(path,"%.csv") for(filename in fileList){ print filename loadTick(path, filename, tb) } } # orderBook数据加载函数 def loadOrderBook(path, filename, mutable tb){ tmp = filename.split("_") product = tmp[1] file = path + "/" + filename t = loadText(file) t[`product] = product tb.append!(t) } # 遍历加载文件夹下的orderBook数据文件 def loopLoadOrderBook(mutable tb, path){ fileList = exec filename from files(path, "%.csv") for(filename in fileList){ print filename loadOrderBook(path, filename, tb) } }
-
执行创建、加载操作
上述工作,已经准备好了
数据文件csv
、创建数据库的函数
、创建分区表的函数
、将csv数据载入数据库的函数
,现在就依此执行上述函数,实现数据库创建并载入函数。# 执行创建并载入的动作 # 登录 login("admin", "123456") # 创建dfs://huobiDB数据库 createDB() # 创建dfs://huobiDB数据库下的分区表 to = createOrderBook() # 加载经处理的orderBook数据到数据库 loopLoadOrderBook(to, "D:/DolphinDB/Replay/orderBook") # 创建dfs://huobiDB数据库下的分区表 tt = createTick() # 加载经处理的tick数据到数据库 loopLoadTick(tt, "D:/DolphinDB/Replay/tick")
-
定义数据回放函数
//定义数据回放函数 def replayData(productCode, startTime, length, rate){ login('admin', '123456'); tick = loadTable('dfs://huobiDB', 'tick'); orderbook = loadTable('dfs://huobiDB', 'orderBook'); schTick = select name,typeString as type from tick.schema().colDefs; schOrderBook = select name,typeString as type from orderbook.schema().colDefs; share(streamTable(100:0, schOrderBook.name, schOrderBook.type), `outOrder); share(streamTable(100:0, schTick.name, schTick.type), `outTick); enableTablePersistence(objByName(`outOrder), true,true, 100000); enableTablePersistence(objByName(`outTick), true,true, 100000); clearTablePersistence(objByName(`outOrder)); clearTablePersistence(objByName(`outTick)); share(streamTable(100:0, schOrderBook.name, schOrderBook.type), `outOrder); share(streamTable(100:0, schTick.name, schTick.type), `outTick); enableTablePersistence(objByName(`outOrder), true,true, 100000); enableTablePersistence(objByName(`outTick), true,true, 100000); endTime = temporalAdd(startTime, length, "m") sqlTick = sql(sqlCol("*"), tick, [<product=productCode>, <server_time between timestamp(pair(startTime, endTime))>]); sqlOrder = sql(sqlCol("*"), orderbook, [<product=productCode>, <server_time between timestamp(pair(startTime, endTime))>]); cutCount = length * 60 / 20 trs = cutPoints(timestamp(startTime..endTime), cutCount); rds = replayDS(sqlTick, `server_time , , trs); rds2 = replayDS(sqlOrder, `server_time , , trs); return submitJob('replay_huobi','replay_huobi', replay, [rds,rds2], [`outTick,`outOrder],`server_time ,, rate); } addFunctionView(replayData);
-
网页查看回放可视化
下载回放界面的html压缩包,将
replay.zip
解压到DolphinDB
程序包的web
目录。在浏览器地址栏中输入
http://[host]:[port]/replay.html
打开数据回放界面。这里的host
和port
是指数据节点的IP地址和端口号,如http://192.168.1.135:8921/replay.html
。如果是集群配置,
port
采用数据节点。 -
References
上一篇: Unity制作批量配音制作工具