DolphinDB提供了多种灵活的数据导入方法,来帮助用户方便的把海量数据从多个数据源导入。具体有以下4种途径:node
本章中多处使用到DolphinDB的数据库和表的概念,因此这里首先作一个介绍。数据库
在DolphinDB里数据以结构化数据表的方式保存。数据表按存储介质能够分为:服务器
按是否分区能够分为:app
在传统的数据库系统,分区是针对数据表定义的,就是同一个数据库里的每一个数据表均可以有本身的分区定义;而DolphinDB的分区是针对数据库定义的,也就是说同一个数据库下的数据表只能使用同一种分区机制,这也意味着若是两张表要使用不一样的分区机制,那么它们是不能放在一个数据库下的。分布式
经过文件进行数据中转是比较通用化的一种数据迁移方式,方式简单易操做。DolphinDB提供了如下三个函数来载入文本文件:ide
如下为将candle_201801.csv导入DolphinDB来演示loadText和loadTextEx的用法。函数
2.1 loadText性能
loadText函数有三个参数,第一个参数filename是文件名,第二个参数delimiter用于指定不一样字段的分隔符,默认是",",第三个参数schema是用来指定导入后表的每一个字段的数据类型,schema参数是一个数据表,格式示例以下:spa
首先导入数据:插件
dataFilePath = "/home/data/candle_201801.csv" tmpTB = loadText(dataFilePath);
DolphinDB在导入数据的同时,随机提取一部分的行以肯定各列数据类型,因此对大多数文本文件无须手动指定各列的数据类型,很是方便。但有时系统自动识别的数据类型并不符合预期或需求,好比导入数据的volume列被识别为INT类型, 而须要的volume类型是LONG类型,这时就须要使用一个数据类型表做为schema参数。例如可以使用以下脚本构建数据类型表:
nameCol = `symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime typeCol = `SYMBOL`SYMBOL`INT`DATE`DATE`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`LONG schemaTb = table(nameCol as name,typeCol as type);
当表字段很是多的时候,写这样一个脚本费时费力,为了简化操做,DolphinDB提供了extractTextSchema 函数,可从文本文件中提取表的结构生成数据类型表。只需修改少数指定字段的数据类型,就可获得理想的数据类型表。
整合上述方法,可以使用以下脚本以导入数据:
dataFilePath = "/home/data/candle_201801.csv" schemaTb=extractTextSchema(dataFilePath) update schemaTb set type=`LONG where name=`volume tt=loadText(dataFilePath,,schemaTb);
2.2 ploadText
ploadText函数的特色能够快速载入大文件。它在设计中充分利用了多核CPU来并行载入文件,并行程度取决于服务器自己CPU核数量和节点的localExecutors配置。
首先经过脚本生成一个4G左右的CSV文件:
filePath = "/home/data/testFile.csv" appendRows = 100000000 dateRange = 2010.01.01..2018.12.30 ints = rand(100, appendRows) symbols = take(string('A'..'Z'), appendRows) dates = take(dateRange, appendRows) floats = rand(float(100), appendRows) times = 00:00:00.000 + rand(86400000, appendRows) t = table(ints as int, symbols as symbol, dates as date, floats as float, times as time) t.saveText(filePath)
分别经过loadText和ploadText来载入文件。本例所用节点是4核8超线程的CPU。
timer loadText(filePath); Time elapsed: 39728.393 ms timer ploadText(filePath); Time elapsed: 10685.838 ms
结果显示在此配置下,ploadText的性能是loadText的4倍左右。
2.3 loadTextEx
loadText函数老是把全部数据导入内存。当数据文件体积很是庞大时,服务器的内存很容易成为制约因素。DolphinDB提供的;loadTextEx函数能够较好的解决这个问题。它将一个大的文本文件分割成不少个小块,逐步加载到分布式数据表中。
首先建立分布式数据库:
db=database("dfs://dataImportCSVDB",VALUE,2018.01.01..2018.01.31)
而后将文本文件导入数据库中"cycle"表:
dataFilePath = "/home/data/candle_201801.csv" loadTextEx(db, "cycle", "tradingDay", dataFilePath)
当须要使用数据时,经过loadTable函数将分区元数据先载入内存。
tb = database("dfs://dataImportCSVDB").loadTable("cycle")
在实际执行查询的时候,会按需加载所需数据到内存。
对于二进制格式的文件,DolphinDB提供了2个函数用于导入:readRecord!函数和loadRecord函数。两者的区别是,前者不支持导入字符串类型的数据,后者支持。下面经过2个例子分别介绍这两个函数的用法。
readRecord!函数可以导入不含有字符串类型字段的二进制文件,下面介绍如何使用readRecord!函数导入一个二进制文件:binSample.bin。
首先,建立一个内存表tb,用于存放导入的数据,须要为每一列指定字段名称和数据类型。
tb=table(1000:0, `id`date`time`last`volume`value`ask1`ask_size1`bid1`bid_size1, [INT,INT,INT,FLOAT,INT,FLOAT,FLOAT,INT,FLOAT,INT])
调用file函数打开文件,并经过readRecord!函数导入二进制文件,数据会被加载到tb表中。
dataFilePath="/home/data/binSample.bin" f=file(dataFilePath) f.readRecord!(tb);
查看tb表的数据,数据已经正确导入:
select top 5 * from tb; id date time last volume value ask1 ask_size1 bid1 bid_size1 -- -------- -------- ---- ------ ----- ----- --------- ----- --------- 1 20190902 91804000 0 0 0 11.45 200 11.45 200 2 20190902 92007000 0 0 0 11.45 200 11.45 200 3 20190902 92046000 0 0 0 11.45 1200 11.45 1200 4 20190902 92346000 0 0 0 11.45 1200 11.45 1200 5 20190902 92349000 0 0 0 11.45 5100 11.45 5100
导入之后的数据中,date列和time列的数据以数值形式存储,为了更直观地显示数据,可使用temporalParse函数进行日期和时间类型数据的格式转换。再使用replaceColumn!函数替换表中原有的列。具体以下所示。
tb.replaceColumn!(`date, tb.date.string().temporalParse("yyyyMMdd")) tb.replaceColumn!(`time, tb.time.format("000000000").temporalParse("HHmmssSSS")) select top 5 * from tb; id date time last volume value ask1 ask_size1 bid1 bid_size1 -- ---------- ------------ ---- ------ ----- ----- --------- ----- --------- 1 2019.09.02 09:18:04.000 0 0 0 11.45 200 11.45 200 2 2019.09.02 09:20:07.000 0 0 0 11.45 200 11.45 200 3 2019.09.02 09:20:46.000 0 0 0 11.45 1200 11.45 1200 4 2019.09.02 09:23:46.000 0 0 0 11.45 1200 11.45 1200 5 2019.09.02 09:23:49.000 0 0 0 11.45 5100 11.45 5100
loadRecord函数可以处理字符串类型的数据(包括STRING和SYMBOL类型),可是要求字符串在磁盘上的长度必须固定。若是字符串的长度小于固定值,则用ASCII值0填充,加载的时候会把末尾0去掉。下面介绍使用loadRecord函数导入一个带有字符串类型字段的二进制文件:binStringSample.bin。
首先,指定要导入文件的表结构,包括字段名称和数据类型。与readRecord!函数不一样的是,loadRecord函数是经过一个元组来指定schema,而不是直接定义一个内存表。关于表结构的指定,有如下3点要求:
针对本例中的数据文件指定表结构,具体以下所示:
schema = [("code", SYMBOL, 32),("date", INT),("time", INT),("last", FLOAT),("volume", INT),("value", FLOAT),("ask1", FLOAT),("ask2", FLOAT),("ask3", FLOAT),("ask4", FLOAT),("ask5", FLOAT),("ask6", FLOAT),("ask7", FLOAT),("ask8", FLOAT),("ask9", FLOAT),("ask10", FLOAT),("ask_size1", INT),("ask_size2", INT),("ask_size3", INT),("ask_size4", INT),("ask_size5", INT),("ask_size6", INT),("ask_size7", INT),("ask_size8", INT),("ask_size9", INT),("ask_size10", INT),("bid1", FLOAT),("bid2", FLOAT),("bid3", FLOAT),("bid4", FLOAT),("bid5", FLOAT),("bid6", FLOAT),("bid7", FLOAT),("bid8", FLOAT),("bid9", FLOAT),("bid10", FLOAT),("bid_size1", INT),("bid_size2", INT),("bid_size3", INT),("bid_size4", INT),("bid_size5", INT),("bid_size6", INT),("bid_size7", INT),("bid_size8", INT),("bid_size9", INT),("bid_size10", INT)]
使用loadRecord函数导入二进制文件,因为表的列数较多,经过select语句选出几列有表明性的数据进行后续介绍。
dataFilePath="/home/data/binStringSample.bin" tmp=loadRecord(dataFilePath, schema) tb=select code,date,time,last,volume,value,ask1,ask_size1,bid1,bid_size1 from tmp;
查看表内数据的前5行。
select top 5 * from tb; code date time last volume value ask1 ask_size1 bid1 bid_size1 --------- -------- -------- ---- ------ ----- ----- --------- ----- --------- 601177.SH 20190902 91804000 0 0 0 11.45 200 11.45 200 601177.SH 20190902 92007000 0 0 0 11.45 200 11.45 200 601177.SH 20190902 92046000 0 0 0 11.45 1200 11.45 1200 601177.SH 20190902 92346000 0 0 0 11.45 1200 11.45 1200 601177.SH 20190902 92349000 0 0 0 11.45 5100 11.45 5100
用一样的方法处理日期和时间列的数据:
tb.replaceColumn!(`date, tb.date.string().temporalParse("yyyyMMdd")) tb.replaceColumn!(`time, tb.time.format("000000000").temporalParse("HHmmssSSS")) select top 5 * from tb; code date time last volume value ask1 ask_size1 bid1 bid_size1 --------- ---------- ------------ ---- ------ ----- ----- --------- ----- --------- 601177.SH 2019.09.02 09:18:04.000 0 0 0 11.45 200 11.45 200 601177.SH 2019.09.02 09:20:07.000 0 0 0 11.45 200 11.45 200 601177.SH 2019.09.02 09:20:46.000 0 0 0 11.45 1200 11.45 1200 601177.SH 2019.09.02 09:23:46.000 0 0 0 11.45 1200 11.45 1200 601177.SH 2019.09.02 09:23:49.000 0 0 0 11.45 5100 11.45 5100
除了readRecord!和loadRecord函数以外,DolphinDB还提供了一些与二进制文件的处理相关的函数,例如writeRecord函数,用于将DolphinDB对象保存为二进制文件。具体请参考用户手册。
HDF5是一种高效的二进制数据文件格式,在数据分析领域普遍使用。DolphinDB支持导入HDF5格式数据文件。
DolphinDB经过HDF5插件来访问HDF5文件,插件提供了如下方法:
DolphinDB 1.00.0版本以后,安装目录/server/plugins/hdf5已经包含HDF5插件,使用如下脚本加载插件:
loadPlugin("plugins/hdf5/PluginHdf5.txt")
若用户使用的是老版本,默认不包含此插件,可先从HDF5插件对应版本分支bin目录下载,再将插件部署到节点的plugins目录下。
调用插件方法时须要在方法前面提供namespace,好比调用loadHDF5可使用hdf5::loadHDF5。另外一种写法是:、
use hdf5 loadHDF5(filePath,tableName)
HDF5文件的导入与CSV文件相似。例如,若要导入包含一个Dataset candle_201801的文件candle_201801.h5,可以使用如下脚本,其中datasetName可经过ls或lsTable得到:
dataFilePath = "/home/data/candle_201801.h5" datasetName = "candle_201801" tmpTB = hdf5::loadHDF5(dataFilePath,datasetName)
若是须要指定数据类型导入可使用hdf5::extractHDF5Schema,脚本以下:
dataFilePath = "/home/data/candle_201801.h5" datasetName = "candle_201801" schema=hdf5::extractHDF5Schema(dataFilePath,datasetName) update schema set type=`LONG where name=`volume tt=hdf5::loadHDF5(dataFilePath,datasetName,schema)
若是HDF5文件超过服务器内存,可使用hdf5::loadHDF5Ex载入数据。
首先建立用于保存数据的分布式表:
dataFilePath = "/home/data/candle_201801.h5" datasetName = "candle_201801" dfsPath = "dfs://dataImportHDF5DB" db=database(dfsPath,VALUE,2018.01.01..2018.01.31)
而后导入HDF5文件:
hdf5::loadHDF5Ex(db, "cycle", "tradingDay", dataFilePath,datasetName)
DolphinDB支持ODBC接口链接第三方数据库,从其中直接将数据表读取成DolphinDB的内存数据表。
DolphinDB官方提供ODBC插件用于链接第三方数据源,使用该插件能够方便的从ODBC支持的数据库迁移数据至DolphinDB中。
ODBC插件提供了如下四个方法用于操做第三方数据源数据:
在使用ODBC插件以前,须要安装ODBC驱动程序,请参考ODBC插件使用教程。
下面的例子使用ODBC插件链接如下SQL Server:
第一步,下载插件解压并拷贝 plugins\odbc 目录下全部文件到DolphinDB server的 plugins/odbc 目录下(有些版本的DolphinDB安装目录/server/plugins/odbc已经包含ODBC插件,可略过此步),经过下面的脚本完成插件初始化:
loadPlugin("plugins/odbc/odbc.cfg") conn=odbc::connect("Driver=ODBC Driver 17 for SQL Server;Server=172.18.0.15;Database=SZ_TAQ;Uid=sa;Pwd=123456;")
第二步,建立分布式数据库。使用SQL Server中的数据表结构做为DolphinDB数据表的模板。
tb = odbc::query(conn,"select top 1 * from candle_201801") db=database("dfs://dataImportODBC",VALUE,2018.01.01..2018.01.31) db.createPartitionedTable(tb, "cycle", "tradingDay")
第三步,从SQL Server中导入数据并保存为DolphinDB分区表:
tb = database("dfs://dataImportODBC").loadTable("cycle") data = odbc::query(conn,"select * from candle_201801") tb.append!(data);
经过ODBC导入数据方便快捷。经过DolphinDB的定时做业机制,它还能够做为时序数据定时同步的数据通道。
下面以股票市场日K线图数据文件导入做为示例。每一个股票数据存为一个CSV文件,共约100G,时间范围为2008年-2017年,按年度分目录保存。2008年度路径示例以下:
2008 ---- 000001.csv ---- 000002.csv ---- 000003.csv ---- 000004.csv ---- ...
每一个文件的结构都是一致的,如图所示:
6.1 分区规划
要导入数据以前,首先要作好数据的分区规划,即肯定分区字段以及分区粒度。
肯定分区字段要考虑平常的查询语句执行频率。以where, group by或context by中经常使用字段做为分区字段,能够极大的提高数据检索和分析的效率。使用股票数据的查询常常与交易日期和股票代码有关,因此咱们建议采用 tradingDay和symbol这两列进行组合(COMPO)分区。
分区大小应尽可能均匀,同时分区粒度不宜过大或太小。咱们建议一个分区未压缩前的原始数据大小控制在100M~1G之间。有关为什么分区大小应均匀,以及分区最佳粒度的考虑因素,请参考DolphinDB分区数据库教程第四节。
综合考虑,咱们能够在复合(COMPO)分区中,根据交易日期进行范围分区(每一年一个范围),并按照股票代码进行范围分区(共100个代码范围),共产生 10 * 100 = 1000 个分区,最终每一个分区的大小约100M左右。
首先建立交易日期的分区向量。若要为后续进入的数据预先制做分区,可把时间范围设置为2008-2030年。
yearRange = date(2008.01M + 12*0..22);
经过如下脚本获得symbol字段的分区向量。因为每只股票的数据量一致,咱们遍历全部的年度目录,整理出股票代码清单,并经过cutPoint函数分红100个股票代码区间。考虑到将来新增的股票代码可能会大于现有最大股票代码,咱们增长了一个虚拟的代码999999,做为股票代码的上限值。
symbols = array(SYMBOL, 0, 100) yearDirs = files(rootDir)[`filename] for(yearDir in yearDirs){ path = rootDir + "/" + yearDir symbols.append!(files(path)[`filename].upper().strReplace(".CSV","")) } symbols = symbols.distinct().sort!().append!("999999"); symRanges = symbols.cutPoints(100)
经过如下脚本建立复合(COMPO)分区数据库,以及数据库内的分区表"stockData":
columns=`symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime types = [SYMBOL,SYMBOL,INT,DATE,DATE,TIME,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG] dbDate=database("", RANGE, yearRange) dbID=database("", RANGE, symRanges) db = database(dbPath, COMPO, [dbDate, dbID]) pt=db.createPartitionedTable(table(1000000:0,columns,types), `stockData, `tradingDay`symbol);
6.2 导入数据
数据导入的具体过程是经过目录树,将全部的CSV文件读取并写入到分布式数据库表dfs://SAMPLE_TRDDB中。这其中会有一些细节问题。例如,CSV文件中保存的数据格式与DolphinDB内部的数据格式存在差别,好比time字段,原始数据文件里是以整数例如“9390100000”表示精确到毫秒的时间,若是直接读入会被识别成整数类型,而不是时间类型,因此这里须要用到数据转换函数datetimeParse结合格式化函数format在数据导入时进行转换。可采用如下脚本:
datetimeParse(format(time,"000000000"),"HHmmssSSS")
若是单线程导入100GB的数据会耗时好久。为了充分利用集群的资源,咱们能够按照年度把数据导入拆分红多个子任务,发送到各节点的任务队列并行执行,提升导入的效率。这个过程可分为如下两步实现。
首先定义一个函数以导入指定年度目录下的全部文件:
def loadCsvFromYearPath(path, dbPath, tableName){ symbols = files(path)[`filename] for(sym in symbols){ filePath = path + "/" + sym t=loadText(filePath) database(dbPath).loadTable(tableName).append!(select symbol, exchange,cycle, tradingDay,date,datetimeParse(format(time,"000000000"),"HHmmssSSS"),open,high,low,close,volume,turnover,unixTime from t ) } }
而后经过rpc函数结合submitJob函数把该函数提交到各节点去执行:
nodesAlias="NODE" + string(1..4) years= files(rootDir)[`filename] index = 0; for(year in years){ yearPath = rootDir + "/" + year des = "loadCsv_" + year rpc(nodesAlias[index%nodesAlias.size()],submitJob,des,des,loadCsvFromYearPath,yearPath,dbPath,`stockData) index=index+1 }
数据导入过程当中,可使用pnodeRun(getRecentJobs)来观察后台任务的完成状况。
须要注意的是,分区是 DolphinDB database 存储数据的最小单位。DolphinDB对分区的写入操做是独占式的,当任务并行进行的时候,请避免多任务同时向一个分区写入数据。本例中每一年的数据的写入由一个单独任务执行,各任务操做的数据范围没有重合,因此不可能发生多任务同时写入同一分区的状况。
本案例的详细脚本在附录提供下载连接。
7. 附录