摘要: MaxCompute做为阿里巴巴集团内部绝大多数大数据处理需求的核心计算组件,拥有强大的计算能力,随着集团内外大数据业务的不断扩展,新的数据使用场景也在不断产生。在这样的背景下,MaxCompute(ODPS)计算框架持续演化,而原来主要面对内部特殊格式数据的强大计算能力,也正在一步步的经过新增的非结构化数据处理框架,开放给不一样的外部数据。html
0. 前言java
MaxCompute做为阿里巴巴集团内部绝大多数大数据处理需求的核心计算组件,拥有强大的计算能力,随着集团内外大数据业务的不断扩展,新的数据使用场景也在不断产生。在这样的背景下,MaxCompute(ODPS)计算框架持续演化,而原来主要面对内部特殊格式数据的强大计算能力,也正在一步步的经过新增的非结构化数据处理框架,开放给不一样的外部数据。 咱们相信阿里巴巴集团的这种需求,也表明着业界大数据领域的最前沿实践和走向,具备至关的普适性。在以前咱们已经对MaxCompute 2.0新增的非结构化框架作过总体介绍,描述了在MaxCompute上如何处理存储在OSS上面的非结构化数据,侧重点在怎样从OSS读取各类非结构化数据并在MaxCompute上进行计算。 而一个完整数据链路,读取和计算处理以后,必然也会涉及到非结构化数据的 写出。 在这里咱们着重介绍一下从MaxCompute往OSS输出非结构化数据,并提供一个具体的在MaxCompute上进行图像处理的实例, 来展现从【OSS->MaxCompute->OSS】的整个数据链路闭环的实现。 至于对于KV NoSQL类型数据的输出,在对TableStore数据处理介绍 中已经有所介绍,这里就再也不重复。git
1. 使用前提和假设github
1.1 MaxCompute 2.0 功能算法
这里介绍的功能基于MaxCompute新一代的2.0计算框架,目前2.0计算框架已经全面上线,默认就可以使用。sql
另外本文中使用了MaxCompute 2.0新引进的一个BINARY类型,目前在使用BINARY类型时,还须要显性设置set odps.sql.type.system.odps2=true。express
1.2 网络连通性与访问权限安全
另外由于MaxCompute与OSS是两个分开的云计算,与云存储服务,因此在不一样的部署集群上的网络连通性有可能影响MaxCompute访问OSS的数据的可达性。 关于OSS的节点,实例,服务地址等概念,能够参见OSS相关介绍。 在MaxCompute公共云服务访问OSS存储,推荐使用OSS私网地址(即以-internal.aliyuncs.com结尾的host地址)。网络
此外须要指出的是,MaxCompute计算服务要访问TableStore数据须要有一个安全的受权通道。 在这个问题上,MaxCompute结合了阿里云的访问控制服务(RAM)和令牌服务(STS)来实现对数据的安全反问:并发
首先须要在RAM中受权MaxCompute访问OSS的权限。登陆RAM控制台,建立角色AliyunODPSDefaultRole,并将策略内容设置为:
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "odps.aliyuncs.com" ] } } ], "Version": "1" }
而后编辑该角色的受权策略,将权限AliyunODPSRolePolicy受权给该角色。
若是以为这些步骤太麻烦,还能够登陆阿里云帐号点击此处完成一键受权。
2. MaxCompute内置的OSS数据输出handler
2.1 建立External Table
MaxCompute非结构化数据框架但愿从根本上提供MaxCompute与各类数据的联通,这里的“各类数据”是两个维度上的:
1.各类存储介质,好比OSS
2.各类数据格式, 好比文本文件,视频,图像,音频,基因,气象等格式的数据
而数据的这两个维度的特征,都是经过EXTERNAL TABLE的概念来引入MaxCompute的计算体系的。 与读取OSS数据的使用方法相似,对OSS数据进行写操做,在如上打开安全受权通道后,也是先经过CREATE EXTERNAL TABLE语句建立出一个外部表,再经过标准MaxCompute SQL的INSERT INTO/OVERWRITE等语句来实现的,这里先用MaxCompute内置的TsvStorageHandler为例来讲明一下用法:
DROP TABLE IF EXISTS tpch_lineitem_tsv_external; CREATE EXTERNAL TABLE IF NOT EXISTS tpch_lineitem_tsv_external ( orderkey BIGINT, suppkey BIGINT, discount DOUBLE, tax DOUBLE, shipdate STRING, linestatus STRING, shipmode STRING, comment STRING ) STORED BY 'com.aliyun.odps.TsvStorageHandler' ----------------------------------------- (1) LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/tsv_output_folder/'; --(2)
这个DDL语句创建了一个外部表tpch_lineitem_tsv_external,并将前面提到的两个维度的外部数据信息关联到这个外部表上。
1.数据存储介质: LOCATION 将一个OSS上的地址关联到外部表上,对这个外部表的进行读写操做都会反映到这个OSS地址上。
2.数据存储格式: StorageHandler用来代表对这些数据的读写操做方式,这里使用了MaxCompute内置的 com.aliyun.odps.TsvStorageHandler, 用户可使用这个由系统自带的实现来读取和写出TSV文件。 同时用户也能够经过MaxCompute的SDK来自定义StorageHandler, 这个将在后面的章节介绍。
其中OSS数据存储的具体地址的URI格式为:
LOCATION 'oss://${endpoint}/${bucket}/${userPath}/'
最后还要提到的是,在上面的DDL语句中定义了外部表的Schema, 对于数据输出而言,这表示输出的数据格式将由这个Schema描述。 就TSV格式而言,这个schema描述比较直观容易理解; 而在用户自定义的输出数据格式上,这个schema与输出数据的联系则更松散一些,有着更大的自由度。 在后面介绍经过自定义StorageHandler/Outputer的时候会详细展开。
2.2 经过对External Table的 INSERT 操做实现TSV文本文件的写出
在将OSS数据经过External Table关联上后,对OSS文件的写出能够对External Table作标准的SQL INSERT OVERWRITE/INSERT INTO来操做。 具体输出数据的来源能够有两种
1.数据源为MaxCompute的内部表: 也就是说能够经过对外表INSERT操做来实现MaxCompute内部表数据到外部存储介质的写出。
2.数据源为以前经过External Table引入MaxCompute计算体系的外部数据: 这能够用来将外部数据引入MaxCompute进行计算,而后再存储到(不一样的)外部存储地址,或者甚至是不一样的外部存储介质(好比将TableStore数据经由MaxCompute导出到OSS)。
2.2.1 从MaxCompute内部表输出数据到OSS
这里先来看第一种场景:假设咱们已经有一个名为tpch_lineitem的MaxCompute内部表,其schema能够经过
DESCRIBE tpch_lineitem;
获得:
+------------------------------------------------------------------------------------+ | InternalTable: YES | Size: 241483831680 | +------------------------------------------------------------------------------------+ | Native Columns: | +------------------------------------------------------------------------------------+ | Field | Type | Label | Comment | +------------------------------------------------------------------------------------+ | l_orderkey | bigint | | | | l_partkey | bigint | | | | l_suppkey | bigint | | | | l_linenumber | bigint | | | | l_quantity | double | | | | l_extendedprice | double | | | | l_discount | double | | | | l_tax | double | | | | l_returnflag | string | | | | l_linestatus | string | | | | l_shipdate | string | | | | l_commitdate | string | | | | l_receiptdate | string | | | | l_shipinstruct | string | | | | l_shipmode | string | | | | l_comment | string | | | +------------------------------------------------------------------------------------+
其中有16个columns。 如今咱们但愿将其中的一部分数据以TSV格式导出到OSS上面。 那么在用上述DDL建立出External Table以后,使用以下INSERT OVERWRITE操做就能够实现:
INSERT OVERWRITE TABLE tpch_lineitem_tsv_external SELECT l_orderkey, l_suppkey, l_discount, l_tax, l_shipdate, l_linestatus, l_shipmode, l_comment FROM tpch_lineitem WHERE l_discount = 0.07 and l_tax = 0.01;
这里将从内部的tpch_lineitem表中,在符合l_discount = 0.07 并 l_tax = 0.01的行中选出8个列(对应tpch_lineitem_tsv_external这个外部表的schema)按照TSV的格式写到OSS上。 在上面这个INSERT OVERWRITE操做成功完成后,就能够看到OSS上的对应LOCATION产生了一系列文件:
osscmd ls oss://oss-odps-test/tsv_output_folder/ 2017-01-14 06:48:27 39.00B Standard oss://oss-odps-test/tsv_output_folder/.odps/.meta 2017-01-14 06:48:12 4.80MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_0_0-0.tsv 2017-01-14 06:48:05 4.78MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_1_0-0.tsv 2017-01-14 06:47:48 4.79MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_2_0-0.tsv ...
这里能够看到,经过上面LOCATION指定的oss-odps-test这个OSS bucket下的tsv_output_folder文件夹下产生了一个.odps文件夹,这其中将有一些.tsv文件,以及一个.meta文件。 这样子的文件结构是MaxCompute(ODPS)往OSS上输出所特有的:
1.经过MaxCompute对一个OSS地址,使用INSERT INTO/OVERWRITE 外部表来作写出操做,全部的数据将在指定的LOCATION下的.odps文件夹产生;
2.其中.odps文件夹中的.meta文件为MaxCompute额外写出的宏数据文件,其中用于记录当前文件夹中有效的数据。 正常状况下,若是INSERT操做成功完成的话,能够认为当前文件夹的全部数据均是有效数据。 只有在有做业失败的状况下须要对这个宏数据进行解析。 即便是在做业中途失败或被kill的状况下,对于INSERT OVERWRITE操做,再跑一次成功便可。 若是对于高级用户,必定须要解析.meta文件的话,能够联系MaxCompute技术团队。
这里迅速看一下这些tsv文件的内容:
osscmd cat oss://oss-odps-test/tsv_output_folder/.odps/20170113232648738gam6csz7/M1_0_0-0.tsv 4236000067 9992377 0.07 0.01 1992-11-06 F RAIL across the ideas nag 4236000290 3272628 0.07 0.01 1998-04-28 O RAIL uriously. furiously unusual dinos int 4236000386 8081402 0.07 0.01 1994-02-19 F RAIL its. express, iron 4236000710 3879271 0.07 0.01 1995-03-10 F AIR es are carefully fluffily spe ...
能够看到确实在OSS上产生了对应的TSV数据文件。
最后,你们可能也注意到了,这个INSERT OVERWRITE操做产生了多个TSV文件,对于MaxCompute内置的TSV/CSV处理来讲,产生的文件数目与对应SQL stage的并发度是相同的,在上面这个例子中,INSER OVERWITE ... SELECT ... FROM ...; 的操做在源数据表(tpch_lineitem) 上分配了1000个mapper,因此最后产生了1000个TSV文件的。 若是须要控制TSV文件的数目,能够配合MaxCompute的各类灵活语义和配置来实现。 好比若是须要强制产生一个TSV文件,那在这个特定例子中,能够在INSER OVERWITE ... SELECT ... FROM ...最后加上一个DISTRIBUTE BY l_discount, 就能够在最后插入仅有一个Reducer的Reduce stage, 也就会只输出一个TSV文件了:
osscmd ls oss://oss-odps-test/tsv_output_folder/ 2017-01-14 08:03:41 39.00B Standard oss://oss-odps-test/tsv_output_folder/.odps/.meta 2017-01-14 08:03:35 4.20GB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113234037735gcm6csz7/R2_1_33_0-0.tsv
能够看到在增长了DISTRIBUTE BY l_discount后,如今一样的数据只了一个输出TSV文件,固然这个文件的size就大多了。 这方面的调控技巧还有不少,都是能够依赖SQL语言的灵活性,数据自己的特性,以及MaxCompute计算相关设置来实现的,这里就不深刻展开了。
2.2.2 以MaxCompute为计算介质,实现不一样存储介质之间的数据转移
External Table做为一个MaxCompute与外部存储介质的一个切入点,以前已经介绍过对OSS数据的读取以及TableStore数据的操做,结合对外部数据读取和写出的功能,就能够实现经过External Table实现各类各样的数据计算/存储链路,好比:
1.读取External Table A关联的OSS数据,在MaxCompute上作复杂计算处理,并输出到External Table B关联的OSS地址
2.读取External Table A关联的TableStore数据,在MaxCompute上作复杂计算处理,并输出到External Table B关联的OSS地址
而这些操做与上面数据源为MaxCompute内部表的场景, 惟一的区别只是SELECT的来源变成一个External table,而不是MaxCompute内置表。
3. 经过自定义StorageHandler来实现数据输出
除了使用内置的StorageHandler来实如今OSS上输出TSV/CSV等常见文本格式,MaxCompute非结构化框架提供了通用的SDK,容许用户对外输出自定义数据格式文件,包括图像,音频,视频等等。 这种对于用户自定义的彻底非结构化数据格式支持,也是MaxCompute从结构化/文本类数据的一个向外扩展,在这里咱们会以一个图像处理的例子,来走通整个【OSS->MaxCompute->OSS】数据链路,尤为着重介绍对OSS输出文件的功能。
为了方便你们理解,这里先提供一个在使用用户自定义代码的场景下,数据在MaxCompute计算平台上的流程:
从上图能够看出,从数据的流动和处理逻辑上理解,用户能够简单地把非结构化处理框架理解成在MaxCompute计算平台两端有机耦合的的数据导入(Ingres)以及导出(Egress):
1.外部的(OSS)数据通过非结构化框架转换,会使用Java用户容易理解的InputStream类提供给自定义代码接口。 用户自实现Extract逻辑只须要负责对输入的InputStream作读取/解析/转化/计算,最终返回MaxCompute计算平台通用的Record格式;
2.这些Record能够自由的参与MaxCompute的SQL逻辑运算,这一部分计算是基于MaxCompute内置的强大结构化SQL运算引擎,并可能产生新的Record
3.运算事后的Record中再传递给用户自定义的Output逻辑,用户在这里能够进行进一步的计算转换,并最终将Record里面须要输出的信息经过系统提供OutputStream输出,由系统负责写到OSS。
值得指出的是,这里面全部的步骤都是能够由用户根据须要来进行自由的选择与拼接的。 好比若是用户的输入就是MaxCompute的内部表,那步骤1.就没有必要了,事实上在前面的章节2中的例子,咱们就实现了将内部表直接写成OSS上的TSV文件的流程。 同理, 若是用户没有输出的需求,步骤3. 就没有必要,好比咱们以前介绍的OSS数据的读取。 最后,步骤2.也是能够省略的,好比若是用户的全部计算逻辑都是在自定义的Extract/Output中完成,没有进行SQL逻辑运算的须要,那步骤1.是能够直接链接到步骤3.的。
理解了上面这个数据变换的流程,咱们就能够来经过一个图像处理例子来看看怎么具体的经过非结构化框架在MaxCompute SQL上完整的实现非结构化数据的读取,计算以及输出了:
3.1 范例:OSS图像文件 -> MaxCompute计算处理 -> OSS图像输出
这里咱们先提供实现这整个【OSS->MaxCompute->OSS】数据链路须要用到的MaxCompute SQL query,并作简单的注解,详细的用户代码实现逻辑将在后面的3.2子章节中介绍SDK接口的时候作展开解释。
3.1.1 关联OSS上的原始输入图像到External Table: images_input_external
DROP TABLE IF EXISTS images_input_external; CREATE EXTERNAL TABLE IF NOT EXISTS images_input_external ( name STRING, width BIGINT, height BIGINT, image BINARY ) STORED BY 'com.aliyun.odps.udf.example.image.ImageStorageHandler' --- (1) WITH SERDEPROPERTIES ('inputImageFormat'='BMP' , 'transformedImageFormat' = 'JPG') --- (2) LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SampleData/test_images/mixed_bmp/' --- (3) USING 'odps-udf-example.jar'; --- (4)
说明:
1.用户指明使用的用户代码wrapper class名字是com.aliyun.odps.udf.example.image.ImageStorageHandler,这个class及其依赖的三方库用户经过jar提供,具体jar名字会经过下面的USING语句(见第4点)指定。
2.经过SERDEPROPERTIES来实现参数传递,格式为'key'='value', 具体用法能够参见基本功能介绍 以及下面的用户代码说明
3.指定输入图像地址,这个地址上存放了一系列不一样分辨率的bmp图像文件。
4.指定包含用户JAR包,内含自定义的StorageHandler/Extractor/Outputer,以及须要的三方库(这里用到了Java ImageIO库,具体见下面用户代码范例)。JAR包经过ADD JAR命令上传,能够参见基本功能介绍。
另外要说明的是这里指定的External Table的schema就是用户在进行Extract操做后构造的Record格式,具体怎么构造这个Schema用户能够根据须要本身根据能从输入数据中抽取到的信息定义。 在这里咱们定义了对于输入图片数据,会将图片名称,图片的长和宽,以及图片的二进制bytes抽取出来放进Record(见后面的Extractor代码说明),因此就有了上面的【STRING,BIGINT,BIGINT,BINARY】的schema。
3.1.2 关联OSS输出地址到External Table: images_output_external
CREATE EXTERNAL TABLE IF NOT EXISTS images_output_external ( image_name STRING, image_width BIGINT, image_height BIGINT, outimage BINARY ) STORED BY 'com.aliyun.odps.udf.example.image.ImageStorageHandler' LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/output/images_output/' ---(1) USING 'odps-udf-example.jar';
说明: 能够看到这里建立关联输出图像文件的External Table,使用的DDL语句,与前面关联输入图像时使用的DDL语句是很是相似的:只是LOCATION不同,代表图像数据处理后将输出到另一个地址。 另外还有一点就是这里咱们没有使用SERDEPROPERTIES来进行传参,这个只是在这个场景上没有需求,在有需求的时候能够用一样的方法把参数传递给outputer。 固然这里两个DDL语句如此类似,有一个缘由是由于咱们这个例子中用户代码中对于Extract出的Record以及输入给Outputer的Record使用了同样的schema, 同时这一对Extractor和Outputer都被封装在了同一个ImageStorageHandler里放在同一个JAR包里。 在实际应用中,这些都是能够根据实际需求本身调整的,由用户本身选择组合和打包方式。
3.1.3 从OSS读取原始图片数据到MaxCompute, 计算处理,并输出图像到OSS
在上面的3.1.1以及3.1.2子章节中的两个DDL语句,分别实现了把输入OSS数据,以及计划输出OSS数据,分别绑定到两个LOCATION以及指定对应的用户处理代码,参数等设置。 然而这两个DDL语句对系统而言,只是进行了一些宏数据的记录操做,并不会涉及具体的数据计算操做。 在这两个DDL语句运行成功后,运行以下SQL语句才会引起真正的运算。 换句话说,在Fig.1中描述的整个【OSS->MaxCompute->OSS】数据读取/计算/输出链路,实际上都是经过下面一个简单的SQL 语句完成的:
INSERT OVERWRITE TABLE images_output_external SELECT * FROM images_input_external WHERE width = 1024;
这看起来就是一个标准的MaxCompute SQL语句,只不过由于涉及了images_output_external和images_input_external这两个外部表,因此真正进行的物理操做与传统的SQL操做会有一些区别:在这个过程当中,涉及了读写OSS,以及经过ImageStorageHandler这个wrapper,调用自定义的Extractor,Outputer代码来对数据进行操做。 下面就来具体看看在这个例子中的用户自定义代码实现了怎样的功能,以及具体是如何实现的。
3.2 ImageStorageHandler实现
如同以前介绍过的,MaxCompute非结构化框架经过StorageHandler这个接口来描述对各类数据存储格式的处理。 具体来讲,StorageHandler做为一个wrapper class, 让用户指定本身定义的Exatractor(用于数据的读入,解析,处理等) 以及Outputer(用于数据的处理和输出等)。 用户自定义的StorageHandler 应该继承 OdpsStorageHandler,实现getExtractorClass以及getOutputerClass 两个接口。
一般做为wrapper class, StorageHandler的实现都很简单,好比这里的ImageStorageHandler 就只是经过这两个接口指定了咱们将使用ImageExtractor以及ImageOutputer:
package com.aliyun.odps.udf.example.image; public class ImageStorageHandler extends OdpsStorageHandler { @Override public Class<!--? extends Extractor--> getExtractorClass() { return ImageExtractor.class; } @Override public Class<!--? extends Outputer--> getOutputerClass() { return ImageOutputer.class; } }
另外要说明的是若是肯定在使用某个StorageHandler的时候,只须要用到Extractor,或者只须要用到Outputer功能,那不须要的接口则不用实现。 好比若是咱们只须要读取OSS数据而不须要作INSERT操做,那getOutputerClass()的实现只须要扔个NotImplemented exception就能够了,不会被调用到。
3.3 ImageExtractor实现
由于对于SDK中Extractor接口的介绍以及对用户如何写一个自定义的Extractor,在以前介绍的OSS数据的读取中已经有所涉及,因此这里就再也不对这方面作深刻的介绍。
Extractor的工做在于读取输入数据并进行用户自定义处理,那么咱们首先来看看这里由images_input_external这个外表绑定的OSS输入LOCATION上存放的具体数据内容:
osscmd ls oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/ 2017-01-09 14:02:01 1875.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/barbara.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/cameraman.bmp 2017-01-09 14:02:00 1054.74KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/fishingboat.bmp 2017-01-09 14:01:59 257.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/goldhill.bmp 2017-01-09 14:01:59 468.80KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/house.bmp 2017-01-09 14:01:59 468.80KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/jetplane.bmp 2017-01-09 14:02:01 2.32MB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/lake.bmp 2017-01-09 14:01:59 257.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/lena.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/livingroom.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/pirate.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/walkbridge.bmp 2017-01-09 14:02:00 1054.74KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/woman_blonde.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/woman_darkhair.bmp
能够看到这个LOCATION存放了一系列bmp图像数据,分辨率从 400 x 400 到 1200 x 1200不等。 具体在这个例子中用到的ImageExtractor的详细代码在github上能够找到, 这里只作一些简单介绍说明该Extractor作了些什么工做:
1.从输入的OSS地址上使用非结构化框架提供的InputStream接口读取图像数据,并在本地进行以下操做
对于图像宽度小于1024的图片,统一放大到1024 x 1024; 对于图像宽度大于1024的图片,跳过不进行处理
处理过的图片,在内存中转存成由输入参数指定的格式(JPG)
2.把处理后在内存中的JPG数据的原始字节存入输出的Record中的BINARY field, 同一个Record中还将存放处理后图像的长和宽(都是1024), 以及原始的图像名字(这个能够从输入的InputStream上获取);
3.填充后的Record从Extract接口返回进入MaxCompute系统;
4.在这个过程当中,用户能够灵活的进行各类操做,好比额外的参数验证等。
另外要说明的是,目前Record做为MaxCompute结构化数据处理的基本单元,有一些额外的限制,好比BINARY/STRING类型都有8MB大小的限制,可是在大部分场景下这个大小应该是能知足存储需求的。
3.4 ImageOutputer的实现
接下来咱们着重讲一下ImageOutputer的实现。 首先全部的用户输出逻辑都必须实现Outputer接口,具体来讲有以下三个:setup, output和close, 这和Extractor的setup, extract和close三个接口基本上是对称的。
// Base outputer class, custom outputer shall extend from this class public abstract class Outputer{ public abstract void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes); public abstract void output(Record record) throws IOException; public abstract void close() throws IOException; }
这其中setup()和close()在一个outputer中只会调用一次。 用户能够在setup里面作初始化准备工做,另外一般须要把setup()传递进来的这三个参数保存成ouputerd的class variable, 方便以后output()或者close()接口中使用。 而close()这个接口用于方便用户代码的扫尾工做。
一般状况下大部分的数据处理发生在output(Record)这个接口内。 MaxCompute系统会根据当前outputer分配处理的Record数目不断调用,也就是对每一个输入Record系统会调用一次 output(Record)。 系统假设在一个output(Record) 调用返回的时候,用户代码已经消费完这个Record, 所以在当前output(Record)返回后,系统可能将这个Record所使用的内存用做它用: 因此不推荐一个Record中的信息在跨多个output()函数调用被使用,若是必定有这个需求的话,用户必须把相关信息经过class variable等方式自行另外保存。
3.4.1 ImageOutputer.setup()
setup用于初始化整个outputer, 在这个接口上提供了整个outputer操做过程当中可能须要的参数:
在咱们这个ImageOutputer里,setup()的实现比较简单:
@Override public void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes) { this.outputStreamSet = outputStreamSet; this.attributes = attributes; this.attributes.verifySchema(new OdpsType[]{ OdpsType.STRING, OdpsType.BIGINT, OdpsType.BIGINT, OdpsType.BINARY }); }
只是作了简单的初始化以及对schema的验证。
3.4.2 ImageOutputer.output(Record) 以及 OutputStreamSet的使用
在介绍具体output()接口以前,首先咱们要来看看 OutputStreamSet, 这个类有两个接口:
public interface OutputStreamSet{ SinkOutputStream next(); SinkOutputStream next(String fileNamePostfix); }
两个接口都是用来获取一个新的SinkOutputStream(一个Java OutputStream的实现,能够按照OutputStream使用),两个接口惟一的区别是next()获取的OutputStream写出的文件名彻底由MaxCompute系统决定,而next(String fileNamePostfix)则容许用户提供文件名的postfix。 提供这个postfix的意义是,在输出文件具体地址和名字格式整体由MaxCompute系统决定的前提下,用户依然能够定制一个方便理解的postfix。 好比使用next("_boat.jpg") 获得的OutputStream可能对应以下一个输出文文件:
oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0_boat.jpg
这其中尾端的"_boat.jpg"能够帮助用户理解输出文件的涵义。 若是这个 OutputStream是由next()得到的话,那对应的输出文件可能就是这样的:
oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0
用户可能就须要具体读取这个文件才能知道这个文件中具体存放了什么内容。
前面提到output(Record)这个接口会由系统不断调用,可是应该强调的是,并不必定在每个Record都须要调用一次OutputStreamSet.next()接口来得到一个新的OutputStream。 事实上在大多数状况下,咱们建议在一个Outputer里面尽量减小调用next()的次数(最好只调用一次)。 也就是说理想状况下,一个outpuer只应该产生一个输出文件。 好比处理TSV这种文本格式文件,假设有5000个Record对应5000行TSV数据,那么最理想的状况是应该把这5000行数据所有写到一个TSV文件中。 固然用户可能会有各类各样不一样的切分输出文件的需求:好比但愿每一个文件大小控制在必定范围,或好比文件的边界有显著的意义等等。
具体到当前这个图像例子,从下面的ImageOutputer代码实现中能够看出,这个例子中确实是处理每一个Record就调用一次next()的,由于在当前场景中,每个输入的Record都表示一张图片的信息(binary bytes, 图像名字,图像长宽),因此这里经过屡次调用next()来输出多个图片文件。 可是咱们仍是须要再次强调,调用next()的次数过多可能有一些其余弊端,好比形成碎片化小数据在OSS上的存储等等。 尤为在MaxCompute这种分布式计算系统上,由于系统自己就会调度起多个outputer进行并行计算处理,若是每一个outpuer都输出过多文件的话,最后产生的文件数目会有一个乘性效应。 回头来看咱们这个例子中,即便在这里,多个图像其实也能够经过一个OutputStream,按照tar/tar.gz的方式写到单个文件中,这些都是在实现具体系统中用户须要根据本身的场景, 以及处理逻辑,输出数据类型等信息来进行优化和tradeoff的。
在理解了这些以后,如今来具体看看ImageOutputer的实现output接口实现:
@Override public void output(Record record) throws IOException { String name = record.getString(0); Long width = record.getBigint(1); Long height = record.getBigint(2); ByteArrayInputStream input = new ByteArrayInputStream(record.getBytes(3)); BufferedImage sobelEdgeImage = getEdgeImage(input); OutputStream outputStream = this.outputStreamSet.next(name + "_" + width + "x" + height + "." + outputFormat); ImageIO.write(sobelEdgeImage, this.outputFormat, outputStream); }
能够看到这里主要就作了三件事情:
1. 根据以前保存的图像名字,长宽信息,和编码方式(".jpg")拼出一个带扩展名的输出文件名postfix。
2. 读取图像binary bytes,并用getEdgeImage()来利用sobel算子对图像作边缘检测。
具体getEdgeImage()的实现这里就不进行深刻解释了: 使用了标准的sobel模板卷积算法, 有兴趣看ImageOutputer源码便可。
3. 对每个图像产生一个新的OutputStream并将数据写出,至此当前Record处理完毕,写出一张图片到OSS,output()函数返回。
3.4.3 ImageOutputer.close()
在这个例子中,outputer.close()接口没有包含具体的实现逻辑,是个no-op。
至此咱们就介绍完了一个output的实现,如今能够看看在运行完这个SQL query,对应OSS地址的数据:
osscmd ls oss://oss-odps-test/dev/output/images_output/ 2017-01-15 14:36:50 215.19KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0-barbara_1024x1024.jpg 2017-01-15 14:36:50 108.90KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-1-cameraman_1024x1024.jpg 2017-01-15 14:36:50 169.54KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-2-fishingboat_1024x1024.jpg 2017-01-15 14:36:50 214.94KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-3-goldhill_1024x1024.jpg 2017-01-15 14:36:50 71.00KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-4-house_1024x1024.jpg 2017-01-15 14:36:50 126.50KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-5-jetplane_1024x1024.jpg 2017-01-15 14:36:50 169.63KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-6-lake_1024x1024.jpg 2017-01-15 14:36:50 194.18KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-7-lena_1024x1024.jpg ...
能够看到图像数据按照期待格式写到了指定地址,这里咱们就选一个输入图像(lena.bmp)以及对应的输出图像(M1_0_-1--1-7-lena_1024x1024.jpg)看一下对比:
这个例子中整个图像处理流程已经经过如上的SQL query完成。 而从上面展现的ImageExtractor以及ImageOutputer 源代码,咱们能够看出整个过程当中用户的逻辑基本与写单机图像处理程序无异,用户的代码只须要在Extractor上作InputStream到Record的准换,而在Outputer上作反向的Record到OutputSteam的写出处理,其余核心的处理逻辑实现基本和单机算法实现相同,在用户的层面,并不用去操心底层分布式系统的细节以及MaxCompute和OSS的交互。
3.5 数据处理步骤的灵活性
从上面这个例子中咱们也能够看出,在一个完整的【OSS->MaxCompute->OSS】数据流程中,Extractor和Outputer中涉及的具体计算逻辑其实也并不必定会有一个很是明确的边界。 Extractor和Outputer只要各自完成所需的转换Record/Stream的转换,具体的额外算法逻辑在两个地方都有机会完成。 好比上面这个例子的整个流程涉及了以下图像处理相关的运算:
1.图像的缩放 (统一到 1024 x 1024)
2.图像格式的转换 (BMP -> JPG)
3.图像的Sobel边缘检测
上面的例子实现中,把1. 和 2. 放在ImageExtractor中完成,而3.则放在ImageOutputer中完成,但并非惟一的选择。 咱们彻底能够把全部3个步骤都放在ImageExtractor中完成,让ImageOutputer只作Record到写出最后图像的操做;也能够在ImageExtractor中只作读取原始binary到Recrod, 而把全部3个图像处理步骤都放在ImageOutputer中进行,等等。 具体进行怎样的选择,用户能够彻底根据须要本身实现。
另一个系统设计的点是若是对于一个数据须要作重复的运算,那能够考虑将数据从OSS中经过Extractor读出进MaxCompute,而后存储成MaxCompute的内置表格再进行(屡次)的计算。 这个对于MaxCompute和OSS没有进行混布,不在一个物理网络上的场景尤为有意义: MaxCompute从内置表中读取数据无疑要比从外部OSS存储服务中读出数据要有效得多。 在上面3.1.3子章节中的图像处理例子,这个INSER OVERWITE操做:
INSERT OVERWRITE TABLE images_output_external SELECT * FROM images_input_external WHERE width = 1024;
就能够改写成两个分开的语句:
INSERT OVERWRITE TABLE images_internal SELECT * FROM images_input_external WHERE width = 1024; INSERT OVERWRITE TABLE images_output_external SELECT * FROM image_internal;
经过把数据写到一个内部images_internal表中,后面若是有屡次读取数据的需求的话,就能够再也不去访问外部OSS了。 这里也能够看到MaxCompute非结构化框架以及SQL语法自己提供了很是高的灵活性和可扩展性,用户能够根据实际计算的不一样模式/场景/需求,来在上面完成各类各样的数据计算工做流。
非结构化数据处理框架随着MaxCompute 2.0一块儿推出,意在丰富MaxCompute平台的数据处理生态,来打通阿里云核心计算平台与阿里云各个重要存储服务之间的数据链路。 在以前介绍过的读取OSS以及处理TableStore数据的总体方案后,本文侧重介绍数据往OSS的输出方案,并依托一个图像处理的处理实例,展现了【OSS->MaxCompute->OSS】整个数据链路的实现。 在这些新功能的基础上,咱们但愿实现整个阿里云计算与数据的生态融合: 在不一样的项目上,咱们已经看到了在MaxCompute上处理OSS上的海量视频,图像等非结构化数据的巨大潜力。 从此随着这个生态的丰富,咱们指望OSS数据,TableStore数据以及MaxCompute内部存储的数据,都能在MaxCompute的核心计算引擎上进行融合,从而产生更大的价值。
阅读更多干货好文,请关注扫描如下二维码: