MaxCompute做为使用最普遍的大数据平台,内部存储的数据以EB量级计算。巨大的数据存储量以及大规模计算下高性能数据读写的需求,对于MaxCompute提出了各类高要求及挑战。处在大数据时代,数据的来源多种多样,开源社区通过十几年的发展,百花齐放,各类各样的数据格式不断的出现。 咱们的用户也在各个场景上,经过各类计算框架,积累了各类不一样格式的数据。怎样将MaxCompute强大的计算能力开放给这些使用开源格式存储沉淀下来的数据,在MaxCompute上挖掘这些数据中的信息,是MaxCompute团队但愿解决的问题。java
MaxCompute 2.0最近推出的非结构化计算框架【公测阶段】,旨在从存储介质和存储格式两个维度,打通计算与存储的通道。 在以前的文章中,咱们已经介绍过怎样在MaxCompute上对存储在OSS上的文本,音频,图像等格式的数据,以及TableStore(OTS)的KV数据进行计算处理。在这里,则将介绍对于各类流行的开源数据格式(ORC, PARQUET, SEQUENCEFILE, RCFILE, AVRO, TEXTFILE等等),怎样将其存储在OSS上面,并经过非结构化框架在MaxCompute进行处理。sql
本着不重造轮子的原则,对于绝大部分这些开源数据格式的解析工做,在非结构化框架中会直接调用开源社区的实现,而且无缝的与MaxCompute系统作对接。apache
MaxCompute非结构化数据框架经过EXTERNAL TABLE的概念来提供MaxCompute与各类数据的联通,与读取OSS数据的使用方法相似,对OSS数据进行写操做,首先要经过CREATE EXTERNAL TABLE语句建立出一个外部表,而在读取开源数据格式时,建立外表的DDL语句格式以下:安全
DROP TABLE [IF EXISTS] <external_table>; CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table> (<column schemas>) [PARTITIONED BY (partition column schemas)] [ROW FORMAT SERDE '<serde class>'] STORED AS <file format> LOCATION 'oss://${accessKeyId}:${accessKeySecret}@${endpoint}/${bucket}/${userPath}/'
能够看到,这个语法与HIVE的语法是至关接近的,而在这个CREATE EXTERNAL TABLE的ddl语句中,有以下几点要说明:性能优化
STORED AS
的关键字,而不是普通非结构化外表用的STORED BY
关键字,这也是目前在读取开源兼容数据时独有的。<column schemas>
必须与具体OSS上存储存储数据的schema相符合。ROW FORMAT SERDE
并不是必选选项,只有在使用一些特殊的格式上,好比TEXTFILE时才须要使用。STORED AS
后面接的是文件格式名字, 好比 ORC/PARQUET/RCFILE/SEQUENCEFILE/TEXTFILE 等等。如今再来看一个具体的例子,假设咱们有一些PARQUET文件存放在一个OSS路径上,每一个文件都是PARQUET格式,存放着schema为16列(4列BINGINT, 4列DOUBLE, 8列STRING)的数据,那么能够经过以下DDL语句来描述:app
CREATE EXTERNAL TABLE tpch_lineitem_parquet ( l_orderkey bigint, l_partkey bigint, l_suppkey bigint, l_linenumber bigint, l_quantity double, l_extendedprice double, l_discount double, l_tax double, l_returnflag string, l_linestatus string, l_shipdate string, l_commitdate string, l_receiptdate string, l_shipinstruct string, l_shipmode string, l_comment string ) STORED AS PARQUET LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/parquet_data/';
一样的数据,若是是每行以JSON格式,存储成OSS上TEXTFILE文件;同时,数据在OSS经过多个目录组织,这时是可使用MaxCompute分区表和数据关联,则能够经过以下DDL语句来描述:框架
CREATE EXTERNAL TABLE tpch_lineitem_textfile ( l_orderkey bigint, l_partkey bigint, l_suppkey bigint, l_linenumber bigint, l_quantity double, l_extendedprice double, l_discount double, l_tax double, l_returnflag string, l_linestatus string, l_shipdate string, l_commitdate string, l_receiptdate string, l_shipinstruct string, l_shipmode string, l_comment string ) PARTITIONED BY (ds string) ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/';
若是OSS表目录下面的子目录是以Partition Name方式组织,好比:性能
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170102/' oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170103/' ...
则可使用如下DDL语句ADD PARTITION:大数据
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102"); ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103");
若是OSS分区目录不是按这种方式组织,或者根本不在表目录下,好比:优化
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/; oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/; ...
则可使用如下DDL语句ADD PARTITION:
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102") LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/'; ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103") LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/'; ...
对比上面的两个范例,能够看出对于不一样文件类型,只要简单修改STORED AS
后的格式名。在接下来的例子中,咱们将只集中描述对上面PARQUET数据对应的外表(tpch_lineitem_parquet)的处理,若是要处理不一样的文件类型,只要在DDL建立外表时指定是PARQUET/ORC/TEXTFILE/RCFILE/TEXTFILE便可,处理数据的语句则是同样的。
在建立数据外表后,直接对外表就能够进行与普通MaxCompute表的操做,直接对存储在OSS上的数据进行处理,好比:
SELECT l_returnflag, l_linestatus, SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price, AVG(l_quantity) AS avg_qty, COUNT(*) AS count_order FROM tpch_lineitem_parquet WHERE l_shipdate <= '1998-09-02' GROUP BY l_returnflag, l_linestatus;
能够看到,在这里tpch_lineitem_parquet
这个外表被看成一个普通的内部表同样使用。惟一不一样的只是在MaxCompute内部计算引擎将从OSS上去读取对应的PARQUET数据来进行处理。
可是咱们应该强调的是,在这里直接使用外表,每次读取的时候都须要涉及外部OSS的IO操做,而且MaxCompute系统自己针对内部存储作的许多高性能优化都用不上了,因此性能上会有所损失。 因此若是是须要对数据进行反复计算以及对计算的高效性比较敏感的场景上,咱们推荐下面这种用法:先将数据导入MaxCompute内部,再进行计算。
注意,上面例子中的tpch_lineitem_textfile表,由于使用了ROW FORMAT + STORED AS,须要手动设置flag(只使用STORED AS,odps.sql.hive.compatible默认为TRUE),再进行读取,不然会有报错。
SELECT * FROM tpch_lineitem_textfile LIMIT 1; FAILED: ODPS-0123131:User defined function exception - Traceback: com.aliyun.odps.udf.UDFException: java.lang.ClassNotFoundException: com.aliyun.odps.hive.wrapper.HiveStorageHandlerWrapper --须要手动设置hive兼容flag set odps.sql.hive.compatible=true; SELECT * FROM tpch_lineitem_textfile LIMIT 1; +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+ | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment | +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+ | 5640000001 | 174458698 | 9458733 | 1 | 14.0 | 23071.58 | 0.08 | 0.06 | N | O | 1998-01-26 | 1997-11-16 | 1998-02-18 | TAKE BACK RETURN | SHIP | cuses nag silently. quick | +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
tpch_lineitem_internal
,而后将OSS上的开源数据导入MaxCompute内部表,以cFile格式存储在MaxCompute内部:CREATE TABLE tpch_lineitem_internal LIKE tpch_lineitem_parquet; INSERT OVERWRITE TABLE tpch_lineitem_internal SELECT * FROM tpch_lineitem_parquet;
SELECT l_returnflag, l_linestatus, SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price, AVG(l_quantity) AS avg_qty, COUNT(*) AS count_order FROM tpch_lineitem_internal WHERE l_shipdate <= '1998-09-02' GROUP BY l_returnflag, l_linestatus;
经过这样子将数据先导入系统的状况下,对一样数据的计算就会更高效得多。
开源的种种数据格式每每由各类数据处理生态产生,而MaxCompute非结构化数据处理框架经过实现计算与存储的互联,但愿打通阿里云核心计算平台与各类数据的通路。在这个基础上,各类各样依赖于不一样数据格式的应用,将能在MaxCompute计算平台上实现,后继咱们会对一些具体的这种应用,好比基因计算等,再作一些具体的case study以及介绍。咱们也欢迎有对开源数据进行处理分析的更多应用,能在MaxCompute强大计算能力的基础上开花结果。
本文为云栖社区原创内容,未经容许不得转载。