欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

DolphinDB使用案例12:Python API接口

程序员文章站 2022-07-13 14:55:55
...
  • DolphinDB与Python

    DolphinDB可以在Python中调用,大大降低了时序数据库的使用门槛。

    DolphinDB Python API实质是封装了DolphinDB的脚本语言1(也就是前面11次案例中使用到的语言)。Python代码被转换成DolphinDB脚本在DolphinDB服务器执行,执行结果保存到DolphinDB服务器或者序列化到Python客户端。

  • Python API中方法分为两类

  • 不触发脚本的执行

    只是将Python语言转换成DolphinDB脚本,但并不在DolphinDB服务器上执行。

  • 触发脚本的执行

    即将Python语言转成DolphinDB脚本语言,又在DolphinDB服务器上运行。

    触发脚本执行的方法,都是table类的方法。如下:

    方法名 详情
    connect(host, port, [username, password]) 将会话连接到DolphinDB服务器。
    toDF() 把DolphinDB表对象转换成pandas的Dataframe对象。
    executeAs(tableName) 执行结果保存为指定表名的内存表。
    execute() 执行脚本。与updatedelete一起使用。
    database(dbPath, …) 创建或加载数据库。
    dropDatabase(dbPath) 删除数据库。
    dropPartition(dbPath, partitionPaths) 删除数据库的某个分区。
    dropTable(dbPath, tableName) 删除数据库中的表。
    drop(colNameList) 删除表中的某列。
    ols(Y, X, intercept) 计算普通最小二乘回归,返回结果是一个字典。
  • 安装Python API模块

    pip install dolphindb
    # 如果已安装,可以更新,因为当前他们开发迭代较快
    pip install dolphindb --upgrade
    # 当前版本为0.1.13.8
    
  • 连接DolphinDB数据库

  • 会话session

    Python通过会话与DolphinDB进行交互,就像MySQL与Python的engine一样。

    建立会话之前,需要把目标服务器先打开,即服务器的端口呈开放状态。

    # 创建会话,建立连接
    s = ddb.session()
    s.connect("localhost", 8920)
    # # 如需用户名和密码
    # s.connect("localhost", 8848, admin, 123456)
    

    如果连接成功,运行后不会返回信息。想来是遵循无消息就是最好的消息原则。

    DolphinDB使用案例12:Python API接口

    如果,连接失败,会返回错误码。

    DolphinDB使用案例12:Python API接口

    如果采用单节点部署,默认端口8848;如果采用单服务器集群部署,默认端口8920.

  • 重新连接

    在API使用期间,如果与服务器连接暂时中断,API会进行重新连接,并执行之前未成功运行的脚本

    重新连接会获得一个新的会话,在新会话执行之前连接未成功运行的脚本B之前,可以通过下述代码设置一个任务A,先执行完A再执行未成功运行的B。

    import dolphindb as ddb
    s = ddb.session()
    s.setInitScript("initTable = streamTable(10000:0, `id`val, [INT,LONG])")
    currentInitScript = s.getInitScript()
    
  • 将数据导入DolphinDB服务器

    DolphinDB数据库根据存储方式可以分成3种类型:

    1. 内存数据库
    2. 本地文件系统数据库
    3. 分布式文件系统(DFS)的数据库

    关于三种类型的基本概念,参见《内存数据库、磁盘数据库、分布式数据库区别》《数据库分区、分表、分库、分片》。

    本文先以本地文件系统的数据库为例。

  • 把数据存入内存表

    此处使用example.csv数据,此数据官网有提供,不过下载网速较慢,各位老板如有需要,可以留言,我发给各位。

  • loadText载入到内存表

    loadText方法把文本文件载入到DolphinDB的内存表种,并返回一个DolphinDB内存表对象。

    >> trade = s.loadText('D:/DolphinDB/Python/example.csv')
    # Windows中路径需要用反斜杠,与常规Python应用不同
    >> print(trade)
    >> print(type(trade))
    <dolphindb.table.Table object at 0x000001CED8779BA8>
    <class 'dolphindb.table.Table'>
    

    当然,这里也可以使用ploadText,速度会翻倍。

    对比载入一个840M股票数据csv文件:

    方式 时间
    pd.read_csv 01:02.877
    loadText 00:33.288
    ploadText 00:16.969

    详见《DolphinDB与pandas读取csv文件速度对比测试

  • toDF

    toDF方法把Python中的DolphinDB对象转换成pandas DataFrame对象。

    >> df = trade.toDF()
    >> print(df)
    >> print(type(df))
          TICKER       date       VOL        PRC        BID       ASK
    0       AMZN 1997-05-15   6029815   23.50000   23.50000   23.6250
    1       AMZN 1997-05-16   1232226   20.75000   20.50000   21.0000
    2       AMZN 1997-05-19    512070   20.50000   20.50000   20.6250
    ...
    13133   NFLX 2016-12-28   4388956  125.89000  125.88000  125.8900
    13134   NFLX 2016-12-29   3444729  125.33000  125.31000  125.3300
    13135   NFLX 2016-12-30   4455012  123.80000  123.80000  123.8300
    [13136 rows x 6 columns]
    <class 'pandas.core.frame.DataFrame'>
    
  • 把数据导入到分区数据库

    如果数据文件比可用内存大,可以把数据导入到分区数据库中。

  • 创建分区数据库

    分区方案:值分区;

    分区字段:example.csv文件中有3只股票,使用股票代码作为分区字段。

    s.database('db', partitionType= 'VALUE', partitions=["AMZN", "NFLX", "NVDA"], dbPath='D:/DolphinDB/Python/valuedb1')
    # 等同于下面这种分区方案设定方法
    s.database('db', partitionType= ddb.settings.VALUE, partitions=["AMZN", "NFLX", "NVDA"], dbPath='D:/DolphinDB/Python/valuedb1')
    

    要点:文件路径要用/,否则数据库创建不成功,但是并不会报错

  • 创建分布式分区数据库

    s.database('db', partitionType='VALUE', partitions=["AMZN","NFLX", "NVDA"], dbPath="dfs://valuedbdfs")
    

    与分区数据库相比,只是存储路径dbPath不同,这里valuedbdfs在数据节点文件夹下面。

    DolphinDB使用案例12:Python API接口

  • 其他分区方式

    除了值分区(VALUE),DolphinDB还支持顺序分区(SEQ)、哈希分区(HASH)、范围分区(RANGE)、列表分区(LIST)与组合分区(COMBO)。

    概念及区别详情参见《DolphinDB使用案例2:数据表分区

  • 创建分区表,并追加数据

    创建数据库后,可使用函数loadTextEx把文本文件导入到分区数据库的分区表中。

    如果分区表不存在,函数会自动生成该分区表,并把数据追加到表中;

    如果分区表已存在,则直接把数据追加到分区表中。

    loadTextEx函数返回一个包含载入元素的DolphinDB表对象。可通过toDF()函数转成pandas.DataFrame,前文有讲。

    # 创建分区数据库
    s.database('db', partitionType= ddb.settings.VALUE, partitions=["AMZN", "NFLX", "NVDA"], dbPath='D:/DolphinDB/Python/valuedb')
    # 创建分区表,并载入数据
    trade = s.loadTextEx('db', tableName= 'trade', partitionColumns= ['TICKER'], filePath= 'D:/DolphinDB/Python/example.csv')
    # 查看表行数
    print(trade.rows)
    # 查看表列数
    print(trade.cols)
    # 查看表结构
    print(trade.schema)
    
    # 查看数据库中已经存在的表
    table = s.table(dbPath='D:/DolphinDB/Python/valuedb', data= "trade")
    

    上面这段代码,每运行一次,就会发现行数(trade.rows)增多,因为如果表在分布式数据库中存在,就会把数据追加到表上。

    要解决这个问题,存在两个方案:

    1. 数据库去重

      可以先判断是否存在重名数据库,如果存在就删除,重新创建。

      if s.existsDatabase(WORK_DIR+"/valuedb"):
          s.dropDatabase(WORK_DIR+"/valuedb")
      

      这种方式在数据库不重要的时候没问题,如果数据库是需要长期存在的,这样就太粗鲁了。

    2. 设置避免重复插入相同数据

      创建含主键的流数据表。keyedStreamTable函数。

      需要理清逻辑关系,keyedStreamTable函数是直接创建一个包含主键的表,并不能给一个已经创建好的普通表指定主键。

      这就要求使用者在使用之处就做好使用规划。

  • 把数据导入到内存的分区表中

  • loadTextEx

    先有数据库,再有表,这个逻辑是不变的。

    # 第一步:创建值分区内存数据库
    s.database('db', partitionType= ddb.settings.VALUE, partitions=['AMZN', 'NFLX', 'NVDA'], dbPath='')
    # 第二步:讲数据导入内存的分区表中
    trade = s.loadTextEx(dbPath='db', partitionColumns= ['TICKER'], tableName='trade', filePath='D:/DolphinDB/Python/example.csv')
    

    返回一个DolphinDB表

  • ploadText与loadText

    ploadText函数可以并行加载文本文件到内存分区表中。

    trade = s.ploadText('D:/DolphinDB/Python/example.csv')
    

    ploadText函数直接载入内存表,并不需要提前创建内存数据库。

    loadText函数也一样,只是比ploadText慢一点。

  • 从Python上传数据到DolphinDB服务器

  • 数据类型转换

    上传数据时,Python中的一些基础类型,如boolint64float64,会自动转换为DolphinDBBOOLINTDOUBLE类型。

    时间类型需要做特殊处理。

    DolphinDB提供DATEMONTHTIMEMINUTESECONDDATETIMETIMESTAMPNANOTIMENANOTIMESTAMP九种类型的时间类型。

    Python中时间类型均为datetime64类型,会被转换成DolphinDBNANOTIMESTAMP类型。

    Python API提供了from_time,from_date,from_datetime方法,能把datetime64类型转换成DolphinDB的各种时间类型。具体对应参见:

    DolphinDB时间类型 例子 上传到DolphinDB的结果
    DATE Date.from_date(date(2012,12,20)) 2012.12.20
    MONTH Month.from_date(date(2012,12,26)) 2012.12M
    TIME Time.from_time(time(12,30,30,8)) 12:30:30.008
    MINUTE Minute.from_time(time(12,30)) 12:30m
    SECOND Second.from_time(time(12,30,30)) 12:30:30
    DATETIME Datetime.from_datetime(datetime(2012,12,30,15,12,30)) 2012.12.30 15:12:30
    TIMESTAMP Timestamp.from_datetime(datetime(2012,12,30,15,12,30,8)) 2012.12.30 15:12:30.008
    NANOTIME NanoTime.from_time(time(13,30,10,706)) 13:30:10.000706000
    NANOTIMESTAMP NanoTimestamp.from_datetime(datetime(2012,12,24,13,30,10,80706)) 2012.12.24 13:30:10.080706000
  • 缺失值处理

    Python中的np.NaN是特殊的float数据类型,上传时,DolphinDB会把他们识别为float。

    1. ddb.overwriteType(dataframe,dict)

      此函数,可在上传DataFrame之前指定一个或多个列在dolphinDB中的数据类型。

      dataframe:Python中的DataFrame类型;

      dict:Python中字典对象,key表示dataframe中某列的名称;value表示DolphinDB的数据类型,只能取:ddb.DT_BOOL、ddb.DT_INT、ddb.DT_LONG、ddb.DT_DOUBLE。

      ddb.overwriteTypes(t,{'isBuyer':ddb.DT_BOOL})
      
    2. .null()方法构造NULL
      类型 对应的NULL
      DATE Date.null()
      MONTH Month.null()
      TIME Time.null()
      SECOND Second.null()
      DATETIME Datetime.null()
      TIMESTAMP Timestamp.null()
      NANOTIME NanoTime.null()
      NANOTIMESTAMP NanoTimestamp.null()

    上传字典或DataFrame时,同一列中不能同时包含Python的原生类型和DolphinDB Python API提供的类型。

    例如:‘date’:[date(2012,12,30), Date.from_date(date(2012,12,31)), Date.null()], date列同时包含Python的datetime64类型和DolphinDB Python API提供的DATE类型,会导致上传失败。

  • 使用upload函数上传

    upload可以把Python对象上传到DolphinDB服务器。upload函数的输入是Python的字典对象:

    {'DolphinDB中的变量名' : 'Python对象'}

    1. 上传Python list
      # 上传数据
      a = [1,2,3,4,5,6]
      s.upload({'a':a})
      # 读取数据
      a_new = s.run("a"))
      

      DolphinDB使用案例12:Python API接口

      Python中像a=[1,2,3.0]这种类型的内置list,上传到DolphinDB后,会被识别为any vector。这种情况下,建议使用np.array代替内置list,即a=np.array([1,2,3.0],dtype=np.double),这样a会被识别为double类型的向量。

    2. 上传Python DataFrame
      # 上传数据
      df = pd.DataFrame({'id': np.int32([1, 2, 3, 4, 3]), 'value':  np.double([7.8, 4.6, 5.1, 9.6, 0.1]), 'x': np.int32([5, 4, 3, 2, 1])})
      s.upload({'t1': df})
      # 读取数据,t1表的value列的均值
      print(s.run("t1.value.avg()"))
      

      DolphinDB使用案例12:Python API接口

  • 使用table函数上传(通过在DolphinDB中创建table表实现上传)

    table函数用于在Python中创建DolphinDB表对象。

    table函数的输入可以是字典DataFrameDolphinDB中的表名

    # 第一步:创建table表
    dt = s.table(data={'id':[1, 2, 2, 3],
                       'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'],
                       'price': [22, 3.5, 21, 26]}).executeAs("test") # 方式一
    
    dt = s.table(data={'id':[1, 2, 2, 3],
                       'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'],
                       'price': [22, 3.5, 21, 26]}, tableAliasName="testDict") # 方式二
    # 第二步:加载table表
    result = s.loadTable("test")
    

    实质是tabledata数据在s中创建了一个表,以这种方式实现了数据上传。

    DolphinDB使用案例12:Python API接口

  • 上传一个包含空值的数据表

    有时需要使用Python API来向DolphinDB服务器的分区表中追加数据。

    # 第一步:在DolphinDB服务器端创建数据表(这段代码在DolphinDB客户端执行)
    if(existsDatabase("dfs://testPython")){
    	dropDatabase("dfs://testPython")
    	}
    db = database("dfs://testPython", VALUE, 1..100)
    t1 = table(10000:0,`id`cbool`cchar`cshort`cint`clong`cdate`cmonth`ctime`cminute`csecond`cdatetime`ctimestamp`cnanotime`cnanotimestamp`cfloat`cdouble`csymbol`cstring,[INT,BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,SYMBOL,STRING])
    insert into t1 values (0,true,'a',122h,21,22l,2012.06.12,2012.06M,13:10:10.008,13:30m,13:30:10,2012.06.13 13:30:10,2012.06.13 13:30:10.008,13:30:10.008007006,2012.06.13 13:30:10.008007006,2.1f,2.1,"","")
    db.createPartitionedTable(t1, `t1, `id)
    

    DolphinDB使用案例12:Python API接口

    <<未完待续<<

  • 从DolphinDB数据库中加载数据

  • 使用loadTable函数

    loadTable函数从数据库中加载表。

    1. 参数tableName:表示分区表的名称;
    2. 参数dbPath:表示数据库的路径,如果不指定,则加载到内存中;
    3. 参数memoryMode:对于分区表,
      1. true且未指定partition参数,把表中的所有数据加载到内存的分区表中;
      2. ture且指定partition参数,则之家在指定的分区数据到内存的分区表中;
      3. false,只把元数据加载到内存。
    # 示例一:加载整个表的数据
    trade = s.loadTable(tableName="trade", dbPath= "D:/DolphinDB/Python/valuedb")
    # 示例二:加载指定分区的数据
    trade = s.loadTable(tableName= "trade", dbPath= "D:/DolphinDB/Python/valuedb", partitions= "AMZN") # 选择一个分区
    trade = s.loadTable(tableName= "trade", dbPath= "D:/DolphinDB/Python/valuedb", partitions= ["AMZN","NVDA"]) # 选择两个分区
    
  • 使用loadTableBySQL函数载入数据

    loadTableBySQL函数磁盘上的分区表中满足SQL语句过滤条件的数据加载到内存的分区表中。

    # 去除重名数据库
    if s.existsDatabase("D:/DolphinDB/Python/valuedb" or os.path.exists("D:/DolphinDB/Python/valuedb")):
        s.dropDatabase("D:/DolphinDB/Python/valuedb")
    # 新建数据库
    s.database(dbName= 'db', partitionType= keys.VALUE, partitions= ["AMZN", "NFLX", "NVDA"], dbPath= "D:/DolphinDB/Python/valuedb")
    # 将数据载入数据库
    t = s.loadTextEx("db", tableName= 'trade', partitionColumns= ['TICKER'], filePath= "D:/DolphinDB/Python/example.csv")
    # 查询符合SQL条件的数据,载入内存
    trade = s.loadTableBySQL(tableName= "trade", dbPath= "D:/DolphinDB/Python/valuedb", sql= "select * from trade where date>2010.01.01")
    # 查看trade表的行数
    dsc(trade.rows)
    

    DolphinDB使用案例12:Python API接口

  • 从DolphinDB下载数据到python时的数据转换

    DolphinDB Python API使用Python原生的各种形式的数据兑现过来存放DolphinDB服务端返回的数据,下面给出DolphinDB的数据对象到Python的数据对象的映射关系:

    DolphinDB Python DolphinDB生成数据 Python数据
    scalar Numbers, Strings, NumPy.datetime64 见6.3.2小节 见6.3.2小节
    vector NumPy.array 1…3 [1 2 3]
    pair Lists 1:5 [1, 5]
    matrix Lists 1…6$2:3 [array([[1, 3, 5],[2, 4, 6]], dtype=int32), None, None]
    set Sets set(3 5 4 6) {3, 4, 5, 6}
    dictionary Dictionaries dict([‘IBM’,‘MS’,‘ORCL’], 170.5 56.2 49.5) {‘MS’: 56.2, ‘IBM’: 170.5, ‘ORCL’: 49.5}
    table pandas.DataFame 第6.1小节 第6.1小节

    DolphinDB 表通过toDF()函数转成Python数据,对应转换关系为:

    DolphinDB类型 Python类型 DolphinDB数据 Python数据
    BOOL bool [true,00b] [True, nan]
    CHAR int64 [12c,00c] [12, nan]
    SHORT int64 [12,00h] [12, nan]
    INT int64 [12,00i] [12, nan]
    LONG int64 [12l,00l] [12, nan]
    DOUBLE float64 [3.5,00F] [3.5,nan]
    FLOAT float64 [3.5,00f] [3.5, nan]
    SYMBOL object symbol([“AAPL”,NULL]) [“AAPL”,""]
    STRING object [“AAPL”,string()] [“AAPL”, “”]
    DATE datetime64 [2012.6.12,date()] [2012-06-12, NaT]
    MONTH datetime64 [2012.06M, month()] [2012-06-01, NaT]
    TIME datetime64 [13:10:10.008,time()] [1970-01-01 13:10:10.008, NaT]
    MINUTE datetime64 [13:30,minute()] [1970-01-01 13:30:00, NaT]
    SECOND datetime64 [13:30:10,second()] [1970-01-01 13:30:10, NaT]
    DATETIME datetime64 [2012.06.13 13:30:10,datetime()] [2012-06-13 13:30:10,NaT]
    TIMESTAMP datetime64 [2012.06.13 13:30:10.008,timestamp()] [2012-06-13 13:30:10.008,NaT]
    NANOTIME datetime64 [13:30:10.008007006, nanotime()] [1970-01-01 13:30:10.008007006,NaT]
    NANOTIMESTAMP datetime64 [2012.06.13 13:30:10.008007006,nanotimestamp()] [2012-06-13 13:30:10.008007006,NaT]

    其中:

    1. DolphinDB CHAR类型会被转换成Python int64类型。对此结果,用户可以使用Pythonchr函数使之转换为字符。
    2. 由于Python pandas中所有有关时间的数据类型均为datetime64DolphinDB中的所有时间类型数据均会被转换为datetime64类型。MONTH类型,如2012.06M,会被转换为2012-06-01(即月份当月的第一天)。
    3. TIME, MINUTE, SECONDNANOTIME类型不包含日期信息,转换时会自动添加1970-01-01,例如13:30m会被转换为1970-01-01 13:30:00
    4. 缺失值处理:DolphinDB中的逻辑型、数值型和时序类型的NULL值默认情况下是NaNNaT,字符串的NULL值为空字符串。
  • 追加数据到DolphinDB数据表

  • DolphinDB数据表分类

    1. 内存表:数据近保存在内存中,存取速度最快,但是节点关闭后数据就不存在了;
    2. 本地磁盘表:数据保存在本地磁盘上,可以从磁盘加载到内存;
    3. 分布式表:数据分布在不同的节点,通过DolphinDB的分布式计算引擎,仍然可以像本地表一样做统一查询。
  • 追加数据到DolphinDB内存表

    1. 通过insert into语句保存数据
      # 第一步:生成内存表
      script = """t = table(1:0,`id`date`ticker`price, [INT,DATE,STRING,DOUBLE])
      share t as tglobal"""   # 内存表是会话隔离的,所以普通内存表只有当前会话可见。为了让多个客户端可以同时访问t,使用share在会话间共享内存表
      s.run(script) 
      # 第二步:插入单条数据
      script = "insert into tglobal values(%s, date(%s), %s, %s);tglobal"% (1, np.datetime64("2019-01-01").astype(np.int64), '`AAPL', 5.6) # 将numpy的时间类型强制转换成64位整型,同时在insert语句中显式的调用date函数在服务端将时间列的整型数据转换成对应的类型
      s.run(script)
      

      DolphinDB的内存表并不提供数据类型自动转换的功能,因此在向内存表追加数据时,需要在服务端显式地调用时间转换函数对事件类型的列进行转换,首先要确保插入的数据类型与内存表schema中的数据类型保持一致。

      # 使用insert into语句一次性插入多条数据
      rowNum = 5
      ids = np.arange(1, rowNum+1, 1, dtype=np.int32)
      dates = np.array(pd.date_range('4/1/2019', periods=rowNum), dtype='datetime64[D]')
      tickers = np.repeat("AA", rowNum)
      prices = np.arange(1, 0.6*(rowNum+1), 0.6, dtype=np.float64)
      s.upload({'ids':ids, "dates":dates, "tickers":tickers, "prices":prices})
      script = "insert into tglobal values(ids,dates,tickers,prices);"
      s.run(script)
      

      其中,date_range()函数的dtype参数为datetime64[D],生成了只含有日期的时间列,与DolphinDB中的date类型保持一致,因此可以直接通过insert插入,不需要转换。

      如果这里的时间数据类型式datetime64,则需要如下转换才能追加到内存表:

      script = "insert into tglobal value(ids, date(dates), tickers, prices);"
      s.run(script)
      
    2. 通过table Insert函数批量保存多条数据

      如果Python程序获取的数据可以组织成list形式,且保证数据类型正确的前提下,可以通过tableInsert函数来批量保存多条数据。

      此方式优点在于在一次访问服务器请求中将上传数据对象和追加数据这两个步骤一次性完成,相比于insert into减少了一次访问DolphinDB服务器的请求。

      # tableInsert批量追加多条数据
      args = [ids, dates, tickers, prices]
      s.run("tableInsert{tglobal}", args)
      s.run("tglobal")
      

      tableInsert函数除了可以追加多条数据之外,还可以直接追加一个表,其中,时间列仍然需要特殊说明。

      • 如果表中没有时间列

        直接通过部分应用的方式,将一个DataFrame直接上传到服务器并追加到内存表。

        # --如果表中没有时间列
        import pandas as pd
        
        # 生成内存表
        script = """t = table(1:0,`id`ticker`price, [INT,STRING,DOUBLE])
        share t as tdglobal"""
        s.run(script)
        
        # 生成要追加的DataFrame
        tb=pd.DataFrame({'id': [1, 2, 2, 3],
                         'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'],
                         'price': [22, 3.5, 21, 26]})
        s.run("tableInsert{tdglobal}",tb)
        
      • 如果表中有时间列

        Python pandas中所有有关时间的数据类型均为datetime64,DolphinDB表中时间类型共9种。

        因此在追加一个带有时间列的表时,需要:

        1. 先将Dataframe上传到服务器;
        2. 通过select语句将表中的每一列都选出来;
        3. 每一个时间列进行时间类型转换;
        4. 构成一个新表;
        5. 将这个信标追加到内存表种。
        # --表中有时间列
        import pandas as pd
        tb=pd.DataFrame({'id': [1, 2, 2, 3],
                    'date': np.array(['2019-02-04', '2019-02-05', '2019-02-09', '2019-02-13'], dtype='datetime64[D]'),
                    'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'],
                    'price': [22, 3.5, 21, 26]})
        s.upload({'tb':tb})
        s.run("tb1=table((exec id from tb) as id, (exec date(date) from tb) as date, (exec ticker from tb) as ticker, (exec price from tb) as price)")
        s.run("tableInsert(tglobal,tb1)")
        
    3. 通过append!函数把一张表追加到另一张表

      可以实现,但不推荐。

      因为append!函数会返回一个表的schema,增加通信量。

      • 若表中没有时间列
        import pandas as pd
        
        # 生成内存表
        script = """t = table(1:0,`id`ticker`price, [INT,STRING,DOUBLE])
        share t as tdglobal"""
        s.run(script)
        
        # 生成要追加的DataFrame
        tb=pd.DataFrame({'id': [1, 2, 2, 3],
                         'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'],
                         'price': [22, 3.5, 21, 26]})
        s.run("append!{tdglobal}",tb)
        

        DolphinDB使用案例12:Python API接口

      • 若表中有时间列
        # --有时间序列
        import pandas as pd
        tb=pd.DataFrame(createDemoDict())
        s.upload({'tb':tb})
        s.run("tb1=table((exec id from tb) as id, (exec date(date) from tb) as date, (exec ticker from tb) as ticker, (exec price from tb) as price)")
        s.run("append!(tglobal,tb1)")
        

        此时,并不返回信息

        DolphinDB使用案例12:Python API接口

  • 追加数据到本地磁盘表

    本地磁盘表通常用于静态数据集的计算分析,既可以用于数据的输入,也可以作为计算的输出。它不支持事务,也不持支并发读写

    # 生成磁盘表
    dbPath="'D:/DolphinDB/Python/valuedb'"
    tableName='dt'
    script = """t = table(100:0, `id`date`ticker`price, [INT,DATE,STRING,DOUBLE]); 
    db = database({db}); 
    saveTable(db, t, `{tb}); 
    share t as tDiskGlobal;""".format(db=dbPath,tb=tableName)
    s.run(script)
    

    其中:

    databae函数创建数据库;

    saveTable函数将内存表保存到磁盘中;

  • tableInsert函数

    tableInsert函数时向本地磁盘表追加数据最为常用的方式。本质上,tableInsert时将数据插入到内存表,需要使用saveTable函数将插入的数据保存到磁盘上。

    # 在上述代码的基础上,运行此代码
    # tableInsert将数据保存到本地磁盘表
    rowNum = 5
    ids = np.arange(1, rowNum+1, 1, dtype=np.int32)
    dates = np.array(pd.date_range('4/1/2019', periods=rowNum), dtype='datetime64[D]')
    tickers = np.repeat("AA", rowNum)
    prices = np.arange(1, 0.6*(rowNum+1), 0.6, dtype=np.float64)
    args = [ids, dates, tickers, prices]
    s.run("tableInsert{tDiskGlobal}", args)
    s.run("saveTable(db,tDiskGlobal,`{tb});".format(tb=tableName))
    

    与追加表到内存表类似,本地磁盘表也支持通过tableInsert函数和append!函数直接追加一个表,同样也需要区分有无时间列的情况,唯一的区别是,本地磁盘表在追加之后要执行saveTable函数来保存到磁盘上,具体操作过程不再赘述。

    或者可以理解为,insert into、tableInsert、append!追加一条、多条、一个表到即有表上,都是追加到内存表,如果需要到磁盘表,再使用saveTable函数来保存。

  • 追加数据到分布式表

    分布式表是DolphinDB推荐在生产环境下使用的数据存储方式,

    1. 它支持快照级别的事务隔离,保证数据一致性;
    2. 分布式表支持多副本机制,既提供了数据容错能力,又能作为数据访问的负载均衡。

    只有启用enableDFS=1的集群环境才能使用分布式表。

    # 生成分布式表---此处需要连接数据节点(8921),不能是控制节点(8920)
    s = ddb.session()
    s.connect("localhost", 8921, 'admin', '123456')
    
    dbPath="'dfs://testPython'"
    tableName='t1'
    script = """
    dbPath={db}
    if(existsDatabase(dbPath))
    	dropDatabase(dbPath)
    db = database(dbPath, VALUE, 0..100)
    t1 = table(10000:0,`id`cbool`cchar`cshort`cint`clong`cdate`cmonth`ctime`cminute`csecond`cdatetime`ctimestamp`cnanotime`cnanotimestamp`cfloat`cdouble`csymbol`cstring,[INT,BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,SYMBOL,STRING])
    insert into t1 values (0,true,'a',122h,21,22l,2012.06.12,2012.06M,13:10:10.008,13:30m,13:30:10,2012.06.13 13:30:10,2012.06.13 13:30:10.008,13:30:10.008007006,2012.06.13 13:30:10.008007006,2.1f,2.1,'','')
    t = db.createPartitionedTable(t1, `{tb}, `id)
    t.append!(t1)""".format(db=dbPath,tb=tableName)
    s.run(script)
    

    DolphinDB提供loadTable方法来加载分布式表,通过tableInsert方式追加数据、append!追加表。

    与内存表和磁盘表不同的是,分布式表在追加表的时候提供时间类型自动转换的功能,因此无需显示地进行类型转换。

  • 操作数据库和表2

  • Python中session类常用的用于操作数据库和表的方法

    1. 数据库相关
      方法名 详情
      database 创建数据库
      dropDatabase(dbPath) 删除数据库
      dropPartition(dbPath, partitionPaths) 删除数据库的某个分区
      existsDatabase 判断是否存在数据库
    2. 表和分区相关
      方法名 详情
      dropTable(dbPath, tableName) 删除数据库中的表
      existsTable 判断是否存在表
      loadTable 加载本地磁盘表或者分布式表到内存
      table 创建表

      更多方法见session.py

  • Python中Table类常用方法

    方法名 详情
    append 向表中追加数据
    drop(colNameList) 删除表中的某列
    executeAs(tableName) 执行结果保存为指定表名的内存表
    execute() 执行脚本。与updatedelete一起使用
    toDF() 把DolphinDB表对象转换成pandas的DataFrame对象

    更多方法见table.py

  • Python客户端创建一个数据表

    1. 调用Session类提供的table方法;
    2. 调用Session类提供的upload方法;
    3. 调用Session类提供的run方法;

    以上3种方式都等价于在DolphinDB服务端调用table方法创建一个名为’tb’的内存数据表

  • 操作数据库

    1. 创建数据库database
    2. 删除数据库dropDatabase
    3. 删除DFS数据库的分区dropPartition
  • 操作表

    1. 加载数据库中的表,loadTableloadTableBySQL
    2. 数据表添加数据,insert intotableInsertappend!
    3. 更新表,update只能用于更新内存表,并且必须和execute一起使用;
    4. 删除表中的记录,delete必须与execute一起使用来删除表中的记录。
    5. 删除表中的列trade.drop(['ask', 'bid']);
    6. 删除表s.dreopTable(dbPath,tableName);
  • SQL查询

    1. select选择指定列
      # select用法一:用列名作输入
      trade  = s.loadTable(tableName= 'trade', dbPath= "D:/DolphinDB/Python/valuedb", memoryMode= True)
      result = trade.select(['ticker', 'date', 'bid', 'ask', 'prc', 'vol']).toDF()
      dsc(result)
      # showSQL函数
      print(trade.select(['ticker','date','bid','ask','prc','vol']).where("date=2012.09.06").where("vol<10000000").showSQL())
      

      DolphinDB使用案例12:Python API接口

      # select用法二:使用字符串作为输入
      >> trade  = s.loadTable(tableName= 'trade', dbPath= "D:/DolphinDB/Python/valuedb", memoryMode= True)
      >> print(trade.select("ticker,date,bid,ask,prc,vol").where("date=2012.09.06").where("vol<10000000").toDF())
      
        ticker       date        bid     ask     prc      vol
      0   AMZN 2012-09-06  251.42999  251.56  251.38  5657816
      1   NFLX 2012-09-06   56.65000   56.66   56.65  5368963
      
    2. top取表中前n条记录
      # top用于取表中的前n条记录
      >> trade  = s.loadTable(tableName= 'trade', dbPath= "D:/DolphinDB/Python/valuedb", memoryMode= True)
      >> print(trade.top(5).toDF())
      
        TICKER       date      VOL     PRC     BID     ASK
      0   AMZN 1997-05-15  6029815  23.500  23.500  23.625
      1   AMZN 1997-05-16  1232226  20.750  20.500  21.000
      2   AMZN 1997-05-19   512070  20.500  20.500  20.625
      3   AMZN 1997-05-20   456357  19.625  19.625  19.750
      4   AMZN 1997-05-21  1577414  17.125  17.125  17.250
      
    3. where过滤数据
      # 多个过滤条件
      >> trade  = s.loadTable(tableName= 'trade', dbPath= "D:/DolphinDB/Python/valuedb", memoryMode= True)
      >> t1 = trade.select(['date', 'bid', 'ask', 'prc', 'vol']).where('TICKER=`AMZN').where('bid!=NULL').where('vol>10000000').sort('vol desc').executeAs('t1')
      >> dsc(t1.toDF().head())
      
      变量的长度是: 5
              date    bid      ask     prc        vol
      0 2007-04-25  56.80  56.8100  56.810  104463043
      1 1999-09-29  80.75  80.8125  80.750   80380734
      2 2006-07-26  26.17  26.1800  26.260   76996899
      3 2007-04-26  62.77  62.8300  62.781   62451660
      4 2005-02-03  35.74  35.7300  35.750   60580703
      变量的类型是: <class 'pandas.core.frame.DataFrame'>
      >>>>>>>>>>>>>>>华丽丽的分隔线<<<<<<<<<<<<<<<
      
      # 多个条件的字符串
      trade  = s.loadTable(tableName= 'trade', dbPath= "D:/DolphinDB/Python/valuedb", memoryMode= True)
      t2 = trade.select("ticker, date, vol").where("bid!=NULL, ask!=NULL, vol>50000000").toDF()
      

      showSQL()函数一样使用

    4. groupby

      groupby后面需要使用聚合函数,如countsumaggagg2having

    5. contextby

      contextbygroupby相似,区别在于groupby为每个组返回一个标量,但是contextby为每个组返回一个向量。每组返回的向量长度与这一组的行数相同。

    6. merge

      merge用于内部连接、左连接、外部连接;

      merge_asof表示asof join;

      merge_window表示窗口连接。

      对于merge,如果连接列名称相同,使用on参数指定连接列,如果连接列名称不同,使用left_onright_on参数指定连接列。可选参数how表示表连接的类型。默认的连接类型时内部连接。分区表只能与分区表进行外部链接,内存表只能与内存表进行外部链接。

    7. executeAs 把结果保存为DolphinDB中的表对象
    8. ols计算最小二乘回归系数
  • Python Streaming API

    《DolphinDB使用案例16:流数据订阅》

  • 实例

    1. DolphinDB使用案例14: Python API实现动量交易策略
    2. DolphinDB使用案例15:Python API实现时间序列操作
  • References


  1. Github Tutorials >> Python_api.md待更新 ↩︎

  2. Github python3_api_experimental/README.md ↩︎