一种优化Kettle转换性能的通用方法

一、问题背景

转换和作业是Kettle的两支利剑。其中,转换采用多线程并发运行架构,作业采用递归顺序执行架构。因此,在增加CPU、内存等硬件资源的情况下,Kettle转换只需修改配置,即可充分利用基础设施能力,提高执行效率。转换的这种垂直扩展能力,使其成为实现ETL功能的必备工具,也是性能优化的核心。

当然,性能优化是一个系统工程,不仅涉及工具本身的优化,更涉及到ETL工具之外的诸多因素。比如,ETL要读取数据库,那么目标DMBS的性能、SQL语句、网络等相关因素都影响到执行效率。Kettle的中文意思为水壶,意即通过这把“水壶”,把上游产生的“水”(数据)顺利输送到下游各部门。想要让整个输送过程实现效率最高,取决于三个因素:上游来水渠道、水壶的大小与数量、下游输水渠道。

本文在假设第一、第三个因素已经达到最优的前提下,讨论第二个因素如何优化。即在外部环境因素不变、内部转换不变时,如何定位到性能瓶颈步骤,并通过垂直扩展进行有针对性、实效性的优化。文章最后还简要介绍了自定义优化插件的编程思路。


二、工具局限

Kettle工具本身具备性能监测手段。设计阶段常用的Spoon工具中,转换执行之后,窗口底部执行结果中有一个步骤度量标签页(如下图1所示)。

图1

                           

从步骤度量中,可以看到每个步骤的执行时间以及速度。在生产环境中,通过Pan或者Kitchen的执行日志,也能够看到读、写行数和速度。那么,是不是速度慢的步骤就是性能瓶颈?或者执行时间最长的就是卡的步骤呢?下面通过实例来分析。

图1中的转换,共包括42个步骤。在测试机器上,通过Spoon中调试执行共需3分4秒,各步骤的性能度量结果图2所示。

从图2中可以看出,执行时间长、执行速度慢的步骤都集中在黄色区域,即步骤C01~C15和D00~D07,每个步骤的执行时间大概是3分1秒。那么,性能瓶颈是否就在这些步骤中呢?

答案是非也!

通过目测Spoon调测窗口(详见图1中虚线框步骤)的运行状态,我们可以看到,C01前的步骤几乎都在5秒内完成,而C01后的步骤,由于都在等C01的输出,所以执行时间随之变长,执行速度几乎都在1条记录/秒。因此,本转换的性能瓶颈在步骤C01,跟其他步骤无关(当然这种目测方法不适合用于生产环境)。

图2

 

三、解决办法

1、原理探究

导致这种误判现象背后的原因究竟是什么呢?

我们分析发现,Kettle列出的每个步骤的执行时间,是通过步骤完成时间减去步骤开始时间而得出的时间间隔(参照图3中org.pentaho.di.trans.step.BaseStep源码)。因此,即使步骤没有做具体的事情,只是在等待输入行,但由于步骤并没有执行完毕,所以等待时间也包含在执行时间中。这是导致误判的根本原因。

图3

通过JDK工具JavaVirtualVM监测各线程执行状态,也可以证实这一点。图4为转换执行过程中的一个截图。可以看出,Kettle为每一个转换都至少启动了一个线程,线程的命名规则为:

转换名称 – 步骤名称

本例转换名称为KettleSample004,所以步骤C01所在线程名称就为“KettleSample004– C01”。图4中绿色部分代表线程处在工作状态,橙色部分代表等待执行状态。截图时刻(15:51:4015:52:00),步骤C01一直处在工作状态,而C02~C15基本上都处在等待状态(只有C02存在小段工作时间)。

图4

因此,解决问题的关键在于,需要一个程序来统计步骤的实际工作时间,实际工作时间长的步骤,可以判定为转换性能瓶颈。图5为在另外一台测试机器上,运行同样转换得到的实际工作时间数据(去掉了一些时间非常短的步骤),左侧为各步骤实际工作时间表,右侧为时间数据散点图。由于C01实际工作时间与其他步骤相比悬殊太大,为了更容易分类,右侧下半部分绘制了排除C01时间数据之外的散点图。从图5中的数据可以明显看出转换的性能瓶颈步骤,也可以根据散点图按照各步骤执行时间分为几种不同类别,分类进行优化。

图5


2、手工解决办法

第一步:分组。

根据散点图的分析结果,我们可以按照实际工作时间的长短,将步骤分为5个组(如下表1所示)。实际工作时间在100-400毫秒之间的步骤归入#1组;实际工作时间在2100~6800毫秒之间的步骤归入#2组;实际工作时间在7800~12000毫秒之间的步骤归入#3组;实际工作时间最高的C01单独归入#4组;其他步骤归入#5组。

表1

第二步:定义命名参数。

在Spoon中双击转换,在转换属性对话框中添加4个命名参数,并设置默认值。表1中的其他步骤分组#5无需单独定义参数,因为这些步骤的复制数量可以保持默认值1不变。具体设置如图6所示。

图6

第三步:设置步骤。

根据表1的分组结果,设置每一个步骤的复制数量,以利用Kettle的垂直扩展能力。具体操作是在每一个步骤上点击右键,选择“改变开始复制的数量…”菜单,并按住Ctrl+Alt+空格键,在下拉框中选择对应分组的命名参数即可。注意,某些步骤可能不允许改变复制数量。如果这些步骤对性能调优非常重要,我们可以在步骤前加一个空操作步骤,再进行设置;如果不重要,忽略即可。配置后的结果如下图7所示:

图7

第四步:运行调优。

如果我们单纯把线程数量无限增大,由于创建线程、线程上下文切换等其他资源的消耗,可能带来执行速度不升反降。因此,不能简单地按照实际工作时间比例来调整复制数量。例如#4组的实际工作时间是#3组的40倍以上。按此比例,#3的值如果设置为8,那么#4的值应该设置为320,这显然不是一个合理的设置。所以在环境不变的情况下,如何得到最优的复制数量,是一个有挑战性的问题。本文仅讨论依据经验通过不同设置的实际执行效果,来找到局部最优解的方法。其他方法将在另外的文章中进行讨论

通过Pan命令,可以设置不同的参数来执行同样的任务,通过整体运行时间,找到一个较好的参数组合。具体命令示例如下(Windows环境):

Pan.bat "/file:D:\KettleSample004.ktr"  "/param:#2=1""/param:#3=1" "/param:#4=1"

在测试机器上,不同参数组合的执行结果如下表2所示:

表2

从4次实验的结果看,选择2,3,8,10的参数值组合,执行时间为16秒,得到局部最优解。


3、使用插件

比手工解决更快捷的方法是利用插件,收集各步骤的运行结果,并自动分组、定义各分组相关步骤的命名参数。特别是在步骤相对较多的情况下,可以极大提高工作效率。下面简要介绍插件的用法。

第一步:打开已经定义好的转换,在左侧核心对象树中选择Kettle博士分类下的复制优化步骤,拖动放到转换中的任意位置。

第二步:双击复制优化步骤,在弹出的对话框中选择清空所有复制,并点击确定。这样,转换中所有已经设置复制数量的信息都将清空,以确保获取真实的性能监测数据。如下图8所示。

图8

第三步:双击复制优化步骤,在弹出的对话框中输入分组的数量(默认是7个分组),勾选自动创建参数,并点击确定。如下图9所示。

图9

第四步:运行转换。可以在Spoon中或者生产环境命令行中执行。执行后,可以在日志中看到分组的结果、每一个步骤的实际工作时间等信息,并自动创建相关命名参数,设置到转换每一个相应的步骤中(如果在Spoon中运行,需要重新打开转换文件)。下图10为在Spoon中执行的结果分析信息,图11为自动创建的命名参数以及步骤设置结果。

图10

图11

四、插件编程要点

下面来介绍插件编程的主要代码,插件中基于实际工作时间进行聚类的算法不在本文讨论之列。读者可以参照其他数据挖掘文章,找到关于聚类的算法。

首先需要了解的背景知识是,Java线程包含6种状态(NEW RUNNABLE BLOCKED WAITING TIMED_WAITING TERMINATED),对于优化需求而言,只需统计Kettle转换步骤线程的RUNNABLE状态时间,然后排序输出即可。主要代码解释如下:

1、查找转换的所有步骤

用到类org.pentaho.di.trans.Trans的主要方法:

  • publicList<StepMetaDataCombi> getSteps()

可以得到转换的所有步骤列表。列表中每个元素都是StepMetaDataCombi类的实例。而StepMetaDataCombi类有一个公有属性stepname代表步骤名称。

  • publicString getName()

可以得到转换名称。

如前所述,得到了转换名称和步骤名称,Kettle步骤执行线程的名称即可确定。

2、查找所有线程

查找所有线程的方法代码如下:

public Thread[] findAllThreads() {

  if(topGroup == null) {

    ThreadGroupgroup = Thread.currentThread().getThreadGroup();

    topGroup= group;

    while(group != null) {

      topGroup= group;

      group= group.getParent();

    }

  }

  intestimatedSize = topGroup.activeCount() * 2;

  Thread[]actualContainer = new Thread[estimatedSize];

  intactualSize = topGroup.enumerate(actualContainer);

  Thread[]threads = new Thread[actualSize];

  System.arraycopy(actualContainer,0, threads, 0, actualSize);

  returnthreads;

}

3、时间统计

启动一个后台线程,固定间隔时间查看各线程状态,如果为RUNNABLE状态,则累计时间。一直到转换执行完毕或者停止时,时间统计可以停止,排序输出统计结果后,线程运行结束。判断转换是否执行完毕,可以调用上述Trans类的isFinished方法;判断线程是否停止,可以调用其isStopped方法。

4、复制次数设置

添加命名参数,可以调用TransMeta类的addParameterDefinition方法。设置复制数量,可以调用StepMeta类的setCopiesString方法。


五、总结

本文介绍了通过对Kettle垂直扩展参数的优化,提高转换执行性能的方法。同时也把编程相关API要点逐一阐述。所有实际工作中的转换优化问题,均可参考。由于每一个转换步骤优化的方法不尽相同,所以未讨论单个步骤的优化问题。

如需原始转换及演示数据文件,请联系微信号carol_sxh获取,添加好友时请注明所需文件名(示例转换文件KettleSample004.7z(免费);插件文件kettle-doctor-plugin.7z(付费))。

注意:由于插件只在Kettle6~8版本上进行过验证,其他版本未经严格测试,使用前务必做好备份!



【注意】本文转自博主公众号“Kettle博士”,本公众号所发文章皆为原创,如转载请注明出处及作者。


微信扫一扫,关注该公众号