摘要: 1. 本文背景 不少行业的信息系统中,例如金融行业的信息系统,至关多的数据交互工做是经过传统的文本文件进行交互的。此外,不少系统的业务日志和系统日志因为各类缘由并无进入ELK之类的日志分析系统,也是以文本文件的形式存在的。java
不少行业的信息系统中,例如金融行业的信息系统,至关多的数据交互工做是经过传统的文本文件进行交互的。此外,不少系统的业务日志和系统日志因为各类缘由并无进入ELK之类的日志分析系统,也是以文本文件的形式存在的。随着数据量的指数级增加,对超大文本文件的分析愈来愈成为挑战。好在阿里云的MaxCompute产品从2.0版本开始正式支持了直接读取并分析存储在OSS上的文本文件,能够用结构化查询的方式去分析非结构化的数据。正则表达式
本文对使用MaxCompute分析OSS文本数据的实践过程当中遇到的一些问题和优化经验进行了总结。做为前提,读者须要详细了解MaxCompute读取OSS文本数据的一些基础知识,对这篇官方文档 《访问 OSS 非结构化数据》最好有过实践经验。本文所描述的内容主要是针对这个文档中提到的自定义Extractor作出的一些适配和优化。sql
2.1 场景一:分析zip压缩后的文本文件
场景说明
不少时候咱们会对历史的文本数据进行压缩,而后上传到OSS上进行归档,那么若是要对这部分数据导入MaxCompute进行离线分析,咱们能够自定义Extractor让MaxCompute直接读取OSS上的归档文件,避免了把归档文件下载到本地、解压缩、再上传回OSS这样冗长的链路。app
实现思路
如 《访问 OSS 非结构化数据》文档中所述,MaxCompute读取OSS上的文本数据本质上是读取一个InputStream流,那么咱们只要构造出适当的归档字节流,就能够直接获取这个InputStream中的数据了。ide
以Zip格式的归档文件为例,咱们能够参考 DataX 中关于读取OSS上Zip文件的源码,构造一个Zip格式的InputStream,代码见 ZipCycleInputStream.java 。构造出这个Zip格式的InputStream后,在自定义Extractor中获取文件流的部分就能够直接使用了,例如:优化
private BufferedReader moveToNextStream() throws IOException { SourceInputStream stream = inputs.next(); // ...... ZipCycleInputStream zipCycleInputStream = new ZipCycleInputStream(stream); return new BufferedReader(new InputStreamReader(zipCycleInputStream, "UTF-8"), 8192); // ...... }
优化经验
你们可能知道,MaxCompute中进行批量计算的时候,能够经过设置 odps.stage.mapper.split.size 这个参数来调整数据分片的大小,从而影响到执行计算任务的Mapper的个数,在必定程度上提升Mapper的个数能够增长计算的并行度,进而提升计算效率 (但也不是说Mapper个数越多越好,由于这样可能会形成较长时间的资源等待,或者可能会形成长尾的后续Reducer任务,反而下降总体的计算效率) 。this
一样道理,对OSS上的文本文件进行解析的时候,也能够经过设置 odps.sql.unstructured.data.split.size 这个参数来达到调整Mapper个数的目的 (注意这个参数可能须要提工单开通使用权限):阿里云
set odps.sql.unstructured.data.split.size=16;
上述设定的含义是,将OSS上的文件拆分为若干个16M左右大小的分片,让MaxCompute尽力作到每一个分片启动一个Mapper任务进行计算——之因此说是“尽力作到”,是由于MaxCompute默认不会对单个文件进行拆分及分片处理(除非设定了其余参数,咱们后面会讲到),也就是说,若是把单个分片按照上面的设定为16M,而OSS上某个文件大小假设为32M,则MaxCompute仍然会把这个文件总体(即32M)的数据量做为一个分片进行Mapper任务计算。调试
注意点
咱们在这个场景中处理的是压缩后的文件,而InputStream处理的字节量大小是不会因压缩而变小的。举个例子,假设压缩比为1:10,则上述这个32M的压缩文件实际表明了320M的数据量,即MaxCompute会把1个Mapper任务分配给这320M的数据量进行处理;同理假设压缩比为1:20,则MaxCompute会把1个Mapper任务分配给640M的数据量进行处理,这样就会较大的影响计算效率。所以,咱们须要根据实际状况调整分片参数的大小,并尽可能把OSS上的压缩文件大小控制在一个比较小的范围内,从而能够灵活配置分片参数,不然分片参数的值会由于文件太大而且文件不会被拆分而失效。日志
2.2 场景二:过滤文本文件中的特定行
场景说明
对于一些业务数据文件,特别是金融行业的数据交换文件,一般会有文件头或文件尾的设定要求,即文件头部的若干行数据是一些元数据信息,真正要分析的业务数据须要把这些元信息的行过滤掉,只分析业务数据部分的行,不然执行结构化查询的SQL语句的时候必然会形成任务失败。
实现思路
在 《访问 OSS 非结构化数据》文档中提到的 代码示例 中,对 readNextLine() 方法进行一些改造,对读取的每个文件,即每一个 currentReader 读取下一行的时候,记录下来当前处理的行数,用这个行数判断是否到达了业务数据行,若是未到业务数据行,则继续读取下一条记录,若是已经到达数据行,则将该行内容返回处理;而当跳转到下一个文件的时候,将 该行数值重置。
代码示例:
private String readNextLine() throws IOException { if (firstRead) { firstRead = false; currentReader = moveToNextStream(); if (currentReader == null) { return null; } } // 读取行级数据 while (currentReader != null) { String line = currentReader.readLine(); if (line != null) { if (currentLine < dataLineStart) { // 若当前行小于数据起始行,则继续读取下一条记录 currentLine++; continue; } if (!"EOF".equals(line)) { // 若未到达文件尾则将该行内容返回,若到达文件尾则直接跳到下个文件 return line; } } currentReader = moveToNextStream(); currentLine = 1; } return null; }
此处 dataLineStart 表示业务数据的起始行,能够经过 DataAttributes 在创建外部表的时候从外部做为参数传入。固然也能够随便定义其余逻辑来过滤掉特定行,好比本例中的对文件尾的“EOF”行进行了简单的丢弃处理。
2.3 场景三:忽略文本中的空行
场景说明
在 《访问 OSS 非结构化数据》文档中提到的 代码示例 中,已能够应对大多数场景下的文本数据处理,但有时候在业务数据文本中会存在一些空行,这些空行可能会形成程序的误判,所以咱们须要忽略掉这些空行,让程序继续分析处理后面有内容的行。
实现思路
相似于上述 场景二 ,只须要判断为空行后,让程序继续读取下一行文本便可。
代码示例:
public Record extract() throws IOException {
String line = readNextLine(); if (line == null) { return null;// 返回null标志已经读取完成 } while ("".equals(line.trim()) || line.length() == 0 || line.charAt(0) == '\r' // 遇到空行则继续处理 || line.charAt(0) == '\n') { line = readNextLine(); if (line == null) return null; } return textLineToRecord(line);
}
2.4 场景四:选择OSS上文件夹下的部分文件进行处理
场景说明
阅读 《访问 OSS 非结构化数据》文档可知,一张MaxCompute的外部表链接的是OSS上的一个文件夹(严格来讲OSS没有“文件夹”这个概念,全部对象都是以Object来存储的,所谓的文件夹其实就是在OSS建立的一个字节数为0且名称以“/”结尾的对象。MaxCompute创建外部表时链接的是OSS上这样的以“/”结尾的对象,即链接一个“文件夹”),在处理外部表时,默认会对该文件夹下 全部的文件 进行解析处理。该文件夹下全部的文件集合即被封装为 InputStreamSet ,而后经过其 next() 方法来依次得到每个InputStream流、即每一个文件流。
但有时咱们可能会但愿只处理OSS上文件夹下的 部分 文件,而不是所有文件,例如只分析那些文件名中含有“2018_”字样的文件,表示只分析2018年以来的业务数据文件。
实现思路
在获取到每个InputStream的时候,经过 SourceInputStream 类的 getFileName() 方法获取正在处理的文件流所表明的文件名,而后能够经过正则表达式等方式判断该文件流是否为所须要处理的文件,若是不是则继续调用 next() 方法来获取下一个文件流。
代码示例:
private BufferedReader moveToNextStream() throws IOException { SourceInputStream stream = null; while ((stream = inputs.next()) != null) { String fileName = stream.getFileName(); System.out.println("========inputs.next():" + fileName + "========"); if (patternModel.matcher(fileName).matches()) { System.out.println(String .format("- match fileName:[%s], pattern:[%s]", fileName, patternModel .pattern())); ZipCycleInputStream zipCycleInputStream = new ZipCycleInputStream(stream); return new BufferedReader(new InputStreamReader(zipCycleInputStream, "UTF-8"), 8192); } else { System.out.println(String.format( "-- discard fileName:[%s], pattern:[%s]", fileName, patternModel.pattern())); continue; } } return null; }
本例中的 patternModel 为经过 DataAttributes 在创建外部表的时候从外部做为参数传入的正则规则。
写到这里可能有读者会问,若是一个文件夹下有不少文件,好比上万个文件,整个遍历一遍后只选择一小部分文件进行处理这样的方式会不会效率过低了?其实大可没必要担忧,由于相对于MaxCompute对外部表执行批量计算的过程,循环遍历文件流的时间消耗是很是小的,一般状况下是不会影响批量计算任务的。
2.5 场景五:针对单个大文件进行拆分
场景说明
在 场景一 中提到,要想提升计算效率,咱们须要调整 odps.sql.unstructured.data.split.size 参数值来增长Mapper的并行度,可是对于单个大文件来说,MaxCompute默认是不进行拆分的,也就是说OSS上的单个大文件只会被分配给一个Mapper任务进行处理,若是这个文件很是大的话,处理效率将会及其低下,咱们须要一种方式来实现对单个文件进行拆分,使其能够被多个Mapper任务进行并行处理。
实现思路
仍然是要依靠调整 odps.sql.unstructured.data.split.size 参数来增长Mapper的并行度,而且设定 odps.sql.unstructured.data.single.file.split.enabled 参数来容许拆分单个文件 (同odps.sql.unstructured.data.split.size,该参数也可能须要提工单申请使用权限) ,例如:
set odps.sql.unstructured.data.split.size=128; set odps.sql.unstructured.data.single.file.split.enabled=true;
设置好这些参数后,就须要编写特定的Reader类来进行单个大文件的拆分了。
核心的思路是,根据 odps.sql.unstructured.data.split.size 所设定的值,大概将文件按照这个大小拆分开,可是拆分点极大可能会切在一条记录的中间,这时就须要调整字节数,向前或向后寻找换行符,来保证最终的切分点落在一整条记录的尾部。具体的实现细节相对来说比较复杂,能够参考在 《访问 OSS 非结构化数据》文档中提到的 代码示例 来进行分析。
注意点
在计算字节数的过程当中,可能会遇到非英文字符形成计算切分点的位置计算不许确,进而出现读取的字节流仍然没有把一整行覆盖到的状况。这须要针对含有非英文字符的文本数据作一些特殊处理。
代码示例:
@Override public int read(char[] cbuf, int off, int len) throws IOException { if (this.splitReadLen >= this.splitSize) { return -1; } if (this.splitReadLen + len >= this.splitSize) { len = (int) (this.splitSize - this.splitReadLen); } int readSize = this.internalReader.read(cbuf, off, len); int totalBytes = 0; for (char ch : cbuf) { String str = String.valueOf(ch); byte[] bytes = str.getBytes(charset); totalBytes += bytes.length; } this.splitReadLen += totalBytes; return readSize; }
在编写自定义Extractor的程序中,适当加入System.out做为日志信息输出,这些日志信息会在MaxCompute执行时输出在LogView的视图中,对于调试过程和线上问题排查过程很是有帮助。
上文中提到经过调整 odps.sql.unstructured.data.split.size 参数值来适当提升Mapper任务的并行度,可是并行度并非越高越好,具体什么值最合适是与OSS上的文件大小、总数据量、MaxCompute产品自身的集群状态紧密联系在一块儿的,须要屡次调试,而且可能须要与 odps.stage.reducer.num、odps.sql.reshuffle.dynamicpt、odps.merge.smallfile.filesize.threshold 等参数配合使用才能找到最优值。而且因为MaxCompute产品自身的集群状态也是很重要的因素,可能今天申请500个Mapper资源是很容易的事情,过几个月就变成常常须要等待很长时间才能申请到,这就须要持续关注任务的执行时间并及时调整参数设定。
外部表的读取和解析是依靠Extractor对文本的解析来实现的,所以在执行效率上是远不能和MaxCompute的普通表相比的,因此在须要频繁读取和分析OSS上的文本文件的状况下,建议将OSS文件先 INSERT OVERWRITE 到MaxCompute中字段彻底对等的一张普通表中,而后针对普通表进行分析计算,这样一般会得到更好的计算效率。