开发大数据应用,不只须要能支撑海量数据的分布式数据库,能高效利用多核多节点的分布式计算框架,更须要一门能与分布式数据库和分布式计算有机融合、高性能易扩展、表达能力强、知足快速开发和建模须要的编程语言。DolphinDB从流行的Python和SQL语言汲取了灵感,设计了大数据处理脚本语言。java
提到数据库语言,咱们很容易想到标准的SQL语言。不一样于标准的SQL,DolphinDB编程语言功能齐全,表达能力很是强大,完美支持命令式编程、向量化编程、函数话编程、SQL编程、远程过程调用编程(RPC)和元编程等多种编程范式。DolphinDB编程语言的语法和表达习惯与Python和SQL很是类似,只要对Python和SQL有必定的了解,就能轻松掌握。相对而言,掌握内存时序数据库kdb+的q语言难度要大得多。python
DolphinDB的编程语言可以知足数据科学家快速开发和建模的需求。DolphinDB语言简洁灵活,表达能力强,大大提升了数据科学家的开发效率。DolphinDB支持向量化计算和分布式计算,具备极快的运行速度。下面将详细介绍DolphinDB编程语言的独特之处。ios
1.命令式编程git
与主流的脚本语言Python、JS等,还有强类型语言C、C++、Java等同样,DolphinDB也支持命令式编程。命令式编程是指经过执行一条一条的语句,实现最终目标。DolphinDB的命令式编程主要是用做上层模块的处理和调度。在大数据分析中,因为须要处理的数据量很是庞大,若是咱们采用命令式编程逐行处理数据,效率会十分低下,性能也会有所降低。所以,咱们推荐在DolphinDB中使用其余编程方式来批量处理数据。github
//DolphinDB支持对单变量和多变量进行赋值 x = 1 2 3 y = 4 5 y += 2 x, y = y, x //swap the value of x and y x, y =1 2 3, 4 5 // 1到100累加求和 s = 0 for(x in 1:101) s += x print s //数组中的元素求和 s = 0; for(x in 1 3 5 9 15) s += x print s //打印矩阵每一列的均值 m = matrix(1 2 3, 4 5 6, 7 8 9) for(c in m) print c.avg() //计算product表中每个产品的销售额 t= table(["TV set", "Phone", "PC"] as productId, 1200 600 800 as price, 10 20 7 as qty) for(row in t) print row.productId + ": " + row.price * row.qty
2.向量化编程算法
跟matlab、R等编程语言同样,DolphinDB也支持向量化编程。前面提到的kdb+数据库的q语言也是向量处理语言,它在复杂的计算上表现出很好的性能,而且效率很高。DolphinDB的编程语言对不少算法都进行了优化,好比对时间序列数据计算滑动窗口指标,大大提升了向量函数的效率。sql
//两个长度为1000万的向量相加,采用向量化编程比命令式编程的for语句更加简洁,耗耗时更短。 n = 10000000 a = rand(1.0, n) b = rand(1.0, n) //采用for语句编程,须要12秒 c = array(DOUBLE, n) for(i in 0 : n) c[i] = a[i] + b[i] Time elapsed: 12341.043 ms //采用向量化编程,仅需36毫秒 c = a + b Time elapsed: 36.901 ms
向量化编程一般是把整个向量加载到连续内存中。有时候由于内存碎片,没有找到连续内存,向量就不可用了。DolphinDB针对这个问题,特地提供了big array数据类型。big array能够把物理上不连续的内存块组成逻辑上连续的向量,即便是很是大的向量,也能在DolphinDB中使用,提升了系统的可用性。数据库
3.函数化编程编程
DolphinDB支持函数化编程的大部分功能,包括纯函数、自定义函数、λ函数、高阶函数、部分应用和闭包。DolphinDB内置了400多个函数,涵盖了各类数据类型、数据结构和系统调用。api
DolphinDB的纯函数特性减小了函数的反作用。在自定义函数时,DolphinDB不能使用函数体外定义的变量。纯函数特性能够大幅度提升代码可读性和软件质量。
3.1 自定义函数
//定义一个函数返回工做日 def getWorkDays(dates){ return dates[def(x):weekday(x) between 1:5] } getWorkDays(2018.07.01 2018.08.01 2018.09.01 2018.10.01) [2018.08.01, 2018.10.01]
上面的例子定义一个函数getWorkDays,该函数受一组日期,返回并返回在周一和周五之间的日期。函数的实现采用了向量的过滤功能,也就是接受一个布尔型单目函数用于数据的过滤。
3.2 高阶函数
下面的一个例子咱们使用三个高阶函数pivot、each和cross,干净利落的用三行代码,根据股票日内tick级别的报价数据,计算出两两之间的相关性。
//模拟生成10000000万个数据点(股票代码,交易时间和价格) n=10000000 syms = rand(`FB`GOOG`MSFT`AMZN`IBM, n) time = 09:30:00.000 + rand(21600000, n) price = 500.0 + rand(500.0, n) //利用pivot函数生成透视表 priceMatrix = pivot(avg, price, time.minute(), syms) //each和ratios函数的配合使用,为每一个股票(矩阵的列)生成每分钟的回报序列 retMatrix = each(ratios, priceMatrix) - 1 //cross和corr函数的配合使用,计算股票两两之间的相关性 corrMatrix = cross(corr, retMatrix, retMatrix) AMZN FB GOOG IBM MSFT --------- --------- --------- --------- --------- AMZN|1 0.015181 -0.056245 0.005822 0.084104 FB |0.015181 1 -0.028113 0.034159 -0.117279 GOOG|-0.056245 -0.028113 1 -0.039278 -0.025165 IBM |0.005822 0.034159 -0.039278 1 -0.049922 MSFT|0.084104 -0.117279 -0.025165 -0.049922 1
3.3 部分应用
高阶函数中的函数参数一般对参数有限制,经过部分应用,能够确保参数符合要求。例如,给定一个向量 a = 12 14 18,计算与矩阵中的每一列的相关性。由于要计算矩阵的每一列的相关性,固然可使用高阶函数each。可是corr函数须要两个参数,而矩阵只提供其中的一个参数,另外一个参数必须事先给定,因此部分应用能够解决这个问题。固然咱们也能够用for语句来解决这个问题,但代码冗长而低效。
a = 12 14 18 m = matrix(5 6 7, 1 3 2, 8 7 11) //使用each和部分应用计算矩阵中的每一列与给定向量a的相关性 each(corr{a}, m) //使用for语句解决上面的问题 cols = m.columns() c = array(DOUBLE, cols) for(i in 0:cols) c[i] = corr(a, m[i])
部分应用的另外一个做用是使函数保持状态。例如,在流计算中,用户一般须要给定一个消息处理函数(message handler),接受一条新的信息,返回一个结果。可是咱们但愿消息处理函数返回的是迄今为止全部数的平均数。这个问题咱们能够经过部分应用来解决。
def cumavg(mutable stat, newNum){ stat[0] = (stat[0] * stat[1] + newNum)/(stat[1] + 1) stat[1] += 1 return stat[0] } msgHandler = cumavg{0.0 0.0} each(msgHandler, 1 2 3 4 5) [1,1.5,2,2.5,3]
4.SQL编程
DolphinDB的编程语言不只支持标准的SQL,还针对时间序列数据扩展了SQL的功能,如分组计算(context by)、数据透视(pivot by)、窗口函数、asof链接和窗口链接等,更便于分析时间序列数据。单纯的SQL引擎表达能力有限,很难知足更加复杂的数据分析和算法实现,影响开发效率。在DolphinDB中,脚本语言与SQL语言是彻底融合在一块儿的。
4.1 SQL与编程语言融合
//生成一个员工工资表 emp_wage = table(take(1..10, 100) as id, take(2017.10M + 1..10, 100).sort() as month, take(5000 5500 6000 6500, 100) as wage) //计算给定的一组员工的平均工资。员工列表存储在一个本地变量empIds中 empIds = 3 4 6 7 9 select avg(wage) from emp_wage where id in empIds group by id id avg_wage -- -------- 3 5500 4 6000 6 6000 7 5500 9 5500 //除计算平均工资外,同时显示员工的姓名。员工姓名使用一个字典empName来获取。 empName = dict(1..10, `Alice`Bob`Jerry`Jessica`Mike`Tim`Henry`Anna`Kevin`Jones) select empName[first(id)] as name, avg(wage) from emp_wage where id in empIds group by id id name avg_wage -- ------- -------- 3 Jerry 5500 4 Jessica 6000 6 Tim 6000 7 Henry 5500 9 Kevin 5500
上面的例子,SQL语句的where子句和select子句分别用到了上下文中定义的数组和字典,使得原本须要经过子查询和多表联结来解决的问题,经过简单的hash table解决了。若是SQL涉及到分布式数据库,这些上下文变量会自动序列化到须要的节点。这不只让代码看上去更简洁,有更好的可读性,并且提高了性能。在大数据分析中,不少数据表关联,即便SQL优化器作了不少优化,也不免带来性能问题。
4.2 context by——对面板数据的友好支持
DolphinDB提供了相似其余数据库系统的window function——context by。可是与window function相比,context by的语法更简洁,而且没有那么多限制,能够与select或update一块儿使用。
//按股票代码进行分组,计算每一个股票天天的回报。假设数据是时间顺序排列的。 update trades set ret = ratios(price) - 1.0 context by sym //按日期进行分组,计算天天每一个股票的ret降序排名。 select date, symbol, ret, rank(ret, false) + 1 as rank from trades where isValid(ret) context by date //选择天天ret排名前10的股票 select date, symbol, ret from trades where isValid(ret) context by date having rank(ret, false) < 10
4.3 asof join和window join——对时序数据的友好支持
t1 = table(09:30m 09:31m 09:33m 09:34m as minute, 29.2 28.9 29.3 30.1 as price) t2 = table(09:30m 09:31m 09:34m 09:36m as minute, 51.2 52.4 51.9 52.8 as price) select * from aj(t1, t2, `minute) minute price t2_minute t2_price ------ ----- --------- -------- 09:30m 29.2 09:30m 51.2 09:31m 28.9 09:31m 52.4 09:33m 29.3 09:31m 52.4 09:34m 30.1 09:34m 51.9
上面的例子中,t2中没有与09:33m、09:34m对应的记录,asof join(aj)会分别取t2中在09:33m、09:34m以前最近时间对应的记录,即取t2中09:31m的记录。
p = table(1 2 3 as id, 2018.06M 2018.07M 2018.07M as month) s = table(1 2 1 2 1 2 as id, 2018.04M 2018.04M 2018.05M 2018.05M 2018.06M 2018.06M as month, 4500 5000 6000 5000 6000 4500 as wage) select * from wj(p, s, -3:-1,<avg(wage)>,`id`month) id month avg_wage -- -------- ----------- 1 2018.06M 5250 2 2018.07M 4833.333333 3 2018.07M
上面的例子说明了window join(wj)的用法。wj首先取表p第一行记录,即id=1,month=2018.06M。而后在表s中选择id=1而且month在(2018.06M-3)到(2018.06M-1),即2018.03M到2018.05M之间的记录来计算avg(wage)。所以avg_wage=(4500+6000)/2=5250。如此类推。
asof join和window join在金融分析领域有着普遍的应用。一个经典的应用是将交易表和报价表进行关联,计算个股交易成本。详情能够参考使用Window Join快速估计个股交易成本。
4.4 SQL其它扩展
为了知足大数据分析的要求,DolphinDB对SQL还作了不少扩展。好比,用户的自定义函数无需编译、打包或部署,便可在SQL中使用。又好比DolphinDB支持组合字段(Composite Column),能够将复杂分析函数的多个返回值输出到数据表的一行。
factor1=3.2 1.2 5.9 6.9 11.1 9.6 1.4 7.3 2.0 0.1 6.1 2.9 6.3 8.4 5.6 factor2=1.7 1.3 4.2 6.8 9.2 1.3 1.4 7.8 7.9 9.9 9.3 4.6 7.8 2.4 8.7 t=table(take(1 2 3, 15).sort() as id, 1..15 as y, factor1, factor2) //在输出参数的同时,输出t统计值。使用自定义函数包装输出结果 def myols(y,x){ r=ols(y,x,true,2) return r.Coefficient.beta join r.RegressionStat.statistics[0] } select myols(y,[factor1,factor2]) as `alpha`beta1`beta2`R2 from t group by id id alpha beta1 beta2 R2 -- --------- --------- --------- -------- 1 1.063991 -0.258685 0.732795 0.946056 2 6.886877 -0.148325 0.303584 0.992413 3 11.833867 0.272352 -0.065526 0.144837
5.远程过程调用编程
DolphinDB与其余系统相比,在远程过程调用(RPC)上的优点主要体如今两个方面:第一,在DolphinDB中,不管是自定义函数仍是内置函数,咱们均可以经过远程过程调用发送到其余节点上运行,而其余系统不能远程调用与自定义函数相关的函数。第二,DolphinDB的远程过程调用无需编译或者部署。系统会自动把相关函数定义和所需数据序列化到远程节点。数据科学家或数据分析师在编写与远程过程调用相关的函数时,不须要工程师配合编译和部署,能够直接在线使用,极大地提升了开发和分析效率。
下面的例子是使用remoteRun执行远程函数:
h = xdb("localhost", 8081) //在远程节点上执行一段脚本 remoteRun(h, "sum(1 3 5 7)") 16 //上述远程调用也能够简写成 h("sum(1 3 5 7)") 16 //在远程节点上执行一个在远程节点注册的函数 h("sum", 1 3 5 7) 16 //在远程系节点上执行本地的自定义函数 def mysum(x) : reduce(+, x) h(mysum, 1 3 5 7) 16 //在远程节点(localhost:8081)上建立一个共享表sales h("share table(2018.07.02 2018.07.02 2018.07.03 as date, 1 2 3 as qty, 10 15 7 as price) as sales") //若是本地的自定义函数有依赖,依赖的自定义函数也会序列化到远程节点 defg salesSum(tableName, d): select mysum(price*qty) from objByName(tableName) where date=d h(salesSum, "sales", 2018.07.02) 40
DolphinDB还提供了与分布式计算相关的函数。mr和imr分别用于开发基于map-reduce和迭代的map-reduce分布式算法。用户只须要指定分布式数据源和定制的核心函数,譬如map函数,reduce函数,final函数等。下面咱们先建立一个分布式表,添加一些模拟数据,而后演示开发计算中位数和线性回归的例子。
//模拟生成分布式表sample,用id分区 //y = 0.5 + 3x1 -0.5x2 n=10000000 x1 = pow(rand(1.0,n), 2) x2 = norm(3.0:1.0, n) y = 0.5 + 3 * x1 - 0.5*x2 + norm(0.0:1.0, n) t=table(rand(10, n) as id, y, x1, x2) login(`admin,"123456") db = database("dfs://testdb", VALUE, 0..9) db.createPartitionedTable(t, "sample", "id").append!(t)
利用自定义的map函数myOLSMap,内置的reudce函数加函数(+),自定义的final函数myOLSFinal,以及内置的map-reduce框架函数mr,快速构建了一个在分布式数据源上运行线性回归的函数myOLSEx。
def myOLSMap(table, yColName, xColNames){ x = matrix(take(1.0, table.rows()), table[xColNames]) xt = x.transpose(); return xt.dot(x), xt.dot(table[yColName]) } def myOLSFinal(result){ xtx = result[0] xty = result[1] return xtx.inv().dot(xty)[0] } def myOLSEx(ds, yColName, xColNames){ return mr(ds, myOLSMap{, yColName, xColNames}, +, myOLSFinal) } //使用本身开发的分布式算法和分布式数据源计算线性回归系数 sample = loadTable("dfs://testdb", "sample") myOLSEx(sqlDS(<select * from sample>), `y, `x1`x2) [0.4991, 3.0001, -0.4996] //使用内置的函数ols和未分的数据计算线性回归的系数,获得相同的结果 ols(y, [x1,x2],true) [0.4991, 3.0001, -0.4996]
下面这个例子,咱们构造一个算法,在分布式数据源上计算一组数据的近似中位数。算法的基本原理是利用bucketCount函数,在每个节点上分别计算一组bucket内的数据个数,而后把各个节点上的数据累加。这样咱们能够找到中位数应该落在哪一个区间内。若是这个区间不够小,进一步细分这个区间,直到小于给定的精度要求。中位数的算法须要屡次迭代,咱们所以使用了迭代计算框架imr。
def medMap(data, range, colName): bucketCount(data[colName], double(range), 1024, true) def medFinal(range, result){ x= result.cumsum() index = x.asof(x[1025]/2.0) ranges = range[1] - range[0] if(index == -1) return (range[0] - ranges*32):range[1] else if(index == 1024) return range[0]:(range[1] + ranges*32) else{ interval = ranges / 1024.0 startValue = range[0] + (index - 1) * interval return startValue : (startValue + interval) } } def medEx(ds, colName, range, precision){ termFunc = def(prev, cur): cur[1] - cur[0] <= precision return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg() } //使用本身开发的近似中位数算法,计算分布式数据的中位数。 sample = loadTable("dfs://testdb", "sample") medEx(sqlDS(<select y from sample>), `y, 0.0 : 1.0, 0.001) -0.052973 //使用内置的med函数计算未分区的数据的中位数。 med(y) -0.052947
6.元编程
DolphinDB支持使用元编程来动态建立表达式,如函数调用的表达式和SQL查询表达式。元编程的一个典型应用是定制报表。用户只须要输入数据表、字段名称和字段格式就能生成报表。具体实现以下:
//根据输入的数据表,字段名称和格式,以及过滤条件,动态生成SQL表达式并执行 def generateReport(tbl, colNames, colFormat, filter){ colCount = colNames.size() colDefs = array(ANY, colCount) for(i in 0:colCount){ if(colFormat[i] == "") colDefs[i] = sqlCol(colNames[i]) else colDefs[i] = sqlCol(colNames[i], format{,colFormat[i]}) } return sql(colDefs, tbl, filter).eval() } //模拟生成一个100行的数据表 t = table(1..100 as id, (1..100 + 2018.01.01) as date, rand(100.0, 100) as price, rand(10000, 100) as qty) //输入过滤条件,字段和格式,定制报表。过滤条件使用了元编程。 generateReport(t, ["id","date","price","qty"], ["000","MM/dd/yyyy", "00.00", "#,###"], < id<5 or id>95 >) id date price qty --- ---------- ----- ----- 001 01/02/2018 50.27 2,886 002 01/03/2018 30.85 1,331 003 01/04/2018 17.89 18 004 01/05/2018 51.00 6,439 096 04/07/2018 57.73 8,339 097 04/08/2018 47.16 2,425 098 04/09/2018 27.90 4,621 099 04/10/2018 31.55 7,644 100 04/11/2018 46.63 8,383
DolphinDB编程语言为数据分析而生,天生具有处理海量数据的能力,功能强大,简单易用。若是想要了解更多关于DolphinDB脚本,能够参考DolphinDB脚本语言的混合范式编程。
此外,还提供了多种编程API,如R、Python、Java、C#等,可以方便地与已有的应用集成。
Java API:dolphindb/api-java
Python 3 API:dolphindb/api-python3
Python 2.7 API:dolphindb/api-python
C# API:dolphindb/api-csharp
欢迎访问官网下载 DolphinDB database 试用版