干货丨时序数据库DolphinDB数据导入教程

企业在使用大数据分析平台时,首先须要把海量数据从多个数据源迁移到大数据平台中。node


在导入数据前,咱们须要理解 DolphinDB database 的基本概念和特色。git

DolphinDB数据表按存储介质分为3种类型:github

  • 内存表:数据只保存在本节点内存,存取速度最快,可是节点关闭后,数据将会丢失。
  • 本地磁盘表:数据保存在本地磁盘上,即便节点重启,也能够方便地经过脚本把数据加载到内存中。
  • 分布式表:数据在物理上分布在不一样的节点,经过DolphinDB的分布式计算引擎,逻辑上仍然能够像本地表同样作统一查询。

DolphinDB数据表按是否分区分为2种类型:数据库

  • 普通表
  • 分区表

在传统的数据库中,分区是针对数据表的,即同一个数据库中的每一个数据表能够有不一样的分区方案;而DolphinDB的分区是针对数据库的,即一个数据库只能使用一种分区方案。若是两个表的分区方案不一样,它们不能放在同一个数据库中。服务器


DolphinDB提供了3种灵活的数据导入方法:app

  • 经过CSV文本文件导入
  • 经过HDF5文件导入
  • 经过ODBC导入


1.经过CSV文本文件导入

经过CSV文件进行数据中转是比较通用的数据迁移方式。DolphinDB提供了loadTextploadTextloadTextEx三个函数来导入CSV文件。下面咱们经过一个示例CSV文件candle_201801.csv来讲明这3个函数的用法。分布式

1.1 loadTextide

语法:loadText(filename, [delimiter=','], [schema])函数

参数:性能

filename是文件名。

delimiterschema都是可选参数。

delimiter用于指定不一样字段的分隔符,默认是“,”。

schema用于数据导入后每一个字段的数据类型,它是一个table类型。DolphinDB提供了字段类型自动识别功能,可是某些状况下系统自动识别的数据类型不符合需求,好比咱们在导入示例CSVcandle_201801.csv时,volume字段会被识别成INT类型,实际上咱们须要LONG类型,这时就须要使用schema参数。

建立schema table的脚本:

nameCol = `symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime
typeCol = [SYMBOL,SYMBOL,INT,DATE,DATE,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,LONG]
schemaTb = table(nameCol as name,typeCol as type)

当表的字段很是多时,建立schema table的脚本会十分冗长。为了不这个问题,DolphinDB提供了extractTextSchema函数,它能够从文本文件中提取表的结构,咱们只需修改须要指定的字段类型便可。

dataFilePath = "/home/data/candle_201801.csv"
schemaTb=extractTextSchema(dataFilePath)
update schemaTb set type=`LONG where name=`volume        
tt=loadText(dataFilePath,,schemaTb)


1.2 ploadText

ploadText把数据文件做为分区表并行加载到内存中,语法和loadText彻底相同,可是ploadText的速度更快。ploadText主要用于快速载入大文件,它在设计上充分利用了多个core来并行载入文件,并行程度取决于服务器自己core数量和节点的localExecutors配置。

下面咱们对比loadText和ploadText的性能。

首先,经过脚本生成一个4G左右的CSV文件:

filePath = "/home/data/testFile.csv"
appendRows = 100000000
dateRange = 2010.01.01..2018.12.30
ints = rand(100, appendRows)
symbols = take(string('A'..'Z'), appendRows)
dates = take(dateRange, appendRows)
floats = rand(float(100), appendRows)
times = 00:00:00.000 + rand(86400000, appendRows)
t = table(ints as int, symbols as symbol, dates as date, floats as float, times as time)
t.saveText(filePath)

分别使用loadText和ploadText来导入文件,该节点是4核8线程的CPU。

timer loadText(filePath);
//Time elapsed: 39728.393 ms
timer ploadText(filePath);
//Time elapsed: 10685.838 ms

结果显示,ploadText的性能差很少是loadText的4倍。


1.3 loadTextEx

语法:loadTextEx(dbHandle, tableName, [partitionColumns], fileName, [delimiter=','], [schema])

参数:

dbHandle是数据库句柄。

tableName是保存数据的分布式表的表名。

partitionColumnsdelimiterschema是可选参数。

当分区方案不是顺序分区时,须要指定partitionColumns,表示分区列。

fileName表示导入文件的名称。

delimiter用于指定不一样字段的分隔符,默认是“,”。

schema用于数据导入后每一个字段的数据类型,它是一个table类型。

loadText函数老是把数据导入到内存,当数据文件很是庞大时,工做机的内存很容易成为瓶颈。loadTextEx能够很好地解决这个问题,它经过边导入边保存的方式,把静态的CSV文件以较为平缓的数据流的方式“另存为”DolphinDB的分布式表,而不是采用所有导入内存再存为分区表的方式,大大下降了内存的使用需求。

首先建立用于保存数据的分布式表:

dataFilePath = "/home/data/candle_201801.csv"
tb = loadText(dataFilePath)
db=database("dfs://dataImportCSVDB",VALUE,2018.01.01..2018.01.31)  
db.createPartitionedTable(tb, "cycle", "tradingDay")

而后将文件导入分布式表:

loadTextEx(db, "cycle", "tradingDay", dataFilePath)

当须要使用数据作分析的时候,经过loadTable函数将分区元数据先载入内存,在实际执行查询的时候,DolphinDB会按需加载数据到内存。

tb = database("dfs://dataImportCSVDB").loadTable("cycle")


2. 经过HDF5文件导入

HDF5是一种比CSV更高效的二进制数据文件格式,在数据分析领域普遍使用。DolphinDB也支持经过HDF5格式文件导入数据。

DolphinDB经过HDF5插件来访问HDF5文件,插件提供了如下方法:

  • hdf5::ls : 列出h5文件中全部 Group 和 Dataset 对象。
  • hdf5::lsTable :列出h5文件中全部 Dataset 对象。
  • hdf5::hdf5DS :返回h5文件中 Dataset 的元数据。
  • hdf5::loadHdf5 :将h5文件导入内存表。
  • hdf5::loadHdf5Ex :将h5文件导入分区表。
  • hdf5::extractHdf5Schema :从h5文件中提取表结构。

调用插件方法时须要在方法前面提供namespace,好比调用loadHdf5时hdf5::loadHdf5,若是不想每次调用都使用namespace,可使用use关键字:

use hdf5
loadHdf5(filePath,tableName)

要使用DolphinDB的插件,首先须要下载HDF5插件,再将插件部署到节点的plugins目录下,在使用插件以前须要先加载,使用下面的脚本:

loadPlugin("plugins/hdf5/PluginHdf5.txt")

HDF5文件的导入与CSV文件大同小异,好比咱们要将示例HDF5文件candle_201801.h5导入,它包含一个Dataset:candle_201801,那么最简单的导入方式以下:

dataFilePath = "/home/data/candle_201801.h5"
datasetName = "candle_201801"
tmpTB = hdf5::loadHdf5(dataFilePath,datasetName)

若是须要指定数据类型导入可使用hdf5::extractHdf5Schema,脚本以下:

dataFilePath = "/home/data/candle_201801.h5"
datasetName = "candle_201801"
schema=hdf5::extractHdf5Schema(dataFilePath,datasetName)
update schema set type=`LONG where name=`volume        
tt=hdf5::loadHdf5(dataFilePath,datasetName,schema)

若是HDF5文件很是庞大,工做机内存没法支持全量载入,可使用hdf5::loadHdf5Ex方式来载入数据。

首先建立用于保存数据的分布式表:

dataFilePath = "/home/data/candle_201801.h5"
datasetName = "candle_201801"
dfsPath = "dfs://dataImportHDF5DB"
tb = hdf5::loadHdf5(dataFilePath,datasetName)
db=database(dfsPath,VALUE,2018.01.01..2018.01.31)  
db.createPartitionedTable(tb, "cycle", "tradingDay")

而后将HDF5文件经过hdf5::loadHdf5Ex函数导入:

hdf5::loadHdf5Ex(db, "cycle", "tradingDay", dataFilePath,datasetName)


3. 经过ODBC接口导入

DolphinDB支持ODBC接口链接第三方数据库,从数据库中直接将表读取成DolphinDB的内存数据表。使用DolphinDB提供的ODBC插件能够方便地从ODBC支持的数据库中迁移数据至DolphinDB中。

ODBC插件提供了如下四个方法用于操做第三方数据源数据:

  • odbc::connect : 开启链接。
  • odbc::close : 关闭链接。
  • odbc::query : 根据给定的SQL语句查询数据并返回到DolphinDB的内存表。
  • odbc::execute : 在第三方数据库内执行给定的SQL语句,不返回数据。

在使用ODBC插件前,须要先安装ODBC驱动,请参考ODBC插件使用教程

下面以链接 SQL Server 做为实例,现有数据库的具体配置为:

server:172.18.0.15

默认端口:1433

链接用户名:sa

密码:123456

数据库名称: SZ_TAQ

数据库表选2016年1月1日的数据,表名candle_201801,字段与CSV文件相同。

要使用ODBC插件链接SQL Server数据库,首先第一步是下载插件解压并拷贝plugins\odbc目录下全部文件到DolphinDB Server的plugins/odbc目录下,经过下面的脚本完成插件初始化:

//载入插件
loadPlugin("plugins/odbc/odbc.cfg")
//链接 SQL Server
conn=odbc::connect("Driver=ODBC Driver 17 for SQL Server;Server=172.18.0.15;Database=SZ_TAQ;Uid=sa;
Pwd=123456;")

在导入数据以前,先建立分布式磁盘数据库用于保存数据:

//从SQL Server中取到表结构做为DolphinDB导入表的模板
tb = odbc::query(conn,"select top 1 * from candle_201801")
db=database("dfs://dataImportODBC",VALUE,2018.01.01..2018.01.31)
db.createPartitionedTable(tb, "cycle", "tradingDay")

从SQL Server中导入数据并保存成DolphinDB分区表:

data = odbc::query(conn,"select * from candle_201801")
tb = database("dfs://dataImportODBC").loadTable("cycle")
tb.append!(data);

经过ODBC导入数据避免了文件导出导入的过程,并且经过DolphinDB的定时做业机制,它还能够做为时序数据定时同步的数据通道。


4. 金融数据导入案例

下面以证券市场日K线图数据文件导入做为示例,数据以CSV文件格式保存在磁盘上,共有10年的数据,按年度分目录保存,一共大约100G的数据,路径示例以下:

2008

---- 000001.csv

---- 000002.csv

---- 000003.csv

---- 000004.csv

---- ...

2009

...

2018

每一个文件的结构都是一致的,如图所示:

edcac7665c7b5c30c4316c15c573e93e.png


4.1 分区规划

要导入数据以前,首先要作好数据的分区规划,这涉及到两个方面的考量:

  • 肯定分区字段。
  • 肯定分区的粒度。

首先根据平常的查询语句执行频率,咱们采用trading和symbol两个字段进行组合范围(RANGE)分区,经过对经常使用检索字段分区,能够极大的提高数据检索和分析的效率。

接下来要作的是分别定义两个分区的粒度。

现有数据的时间跨度是从2008-2018年,因此这里按照年度对数据进行时间上的划分,在规划时间分区时要考虑为后续进入的数据留出足够的空间,因此这里把时间范围设置为2008-2030年。

yearRange =date(2008.01M + 12*0..22)

这里股票代码有几千个,若是对股票代码按值(VALUE)分区,那么每一个分区只是几兆大小,而分区数量则不少。分布式系统在执行查询时,会将查询语句分红多个子任务分发到不一样的分区执行,因此按值分区方式会致使任务数量很是多,而任务执行时间极短,致使系统在管理任务上花费的时间反而大于任务自己的执行时间,这样的分区方式明显是不合理的。这里咱们按照范围将全部股票代码均分红100个区间,每一个区间做为一个分区,最终分区的大小约100M左右。 考虑到后期有新的股票数据进来,因此增长了一个虚拟的代码999999,跟最后一个股票代码组成一个分区,用来保存后续新增股票的数据。

经过下面的脚本获得 symbol 字段的分区范围:

//遍历全部的年度目录,去重整理出股票代码清单,并经过cutPoint分红100个区间
symbols = array(SYMBOL, 0, 100)
yearDirs = files(rootDir)[`filename]
for(yearDir in yearDirs){
	path = rootDir + "/" + yearDir
	symbols.append!(files(path)[`filename].upper().strReplace(".CSV",""))
}
//去重并增长扩容空间:
symbols = symbols.distinct().sort!().append!("999999");
//均分红100份
symRanges = symbols.cutPoints(100)
经过下述脚本定义两个维度组合(COMPO)分区,建立Database和分区表:
columns=`symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime
types =  [SYMBOL,SYMBOL,INT,DATE,DATE,TIME,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG]

dbDate=database("", RANGE, yearRange)
dbID=database("", RANGE, symRanges)
db = database(dbPath, COMPO, [dbDate, dbID])

pt=db.createPartitionedTable(table(1000000:0,columns,types), tableName, `tradingDay`symbol)

须要注意的是,分区是DolphinDB存储数据的最小单位,DolphinDB对分区的写入操做是独占式的,当任务并行进行的时候,须要避免多任务同时向一个分区写入数据。本案例中每一年的数据交给一个单独任务去作,各任务操做的数据边界没有重合,因此不可能发生多任务写入同一分区的状况。


4.2 导入数据

数据导入脚本的主要思路很简单,就是经过循环目录树,将全部的CSV文件逐个读取并写入到分布式数据库表dfs://SAMPLE_TRDDB中,可是具体导入过程当中仍是会有不少细节问题。

首先碰到的问题是,CSV文件中保存的数据格式与DolphinDB内部的数据格式存在差别,好比time字段,文件里是以“9390100000”表示精确到毫秒的时间,若是直接读入会被识别成数值类型,而不是time类型,因此这里须要用到数据转换函数datetimeParse结合格式化函数format在数据导入时进行转换。 关键脚本以下:

datetimeParse(format(time,"000000000"),"HHmmssSSS")

虽然经过循环导入实现起来很是简单,可是实际上100G的数据是由极多的5M左右的细碎文件组成,若是单线程操做会等待好久,为了充分利用集群的资源,因此咱们按照年度把数据导入拆分红多个子任务,轮流发送到各节点的任务队列并行执行,提升导入的效率。这个过程分下面两步实现:

(1)定义一个自定义函数,函数的主要功能是导入指定年度目录下的全部文件:

//循环处理年度目录下的全部数据文件
def loadCsvFromYearPath(path, dbPath, tableName){
	symbols = files(path)[`filename]
	for(sym in symbols){
		filePath = path + "/" + sym
		t=loadText(filePath)
		database(dbPath).loadTable(tableName).append!(select symbol, exchange,cycle, tradingDay,date,datetimeParse(format(time,"000000000"),"HHmmssSSS"),open,high,low,close,volume,turnover,unixTime from t )			
	}
}

(2)经过 rpc 函数结合 submitJob 函数把上面定义的函数提交到各节点去执行:

nodesAlias="NODE" + string(1..4)
years= files(rootDir)[`filename]

index = 0;
for(year in years){	
	yearPath = rootDir + "/" + year
	des = "loadCsv_" + year
	rpc(nodesAlias[index%nodesAlias.size()],submitJob,des,des,loadCsvFromYearPath,yearPath,dbPath,tableName)
	index=index+1
}

数据导入过程当中,能够经过pnodeRun(getRecentJobs)来观察后台任务的完成状况。

案例完整脚本

相关文章
相关标签/搜索