【备忘】使用Kettle(PDI)进行ETL

Kettle是一款开源的ETL工具,纯java编写,能够在Window、Linux、Unix上运行,绿色无需安装,数据抽取高效稳定。java

现有一个需求。将数据库1中一个表(原表)中的数据抽取出来放到数据库2中另一个表(目标表)中。并实时更新。数据库

工具为kettle 5.0

工具为工具为kettle 5.0 (spoon 5.0)工具

Kettle中有两种脚本文件,transformation(转换)和job(做业),transformation完成针对数据的基础转换,job则完成整个工做流的控制。code

直接拖拽你想要的组件到面板 而后用鼠标中间滚轮或者按住shift鼠标左键拖拽就能够链接他们了。orm

下面谈谈主要的思路:ip

1.先count目标表 获得目标表的记录条数。get

表输入2 3

2.查询原表中目标表没有的部分 (这里用了自增id 即 id比目标表条数大的部分就是目标表不存在的)工作流

表输入3 2

或者用偏移量以下it

表输入5

由于有要替换的变量(就是那个问号)因此记得勾选这三个:io

唔

  1. 而后写原表与目标表相对应的字段:

字段选择2

4.最后传给目标表: 表输出2

来张 转换1 的全家福 其中标输入3 2 是用的自增id判断是否有新加入的行 表输入5是用的偏移量 这两个选一个(我的倾向偏移量)。 转换1

===========================以上的为转换1==================================

如下为 做业:

做业

其中转换2为: count下原表获得原表行数,而后与目标表比较一下 讲结果传入一个变量 转换2

变量是这么设置的 设置变量

在job里这么接收比较的:

接收变量

最后 脚本能够用Java代码调用kettle 5.0的和3.0的不同 以下

<!-- lang: java -->
package test;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

public class Test {
	public void runJob(String jobname){
		  try {
		   KettleEnvironment.init();
		   //jobname 是Job脚本的路径及名称
		   JobMeta jobMeta = new JobMeta(jobname, null);
		   Job job = new Job(null, jobMeta);
		   //向Job 脚本传递参数,脚本中获取参数值:${参数名}
		   //job.setVariable(paraname, paravalue);
		   job.start();
		   job.waitUntilFinished();
		   if (job.getErrors() > 0) {
		 
		    System.out.println("decompress fail!");
		   }
		  } catch (KettleException e) {
		   System.out.println(e);
		  }
		 }
		 
		   //调用Transformation示例:
		 
		public void runTrans(String filename) throws Exception {
		 
		    KettleEnvironment.init();
		    TransMeta transMeta = new TransMeta(filename);
		    Trans trans = new Trans(transMeta);
		    trans.prepareExecution(null);
		    trans.startThreads();
		    trans.waitUntilFinished();
		    
		    if (trans.getErrors()!=0) {
		      System.out.println("Error");
		    }
		  }


	public static void main(String[] args) {
		try {
			//new Test().runJob("script/j1.kjb");
			new Test().runTrans("script/t1.ktr");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
相关文章
相关标签/搜索