Kettle 4.2源码分析第三讲--Kettle 转换机制transformation介绍

转换机制

  每一个转换步骤都是ETL数据流里面的一个任务。转换步骤包括输入、处理和输出。输入步骤从外部数据源获取数据,例如文件或者数据库;处理步骤处理数据流,字段计算,流处理等,例如整合或者过滤。输出步骤将数据写会到存储系统里面,例如文件或者数据库。数据库

 

图 1 转换步骤示例安全

1. Step类图简介

  Kettle为扩展插件提供了4个扩展点,这4个扩展点也是每一个步骤的组成。每一个类都有其特定的目的及扮演的角色。以TableInput为例,下图说明了这4个类的继承体系。函数

 

图 2 StepInterface继承体系spa

  实现StepInterface接口的类,在转换运行时,将是数据实际处理的位置。每一个执行线程都表示一个实现StepInterface的实例。插件

  BaseStep实现了StepInterface是各step具体实现类的基类。完成了公用的处理函数,如putRow(),可是对于更具体的processRow()在StepBase的子类中。StepBase的主要成员有线程

  public ArrayList<RowSet>  inputRowSets,outputRowSets;3d

  StepBase的子类每次从inputRowSets中取出一行数据,向outputRowSets中写入一行数据。调试

 

图 3 StepDataInterface继承体系code

  实现StepDataInterface接口的类为数据类,当插件执行时,对于每一个执行执行的线程都是惟一的。保存于step相关的数据信息,好比行的元数据信息。xml

 

图 4 StepMetaInterface继承体系

  实现了StepMetaInterface接口的类为元数据类。它的职责是保存和序列化特定步骤的实例配置,例如保存步骤的名称、字段名称等,如何生成加载xml或者读写数据库。

图 5 StepDialogInterface继承体系

  实现了StepDialogInterface接口的类为对话框类,该类实现了该步骤与用户交互的界面,它显示一对话框,经过对话框用户能够根据本身的要求进行步骤的设定。该对话框类与元数据类关系很是密切,对话框里面的配置数据均会保存在元数据类里面。

2. 步骤间交互通讯类

2.1.    RowSet

图 6 步骤之间通讯机制

  RowSet的实现类,负责步骤之间的相互通讯,rowset对象便是前一个step的成员也是后一个step的成员,访问是线程安全的。

 

图 7 RowSet实现类内存快照

  RowSet类中包含源step,目标step和由源向目标发送的一个rowMeta和一组data。其中data数据是以行为单位的队列(queArray)。一个RowSet做为此源step的outputrowsets的一部分。同时做为目标step的inputRowsets一部分。源Step每次向队列中写一行数据,目标step每次从队列中读取一行数据。

图 8 RowSet实现类

2.2.    行元数据

  全部的data均擦除为object对象。步骤与步骤之间以行为单位进行处理,天然须要知道每行的结构,即行元数据。行元数据至少须要包括类型、名称,固然还可能包括字段长度、精度等常见内容。

  行元数据不只在执行的时候须要,并且在转换设置的时候一样须要。每一个步骤的行元数据都会保存在.ktr文件或者数据库里面,因此能够根据步骤名称从TransMeta对象中获取行元数据。

  行元数据的UML类图结构以下所示,主要有单元格元数据组成行元数据。在现有的版本中,支持的数据类型有String、Date、BigNumber、Boolean、SerializableType、Binary、Integer、Numberic。

 

图 9 行元数据UML类图

3. Trans配置及开启

 

图 10 Trans执行时序图

  在真正运行trans以前,还须要对运行模式进行一个设置。设置结果,会传给TransGraph.start(executionConfiguration)。配置界面以下所示:

 

图 11 执行转换模式设置

实例化Trans的基本流程以下,Trans类时最后真正执行转换的类。实例化以前须要配置启动项,保持.ktr文件同步,而后实例化Trans类。最后,开启后台程序,这样不会影响UI的操做,真正的转换在后台执行。

 

图 12 实例化Trans流程图

4. Trans执行

  trans类的执行有execute()负责,主要包含两个步骤:转换执行前的准备工做和全部线程的开启。Trans每个步骤都会对应一个独立的线程,线程之间公国RowSet进行通讯交互。

代码 Trans执行代码

1   public void execute(String[] arguments) throws KettleException {
2 
3        prepareExecution(arguments);
4 
5        startThreads();
6 
7 }

4.1.    执行准备(prepareExecution)

该步骤,主要完成对通讯类的初始化,对步骤的包装初始化。最后启动各个步骤初始化线程,即调用各个步骤的init()方法。准备结束以后,步骤之间的通讯机制完成了,各个步骤的初始化工做也完成了。具体的流程以下所示:

 

图 13 准备执行流程图

1.4.2.    转换处理执行

Trans转换执行引擎类,经过startThreads()启动步骤线程。为全部步骤添加监听器,在开启监听进程对全部线程进行监听。具体的步骤以下所示

 

图 14 启动全部步骤线程

1.4.3.    步骤执行过程

  实现StepInterface的不一样的step各个功能个不同,可是它们之间也有必定的规律性。下图只列举了两个step,(TextInput)文本输入和Uniquerow(去重)。BaseStep封装了getRow()和putRow()方法,从上一个步骤获取数据和将数据输入到下一个步骤。

  基类BaseStep采起了统一的处理方式,调用子类processRow以行为单位处理,核心代码以下。

  while (stepInterface.processRow(meta, data)&& !stepInterface.isStopped());

  processRow( )通用过程是:调用基类BaseStep 的getRow( )获得数据,对一行数据处理,处理以后调用基类putRow( )方法数据保存至outputRowSets(即next step的inputRowSets)

 

 

图 15 TextInput与Uniquerow

1.4.4.    元数据与数据关系。

  Trans中的ETL过程(每一个step)以行为单位处理,其中行的元数据信息RowMeta和数据信息统一保存在RowSet对象中。

  在RowSet中RowMeta的成员的调试结果以下。可见rowMeta储存了每列数据的名称和类型。第一列列名flag,数据是长度为1的String;第二列列名id…

 

RowSet的数据信息在queArray队列中,调试结果以下:能够看出第一个数据元素是一个Object包含了3列,数据内容为(N,1,a…)

 

相关文章
相关标签/搜索