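Preface:
Why use Kettle and the Kettle Java API?
What is Kettle? Kettle is an open-source ETL tool. It provides a Java-based graphical interface that is easy to use, and its collection of ETL components is fairly large; the commonly used ones are all included.
Why use the Kettle Java API? As the Kettle documentation puts it: "Kettle Java API: program your own Kettle transformation". Kettle lets you write transformations in Java code, so the ETL process can be customized flexibly, which makes tailored and batch processing possible. This is the work a programmer should be doing, rather than only operating the Kettle user interface the way one would use Word.
Kettle Java API: hands-on notes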
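1. Set up the environment: download the Kettle source package from http://www.kettle.be and unzip it, for example into d:\kettle.
2. Open Eclipse and create a new project. Use JDK 1.5.0, because Kettle calls System.getenv(), which is only supported as of JDK 1.5.0. System.getenv() seems to have had a history of ups and downs: it was abandoned for a while and is now supported again in JDK 1.5.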
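To check that the JDK configured for the project really supports System.getenv(), a small stand-alone program such as the following can be run first. It is only a sketch: the class name GetenvDemo, the helper envOrDefault and the variable name MY_ETL_HOME are made up for illustration and are not part of Kettle.

```java
class GetenvDemo {
    /** Returns the value of an environment variable, or the fallback if it is not set. */
    static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name); // null when the variable is not defined
        return value != null ? value : fallback;
    }

    public static void main(String[] args) {
        // MY_ETL_HOME is a hypothetical variable name used only for this demo.
        System.out.println("MY_ETL_HOME = " + envOrDefault("MY_ETL_HOME", "(not set)"));
        // The no-argument form returns all variables visible to the JVM.
        System.out.println("Environment variables visible: " + System.getenv().size());
    }
}
```

If this compiles and runs without an error, the JDK selected in Eclipse should be recent enough for the call described above.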
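3. Create a class, TransBuilder.java. You can copy the contents of d:\kettle\extra\TransBuilder.java into your TransBuilder.java unchanged.
4. Edit the source as needed. The original program also needs the following change; add this at the top:
import org.eclipse.swt.dnd.Transfer;
// this package was left out; it is found in \libswt\win32\swt.jar under the Kettle root directory
// added by chq (www.chq.name) on 2006.07.20
(It later turned out that this import is not necessary, because it is not needed at compile time.)
5. Prepare to compile: add the jar files to the project in Eclipse. The main ones (based mostly on extra\TransBuilder.bat) are: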
\lib\kettle.jar
\libext\cachedb.jar
\libext\sqlbasejdbc.jar
\libext\activation.jar
\libext\db2jcc.jar
\libext\db2jcc_license_c.jar
\libext\edtftpj-1.4.5.jar
\libext\firebirdsql-full.jar
\libext\firebirdsql.jar
\libext\gis-shape.jar
\libext\hsqldb.jar
\libext\ifxjdbc.jar
\libext\javadbf.jar
\libext\jconn2.jar
\libext\js.jar
\libext\jt400.jar
\libext\jtds-1.1.jar
\libext\jxl.jar
\libext\ktable.jar
\libext\log4j-1.2.8.jar
\libext\mail.jar
\libext\mysql-connector-java-3.1.7-bin.jar
\libext\ojdbc14.jar
\libext\orai18n.jar
\libext\pg74.215.jdbc3.jar
\libext\edbc.jar
(Note: the following jar was left out and must be added. It is found in \libswt\win32\swt.jar under the Kettle root directory.)
\libswt\win32\swt.jar
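Adding this many jars by hand is error-prone, so it can help to list what is actually present in the Kettle directories before configuring the project. The following is a minimal sketch, assuming the layout described above (lib, libext and libswt\win32 under the Kettle root); the class ClasspathLister and its methods are hypothetical helpers, not part of Kettle.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ClasspathLister {
    /** Returns the paths of all .jar files directly inside dir, sorted by name. */
    static List<String> jarsIn(File dir) {
        List<String> jars = new ArrayList<String>();
        File[] files = dir.listFiles();
        if (files == null) {
            return jars; // directory is missing or not readable
        }
        Arrays.sort(files);
        for (File f : files) {
            if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) {
                jars.add(f.getPath());
            }
        }
        return jars;
    }

    /** Joins the jars found in the given sub-directories into one classpath string. */
    static String classpath(File kettleRoot, String... subDirs) {
        StringBuilder cp = new StringBuilder();
        for (String sub : subDirs) {
            for (String jar : jarsIn(new File(kettleRoot, sub))) {
                if (cp.length() > 0) {
                    cp.append(File.pathSeparator);
                }
                cp.append(jar);
            }
        }
        return cp.toString();
    }

    public static void main(String[] args) {
        // d:\kettle is the example install directory from step 1; pass another path as the first argument.
        File root = new File(args.length > 0 ? args[0] : "d:\\kettle");
        System.out.println(classpath(root, "lib", "libext", "libswt/win32"));
    }
}
```

The printed string can be compared against the list above, or passed to java -cp when running outside Eclipse.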
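6. After a successful compile, prepare to run.
To let the program run without logging in, set up the environment properties file kettle.properties. It lives in the user's home directory, usually \Documents and Settings\<user>\.kettle\. Its main contents are: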
KETTLE_USER=admin
KETTLE_PASSWORD=passwd
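Before running the transformation, it may be worth confirming that the file is where the program will look for it. The sketch below assumes the location given above (a .kettle directory under the user's home directory) and uses only java.util.Properties; the class KettlePropertiesCheck is a hypothetical helper, and it prints only the key names so that the password does not end up on the console.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

class KettlePropertiesCheck {
    /** Loads a properties file; returns an empty Properties object if the file does not exist. */
    static Properties load(File file) throws IOException {
        Properties props = new Properties();
        if (!file.isFile()) {
            return props;
        }
        InputStream in = new FileInputStream(file);
        try {
            props.load(in);
        } finally {
            in.close();
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Assumed location: <user home>/.kettle/kettle.properties
        File dir = new File(System.getProperty("user.home"), ".kettle");
        File file = new File(dir, "kettle.properties");
        Properties props = load(file);
        System.out.println("Looking in: " + file.getAbsolutePath());
        System.out.println("Keys found: " + props.keySet());
    }
}
```

An empty key list means the file is missing or empty, in which case the entries above still need to be created.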
7. Now run it and check whether the data has been copied to the target table.
The modified program source follows:
++++++++++++++++++++++++++++++++
package name.chq.test;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;

import be.ibridge.kettle.core.Const;
import be.ibridge.kettle.core.LogWriter;
import be.ibridge.kettle.core.NotePadMeta;
import be.ibridge.kettle.core.database.Database;
import be.ibridge.kettle.core.database.DatabaseMeta;
import be.ibridge.kettle.core.exception.KettleException;
import be.ibridge.kettle.core.util.EnvUtil;
import be.ibridge.kettle.trans.StepLoader;
import be.ibridge.kettle.trans.Trans;
import be.ibridge.kettle.trans.TransHopMeta;
import be.ibridge.kettle.trans.TransMeta;
import be.ibridge.kettle.trans.step.StepMeta;
import be.ibridge.kettle.trans.step.StepMetaInterface;
import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;
// this package was left out; it is found in \libswt\win32\swt.jar under the Kettle root directory
// added by chq (www.chq.name) on 2006.07.20
//import org.eclipse.swt.dnd.Transfer;

/**
 * Class created to demonstrate the creation of transformations on-the-fly.
 *
 * @author Matt
 *
 */
public class TransBuilder
{
    public static final String[] databasesXML = {
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
        "<connection>" +
        "<name>target</name>" +
        "<server>192.168.17.35</server>" +
        "<type>ORACLE</type>" +
        "<access>Native</access>" +
        "<database>test1</database>" +
        "<port>1521</port>" +
        "<username>testuser</username>" +
        "<password>pwd</password>" +
        "<servername/>" +
        "<data_tablespace/>" +
        "<index_tablespace/>" +
        "<attributes>" +
        "<attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>" +
        "<attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>" +
        "<attribute><code>PORT_NUMBER</code><attribute>1521</attribute></attribute>" +
        "</attributes>" +
        "</connection>" ,
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
        "<connection>" +
        "<name>source</name>" +
        "<server>192.168.16.12</server>" +
        "<type>ORACLE</type>" +
        "<access>Native</access>" +
        "<database>test2</database>" +
        "<port>1521</port>" +
        "<username>testuser</username>" +
        "<password>pwd2</password>" +
        "<servername/>" +
        "<data_tablespace/>" +
        "<index_tablespace/>" +
        "<attributes>" +
        "<attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>" +
        "<attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>" +
        "<attribute><code>PORT_NUMBER</code><attribute>1521</attribute></attribute>" +
        "</attributes>" +
        "</connection>"
    };

    /**
     * Creates a new transformation using input parameters such as the tablename to read from.
     * @param transformationName the name of the transformation
     * @param sourceDatabaseName the name of the database to read from
     * @param sourceTableName the name of the table to read from
     * @param sourceFields the field names we want to read from the source table
     * @param targetDatabaseName the name of the target database
     * @param targetTableName the name of the target table we want to write to
     * @param targetFields the names of the fields in the target table (same number of fields as sourceFields)
     * @return a new transformation
     * @throws KettleException in the rare case something goes wrong
     */
    public static final TransMeta buildCopyTable(
            String transformationName, String sourceDatabaseName, String sourceTableName,
            String[] sourceFields, String targetDatabaseName, String targetTableName,
            String[] targetFields)
        throws KettleException
    {
        LogWriter log = LogWriter.getInstance();
        EnvUtil.environmentInit();
        try
        {
            //
            // create a new transformation...
            //
            TransMeta transMeta = new TransMeta();
            transMeta.setName(transformationName);

            // add the database connections
            for (int i = 0; i < databasesXML.length; i++)
            {
                DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
                transMeta.addDatabase(databaseMeta);
            }
            DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);
            DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);

            //
            // add a note
            //
            String note = "reads information from table [" + sourceTableName + "] on database ["
                    + sourceDBInfo + "]" + Const.CR;
            note += "after that, it writes the information to table [" + targetTableName + "] on database ["
                    + targetDBInfo + "]";
            NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);
            transMeta.addNote(ni);

            //
            // create the source step...
            //
            String fromStepName = "read from [" + sourceTableName + "]";
            TableInputMeta tii = new TableInputMeta();
            tii.setDatabaseMeta(sourceDBInfo);
            String selectSQL = "select " + Const.CR;
            for (int i = 0; i < sourceFields.length; i++)
            {
                /* modified by chq (www.chq.name): use * in place of the field list; on inspection, the statements below handle '*' correctly */
                if (i > 0)
                    selectSQL += ", ";
                else
                    selectSQL += " ";
                selectSQL += sourceFields[i] + Const.CR;
            }
            selectSQL += "from " + sourceTableName;
            tii.setSQL(selectSQL);

            StepLoader stepLoader = StepLoader.getInstance();
            String fromStepId = stepLoader.getStepPluginID(tii);
            StepMeta fromStep = new StepMeta(log, fromStepId, fromStepName, (StepMetaInterface) tii);
            fromStep.setLocation(150, 100);
            fromStep.setDraw(true);
            fromStep.setDescription("reads information from table [" + sourceTableName
                    + "] on database [" + sourceDBInfo + "]");
            transMeta.addStep(fromStep);

            //
            // add logic to rename fields
            // use metadata logic in SelectValues, use SelectValueInfo...
            //
            /* no renaming or mapping is needed; added by chq (www.chq.name) on 2006.07.20
            SelectValuesMeta svi = new SelectValuesMeta();
            svi.allocate(0, 0, sourceFields.length);
            for (int i = 0; i < sourceFields.length; i++)
            {
                svi.getMetaName()[i] = sourceFields[i];
                svi.getMetaRename()[i] = targetFields[i];
            }
            String selStepName = "rename field names";
            String selStepId = stepLoader.getStepPluginID(svi);
            StepMeta selStep = new StepMeta(log, selStepId, selStepName, (StepMetaInterface) svi);
            selStep.setLocation(350, 100);
            selStep.setDraw(true);
            selStep.setDescription("rename field names");
            transMeta.addStep(selStep);
            TransHopMeta shi = new TransHopMeta(fromStep, selStep);
            transMeta.addTransHop(shi);
            fromStep = selStep; // sets the new starting point; by chq (www.chq.name) on 2006.07.20
            */

            //
            // create the target step...
            //
            //
            // add the TableOutputMeta step...
            //
            String toStepName = "write to [" + targetTableName + "]";
            TableOutputMeta toi = new TableOutputMeta();
            toi.setDatabase(targetDBInfo);
            toi.setTablename(targetTableName);
            toi.setCommitSize(200);
            toi.setTruncateTable(true);
            String toStepId = stepLoader.getStepPluginID(toi);
            StepMeta toStep = new StepMeta(log, toStepId, toStepName, (StepMetaInterface) toi);
            toStep.setLocation(550, 100);
            toStep.setDraw(true);
            toStep.setDescription("write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
            transMeta.addStep(toStep);

            //
            // add a hop between the two steps...
            //
            TransHopMeta hi = new TransHopMeta(fromStep, toStep);
            transMeta.addTransHop(hi);

            // ok, if we're still here: overwrite the current transformation...
            return transMeta;
        }
        catch (Exception e)
        {
            throw new KettleException("an unexpected error occurred creating the new transformation", e);
        }
    }

    /**
     * 1) create a new transformation
     * 2) save the transformation as xml file
     * 3) generate the sql for the target table
     * 4) execute the transformation
     * 5) drop the target table to make this program repeatable
     *
     * @param args
     */
    public static void main(String[] args) throws Exception
    {
        EnvUtil.environmentInit();

        // init the logging...
        LogWriter log = LogWriter.getInstance("transbuilder.log", true, LogWriter.LOG_LEVEL_DETAILED);

        // load the kettle steps & plugins
        StepLoader stLoader = StepLoader.getInstance();
        if (!stLoader.read())
        {
            log.logError("transbuilder", "error loading kettle steps & plugins... stopping now!");
            return;
        }

        // the parameters we want, optionally this can be
        String filename = "newtrans.xml";
        String transformationName = "test transformation";
        String sourceDatabaseName = "source";
        String sourceTableName = "testuser.source_table";
        String[] sourceFields = {
            "*"
        };
        String targetDatabaseName = "target";
        String targetTableName = "testuser.target_table";
        String[] targetFields = {
            "*"
        };

        // generate the transformation.
        TransMeta transMeta = TransBuilder.buildCopyTable(
            transformationName,
            sourceDatabaseName,
            sourceTableName,
            sourceFields,
            targetDatabaseName,
            targetTableName,
            targetFields
        );

        // save it as a file:
        String xml = transMeta.getXML();
        DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(filename)));
        dos.write(xml.getBytes("UTF-8"));
        dos.close();
        System.out.println("saved transformation to file: " + filename);

        // ok, what's the sql we need to execute to generate the target table?
        String sql = transMeta.getSQLStatementsString();

        // execute the sql on the target table:
        Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
        targetDatabase.connect();
        targetDatabase.execStatements(sql);

        // now execute the transformation...
        Trans trans = new Trans(log, transMeta);
        trans.execute(null);
        trans.waitUntilFinished();

        // for testing/repeatability, we drop the target table again
        /* modified by chq (www.chq.name) on 2006.07.20: no need to drop the table */
        //targetDatabase.execStatement("drop table " + targetTableName);
        targetDatabase.disconnect();
    }
}
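The author's note inside the listing says that passing "*" as the only source field is handled correctly by the SELECT-building loop. That loop can be tried on its own without Kettle or a database. The sketch below mirrors its logic; the class SelectBuilder and the method buildSelect are hypothetical names, and CR is assumed to behave like Kettle's Const.CR, taken here to be the platform line separator.

```java
class SelectBuilder {
    // Stand-in for Kettle's Const.CR (an assumption: the platform line separator).
    static final String CR = System.getProperty("line.separator");

    /** Builds "select <fields> from <table>" the same way the loop in the listing does. */
    static String buildSelect(String[] fields, String table) {
        StringBuilder sql = new StringBuilder("select ").append(CR);
        for (int i = 0; i < fields.length; i++) {
            sql.append(i > 0 ? ", " : " "); // comma before every field except the first
            sql.append(fields[i]).append(CR);
        }
        sql.append("from ").append(table);
        return sql.toString();
    }

    public static void main(String[] args) {
        // A single "*" entry yields a plain "select * from ..." statement.
        System.out.println(buildSelect(new String[] { "*" }, "testuser.source_table"));
        System.out.println();
        // The column names here are invented purely for the demo.
        System.out.println(buildSelect(new String[] { "id", "name" }, "testuser.source_table"));
    }
}
```

Because "*" is simply appended like any other field name, a one-element array containing it produces a valid statement, which is why the listing can skip the rename step and copy all columns.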