使用Kettle进行数据库迁移操做

思路:先从源库读取须要迁移的表名,将代表做为变量,并在目标库建立这些表,最后表到表抽取数据java

整个迁移流程包括2个job,4个transform。sql

具体以下:数据库

1.总的Job:测试

2.获取表名流程的转换:code

3.子job表数据抽取做业orm

3.1转换 表名变量设置xml

 

3.2转换 建立表结构资源

表输入以下,注意:这里的查询必需要查询出数据才行,否则后面会没法获取表结构来建立表get

(有些人说这里加个条件where 1=2,不须要数据只须要表结构,我本身测试,发现这样没法获取到数据,后面的java脚本中能够打印这些信息,加了条件会报错,打印信息为null)input

java脚本:

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
    {
    	// First, get a row from the default input hop
    	//
    	Object[] r = getRow();
		//logBasic("r.size="+r.length);
   		org.pentaho.di.core.database.DatabaseMeta dbmeta = null;
    	java.util.List list = getTrans().getRepository().readDatabases();//3.x中获取资源库的全部数据库链接信息用getDatabases();
    	if(list != null && !list.isEmpty())
    	{
    		for(int i=0;i<list.size();i++)
    		{
    			dbmeta = (org.pentaho.di.core.database.DatabaseMeta)list.get(i);
    			//下面是目标库的数据库链接,你们可根据须要修改
    			if("testb".equalsIgnoreCase(dbmeta.getName()))
    			{
					logBasic("数据库链接名为:"+dbmeta.getName());
    				break;
    			}
    		}
    	}
    	if(dbmeta!=null)
    	{
    		org.pentaho.di.core.database.Database db=new org.pentaho.di.core.database.Database(dbmeta);
    		try
    		{
    			db.connect();
    			String tablename = getVariable("TABLENAME");
    			logBasic("开始建立表:" + tablename);
    			if(tablename!=null && tablename.trim().length()>0)
    			{
					logBasic("data.inputRowMeta="+data.inputRowMeta);
    				String sql = db.getDDL(tablename, data.inputRowMeta);//${TABLENAME}
					logBasic("sql="+sql);
    				db.execStatement(sql.replace(";", ""));
    				logBasic(sql);
    			}
    		}catch(Exception e){
    			logError("建立表出现异常",e);
    		}finally{
    			db.disconnect();
    		}
    	}
    	return false;
	}

3.3数据抽取

运行结果:

2017/07/24 15:16:25 - Spoon - 正在开始任务...
2017/07/24 15:16:25 - 数据库迁移做业 - 开始执行任务
2017/07/24 15:16:25 - 数据库迁移做业 - 开始项[获取表名流程]
2017/07/24 15:16:25 - 源表名称获取 - 为了转换解除补丁开始  [源表名称获取]
2017/07/24 15:16:25 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:25 - 表输入.0 - 完成处理 (I=2, O=0, R=0, W=2, U=0, E=0)
2017/07/24 15:16:25 - 字段选择.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2017/07/24 15:16:25 - 复制记录到结果.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2017/07/24 15:16:25 - 数据库迁移做业 - 开始项[表数据抽取做业]
2017/07/24 15:16:25 - 表数据抽取子做业 - 开始项[代表成变量设置]
2017/07/24 15:16:25 - 将记录中的表名设置为变量 - 为了转换解除补丁开始  [将记录中的表名设置为变量]
2017/07/24 15:16:25 - 从结果获取记录.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2017/07/24 15:16:25 - 设置变量.0 - Setting environment variables...
2017/07/24 15:16:25 - 设置变量.0 - Set variable TABLENAME to value [TESTM]
2017/07/24 15:16:25 - 设置变量.0 - Finished after 1 rows.
2017/07/24 15:16:25 - 设置变量.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2017/07/24 15:16:25 - 表数据抽取子做业 - 开始项[建立表结构]
2017/07/24 15:16:25 - 建立表结构 - 为了转换解除补丁开始  [建立表结构]
2017/07/24 15:16:26 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:26 - 建立入库表结构.0 - 数据库链接名为:testb
2017/07/24 15:16:26 - 表输入.0 - 完成处理 (I=1, O=0, R=0, W=1, U=0, E=0)
2017/07/24 15:16:26 - 建立入库表结构.0 - 开始建立表:TESTM
2017/07/24 15:16:26 - 建立入库表结构.0 - data.inputRowMeta=[ID String(10)], [NAME String(200)], [AGE BigNumber], [ADDRESS String(200)]
2017/07/24 15:16:26 - 建立入库表结构.0 - sql=CREATE TABLE TESTM
2017/07/24 15:16:26 - 建立入库表结构.0 - (
2017/07/24 15:16:26 - 建立入库表结构.0 -   ID VARCHAR2(10)
2017/07/24 15:16:26 - 建立入库表结构.0 - , NAME VARCHAR2(200)
2017/07/24 15:16:26 - 建立入库表结构.0 - , AGE NUMBER
2017/07/24 15:16:26 - 建立入库表结构.0 - , ADDRESS VARCHAR2(200)
2017/07/24 15:16:26 - 建立入库表结构.0 - )
2017/07/24 15:16:26 - 建立入库表结构.0 - ;
2017/07/24 15:16:26 - 建立入库表结构.0 - CREATE TABLE TESTM
2017/07/24 15:16:26 - 建立入库表结构.0 - (
2017/07/24 15:16:26 - 建立入库表结构.0 -   ID VARCHAR2(10)
2017/07/24 15:16:26 - 建立入库表结构.0 - , NAME VARCHAR2(200)
2017/07/24 15:16:26 - 建立入库表结构.0 - , AGE NUMBER
2017/07/24 15:16:26 - 建立入库表结构.0 - , ADDRESS VARCHAR2(200)
2017/07/24 15:16:26 - 建立入库表结构.0 - )
2017/07/24 15:16:26 - 建立入库表结构.0 - ;
2017/07/24 15:16:26 - 建立入库表结构.0 - 完成处理 (I=0, O=0, R=1, W=0, U=0, E=0)
2017/07/24 15:16:26 - 表数据抽取子做业 - 开始项[数据抽取]
2017/07/24 15:16:26 - 数据迁移 - 为了转换解除补丁开始  [数据迁移]
2017/07/24 15:16:26 - 表输出.0 - Connected to database [testb] (commit=1000)
2017/07/24 15:16:26 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:26 - 表输入.0 - 完成处理 (I=3, O=0, R=0, W=3, U=0, E=0)
2017/07/24 15:16:26 - 表输出.0 - 完成处理 (I=0, O=3, R=3, W=3, U=0, E=0)
2017/07/24 15:16:26 - 表数据抽取子做业 - 完成做业项[数据抽取] (结果=[true])
2017/07/24 15:16:26 - 表数据抽取子做业 - 完成做业项[建立表结构] (结果=[true])
2017/07/24 15:16:26 - 表数据抽取子做业 - 完成做业项[代表成变量设置] (结果=[true])
2017/07/24 15:16:26 - 表数据抽取子做业 - 开始项[代表成变量设置]
2017/07/24 15:16:26 - 将记录中的表名设置为变量 - 为了转换解除补丁开始  [将记录中的表名设置为变量]
2017/07/24 15:16:26 - 从结果获取记录.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2017/07/24 15:16:26 - 设置变量.0 - Setting environment variables...
2017/07/24 15:16:26 - 设置变量.0 - Set variable TABLENAME to value [TESTN]
2017/07/24 15:16:26 - 设置变量.0 - Finished after 1 rows.
2017/07/24 15:16:26 - 设置变量.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2017/07/24 15:16:26 - 表数据抽取子做业 - 开始项[建立表结构]
2017/07/24 15:16:26 - 建立表结构 - 为了转换解除补丁开始  [建立表结构]
2017/07/24 15:16:26 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:26 - 表输入.0 - 完成处理 (I=1, O=0, R=0, W=1, U=0, E=0)
2017/07/24 15:16:26 - 建立入库表结构.0 - 数据库链接名为:testb
2017/07/24 15:16:26 - 建立入库表结构.0 - 开始建立表:TESTN
2017/07/24 15:16:26 - 建立入库表结构.0 - data.inputRowMeta=[ID String(10)], [NAME String(200)], [AGE BigNumber], [ADDRESS String(200)]
2017/07/24 15:16:26 - 建立入库表结构.0 - sql=CREATE TABLE TESTN
2017/07/24 15:16:26 - 建立入库表结构.0 - (
2017/07/24 15:16:26 - 建立入库表结构.0 -   ID VARCHAR2(10)
2017/07/24 15:16:26 - 建立入库表结构.0 - , NAME VARCHAR2(200)
2017/07/24 15:16:26 - 建立入库表结构.0 - , AGE NUMBER
2017/07/24 15:16:26 - 建立入库表结构.0 - , ADDRESS VARCHAR2(200)
2017/07/24 15:16:26 - 建立入库表结构.0 - )
2017/07/24 15:16:26 - 建立入库表结构.0 - ;
2017/07/24 15:16:26 - 建立入库表结构.0 - CREATE TABLE TESTN
2017/07/24 15:16:26 - 建立入库表结构.0 - (
2017/07/24 15:16:26 - 建立入库表结构.0 -   ID VARCHAR2(10)
2017/07/24 15:16:26 - 建立入库表结构.0 - , NAME VARCHAR2(200)
2017/07/24 15:16:26 - 建立入库表结构.0 - , AGE NUMBER
2017/07/24 15:16:26 - 建立入库表结构.0 - , ADDRESS VARCHAR2(200)
2017/07/24 15:16:26 - 建立入库表结构.0 - )
2017/07/24 15:16:26 - 建立入库表结构.0 - ;
2017/07/24 15:16:26 - 建立入库表结构.0 - 完成处理 (I=0, O=0, R=1, W=0, U=0, E=0)
2017/07/24 15:16:26 - 表数据抽取子做业 - 开始项[数据抽取]
2017/07/24 15:16:26 - 数据迁移 - 为了转换解除补丁开始  [数据迁移]
2017/07/24 15:16:26 - 表输出.0 - Connected to database [testb] (commit=1000)
2017/07/24 15:16:26 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:26 - 表输入.0 - 完成处理 (I=3, O=0, R=0, W=3, U=0, E=0)
2017/07/24 15:16:27 - 表输出.0 - 完成处理 (I=0, O=3, R=3, W=3, U=0, E=0)
2017/07/24 15:16:27 - 表数据抽取子做业 - 完成做业项[数据抽取] (结果=[true])
2017/07/24 15:16:27 - 表数据抽取子做业 - 完成做业项[建立表结构] (结果=[true])
2017/07/24 15:16:27 - 表数据抽取子做业 - 完成做业项[代表成变量设置] (结果=[true])
2017/07/24 15:16:27 - 数据库迁移做业 - 开始项[成功]
2017/07/24 15:16:27 - 数据库迁移做业 - 完成做业项[成功] (结果=[true])
2017/07/24 15:16:27 - 数据库迁移做业 - 完成做业项[表数据抽取做业] (结果=[true])
2017/07/24 15:16:27 - 数据库迁移做业 - 完成做业项[获取表名流程] (结果=[true])
2017/07/24 15:16:27 - 数据库迁移做业 - 任务执行完毕
2017/07/24 15:16:27 - Spoon - 任务已经结束.