Development topic: Kettle Java API, a hands-on development log

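  Preface:

  Why use Kettle and the Kettle Java API?

  What is Kettle? Kettle is an open-source ETL tool. It provides a Java-based graphical interface that is convenient to use, and its collection of ETL steps is fairly large and covers all the commonly used ones.

  Why use the Kettle Java API? As the Kettle documentation puts it: "Kettle Java API: program your own Kettle transformation." Kettle lets you build transformations in Java code, so you can define the ETL process flexibly, which makes custom pipelines and batch processing possible. That is the work a programmer should be doing, rather than just clicking through the Kettle user interface the way one uses Word.

  Hands-on notes for the Kettle Java API: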

  1. Set up the environment: download the Kettle source package from http://www.kettle.be and extract it, for example to d:\kettle.

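  2. Open Eclipse and create a new project. Use JDK 1.5.0 (or later), because Kettle calls System.getenv(), which is only supported starting with JDK 1.5.0. getenv() has had an up-and-down history: it was deprecated at one point and is now supported again as of JDK 1.5.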
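  A quick way to confirm that the JDK selected in Eclipse supports System.getenv() is a throwaway class like the one below. It is a minimal sketch: the class name GetenvCheck is hypothetical, and PATH is used only because it is set on virtually every system; Kettle itself may read other variables.

```java
import java.util.Map;

// Hypothetical check class: prints the running Java version and confirms that
// System.getenv() works (it is supported again from JDK 1.5 onward).
public class GetenvCheck {
    public static void main(String[] args) {
        System.out.println("java.version = " + System.getProperty("java.version"));

        // The no-argument overload returns an unmodifiable map of all environment variables.
        Map<String, String> env = System.getenv();
        System.out.println("environment variables visible: " + env.size());

        // The single-argument overload returns null when the variable is not set.
        String path = System.getenv("PATH");
        System.out.println("PATH is " + (path == null ? "not set" : "set"));
    }
}
```

  If this prints a version below 1.5 or fails to compile (the generic Map type also needs JDK 1.5), the project is pointing at the wrong JDK.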

  3. Create a class TransBuilder.java and copy the contents of d:\kettle\extra\TransBuilder.java into it unchanged.

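  4. Edit the source as needed. The original program also needs the following change; add at the top:

  import org.eclipse.swt.dnd.Transfer;

  // This package was missing; its original location is \libswt\win32\swt.jar under the Kettle root directory
  // added by chq (www.chq.name) on 2006.07.20

  (It later turned out that this import is unnecessary, because compilation does not need it.)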

  5. Prepare for compilation: add the JAR files to the Eclipse project's build path. The main ones are listed below (based mainly on extra\TransBuilder.bat):

  \lib\kettle.jar
  \libext\cachedb.jar
  \libext\sqlbasejdbc.jar
  \libext\activation.jar
  \libext\db2jcc.jar
  \libext\db2jcc_license_c.jar
  \libext\edtftpj-1.4.5.jar
  \libext\firebirdsql-full.jar
  \libext\firebirdsql.jar
  \libext\gis-shape.jar
  \libext\hsqldb.jar
  \libext\ifxjdbc.jar
  \libext\javadbf.jar
  \libext\jconn2.jar
  \libext\js.jar
  \libext\jt400.jar
  \libext\jtds-1.1.jar
  \libext\jxl.jar
  \libext\ktable.jar
  \libext\log4j-1.2.8.jar
  \libext\mail.jar
  \libext\mysql-connector-java-3.1.7-bin.jar
  \libext\ojdbc14.jar
  \libext\orai18n.jar
  \libext\pg74.215.jdbc3.jar
  \libext\edbc.jar

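  (Note: the following package is missing from that list and must be added. Its original location is \libswt\win32\swt.jar under the Kettle root directory.)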

  \libswt\win32\swt.jar
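  Adding the JARs one at a time is tedious and easy to get wrong (as the missing swt.jar shows). As an alternative, the classpath can be assembled from the directories named above. This is a minimal sketch, assuming the Kettle root layout from step 1 (lib, libext and libswt\win32 under d:\kettle); the class KettleClasspath is hypothetical, and it simply picks up every .jar in those directories, which may include more files than the list above.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical utility: collects every *.jar under the Kettle subdirectories
// used in step 5 (lib, libext, libswt\win32) into one classpath string.
public class KettleClasspath {
    // Subdirectories relative to the Kettle root directory.
    static final String[] JAR_DIRS = { "lib", "libext", "libswt" + File.separator + "win32" };

    public static String build(File kettleRoot) {
        List<String> entries = new ArrayList<String>();
        for (String dir : JAR_DIRS) {
            File[] files = new File(kettleRoot, dir).listFiles();
            if (files == null) {
                continue; // directory does not exist: skip it
            }
            Arrays.sort(files); // deterministic order within each directory
            for (File f : files) {
                if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) {
                    entries.add(f.getPath());
                }
            }
        }
        // Join with the platform separator (';' on Windows, ':' elsewhere).
        StringBuilder sb = new StringBuilder();
        for (String e : entries) {
            if (sb.length() > 0) {
                sb.append(File.pathSeparator);
            }
            sb.append(e);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        File root = new File(args.length > 0 ? args[0] : "d:\\kettle");
        System.out.println(build(root));
    }
}
```

  The printed string can be passed to javac or java with -cp, or compared against the list above to spot a missing entry.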

  6. Once it compiles, get ready to run it.

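  So that the program can run without logging in, set up the environment configuration file kettle.properties. It lives in the user's home directory, typically \Documents and Settings\<user>\.kettle\, and its main contents are as follows: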

  KETTLE_REPOSITORY=kettle@m80
  KETTLE_USER=admin
  KETTLE_PASSWORD=passwd
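  To confirm that kettle.properties is where the program expects it and contains the intended entries, the file can be read back with java.util.Properties. This is a minimal sketch, assuming the location described above (<user.home>\.kettle\kettle.properties); the class KettlePropertiesCheck is hypothetical and is not how Kettle itself loads the file. It prints every entry, password included, so keep its output private.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper: loads kettle.properties from the .kettle directory in the
// user's home directory and prints all of its entries (values are NOT masked).
public class KettlePropertiesCheck {
    // e.g. C:\Documents and Settings\<user>\.kettle\kettle.properties on Windows XP
    public static File defaultFile() {
        return new File(new File(System.getProperty("user.home"), ".kettle"), "kettle.properties");
    }

    public static Properties load(File file) throws IOException {
        Properties props = new Properties();
        InputStream in = new FileInputStream(file);
        try {
            props.load(in); // standard key=value format
        } finally {
            in.close();
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        File file = args.length > 0 ? new File(args[0]) : defaultFile();
        if (!file.isFile()) {
            System.out.println("Not found: " + file);
            return;
        }
        Properties props = load(file);
        for (Object key : props.keySet()) {
            System.out.println(key + " = " + props.get(key));
        }
    }
}
```

  Note that property keys are case-sensitive, so the names in the file must match exactly what Kettle looks up.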

 

  7. That's it. Now run the program and check whether the data has been copied into the target table.

 

  The modified program source follows:

++++++++++++++++++++++++++++++++


package name.chq.test;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;

import be.ibridge.kettle.core.Const;
import be.ibridge.kettle.core.LogWriter;
import be.ibridge.kettle.core.NotePadMeta;
import be.ibridge.kettle.core.database.Database;
import be.ibridge.kettle.core.database.DatabaseMeta;
import be.ibridge.kettle.core.exception.KettleException;
import be.ibridge.kettle.core.util.EnvUtil;
import be.ibridge.kettle.trans.StepLoader;
import be.ibridge.kettle.trans.Trans;
import be.ibridge.kettle.trans.TransHopMeta;
import be.ibridge.kettle.trans.TransMeta;
import be.ibridge.kettle.trans.step.StepMeta;
import be.ibridge.kettle.trans.step.StepMetaInterface;
import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;

// This package was missing; its original location is \libswt\win32\swt.jar under the Kettle root directory
// added by chq (www.chq.name) on 2006.07.20
//import org.eclipse.swt.dnd.Transfer;

/**
 * Class created to demonstrate the creation of transformations on-the-fly.
 *
 * @author Matt
 */
public class TransBuilder
{
    public static final String[] databasesXML = {
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
        "<connection>" +
          "<name>target</name>" +
          "<server>192.168.17.35</server>" +
          "<type>ORACLE</type>" +
          "<access>Native</access>" +
          "<database>test1</database>" +
          "<port>1521</port>" +
          "<username>testuser</username>" +
          "<password>pwd</password>" +
          "<servername/>" +
          "<data_tablespace/>" +
          "<index_tablespace/>" +
          "<attributes>" +
            "<attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>" +
            "<attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>" +
            "<attribute><code>PORT_NUMBER</code><attribute>1521</attribute></attribute>" +
          "</attributes>" +
        "</connection>",

        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
        "<connection>" +
          "<name>source</name>" +
          "<server>192.168.16.12</server>" +
          "<type>ORACLE</type>" +
          "<access>Native</access>" +
          "<database>test2</database>" +
          "<port>1521</port>" +
          "<username>testuser</username>" +
          "<password>pwd2</password>" +
          "<servername/>" +
          "<data_tablespace/>" +
          "<index_tablespace/>" +
          "<attributes>" +
            "<attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>" +
            "<attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>" +
            "<attribute><code>PORT_NUMBER</code><attribute>1521</attribute></attribute>" +
          "</attributes>" +
        "</connection>"
    };

    /**
     * Creates a new transformation using input parameters such as the table name to read from.
     * @param transformationName the name of the transformation
     * @param sourceDatabaseName the name of the database to read from
     * @param sourceTableName the name of the table to read from
     * @param sourceFields the field names we want to read from the source table
     * @param targetDatabaseName the name of the target database
     * @param targetTableName the name of the target table we want to write to
     * @param targetFields the names of the fields in the target table (same number of fields as sourceFields)
     * @return a new transformation
     * @throws KettleException in the rare case something goes wrong
     */
    public static final TransMeta buildCopyTable(
        String transformationName, String sourceDatabaseName, String sourceTableName,
        String[] sourceFields, String targetDatabaseName, String targetTableName,
        String[] targetFields)
        throws KettleException
    {
        LogWriter log = LogWriter.getInstance();
        EnvUtil.environmentInit();
        try
        {
            //
            // Create a new transformation...
            //
            TransMeta transMeta = new TransMeta();
            transMeta.setName(transformationName);

            // Add the database connections
            for (int i = 0; i < databasesXML.length; i++)
            {
                DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
                transMeta.addDatabase(databaseMeta);
            }

            DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);
            DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);

            //
            // Add a note
            //
            String note = "Reads information from table [" + sourceTableName + "] on database ["
                          + sourceDBInfo + "]" + Const.CR;
            note += "After that, it writes the information to table [" + targetTableName + "] on database ["
                          + targetDBInfo + "]";
            NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);
            transMeta.addNote(ni);

            //
            // Create the source step...
            //
            String fromstepname = "read from [" + sourceTableName + "]";
            TableInputMeta tii = new TableInputMeta();
            tii.setDatabaseMeta(sourceDBInfo);
            String selectSQL = "SELECT " + Const.CR;
            for (int i = 0; i < sourceFields.length; i++)
            {
                /* Modified by chq (www.chq.name): use * in place of the field list; on analysis, the statements below handle '*' correctly */
                if (i > 0)
                    selectSQL += ", ";
                else
                    selectSQL += "  ";

                selectSQL += sourceFields[i] + Const.CR;
            }
            selectSQL += "FROM " + sourceTableName;
            tii.setSQL(selectSQL);

            StepLoader steploader = StepLoader.getInstance();

            String fromstepid = steploader.getStepPluginID(tii);
            StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);
            fromstep.setLocation(150, 100);
            fromstep.setDraw(true);
            fromstep.setDescription("Reads information from table [" + sourceTableName
                                    + "] on database [" + sourceDBInfo + "]");
            transMeta.addStep(fromstep);

            //
            // Add logic to rename fields
            // Use metadata logic in SelectValues, use SelectValueInfo...
            //
            /* No renaming or field mapping is needed; commented out by chq (www.chq.name) on 2006.07.20
            SelectValuesMeta svi = new SelectValuesMeta();
            svi.allocate(0, 0, sourceFields.length);
            for (int i = 0; i < sourceFields.length; i++)
            {
                svi.getMetaName()[i] = sourceFields[i];
                svi.getMetaRename()[i] = targetFields[i];
            }

            String selstepname = "Rename field names";
            String selstepid = steploader.getStepPluginID(svi);
            StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);
            selstep.setLocation(350, 100);
            selstep.setDraw(true);
            selstep.setDescription("Rename field names");
            transMeta.addStep(selstep);

            TransHopMeta shi = new TransHopMeta(fromstep, selstep);
            transMeta.addTransHop(shi);
            fromstep = selstep; // the rename step becomes the new starting point, by chq (www.chq.name) on 2006.07.20
            */

            //
            // Create the target step...
            //
            //
            // Add the TableOutputMeta step...
            //
            String tostepname = "write to [" + targetTableName + "]";
            TableOutputMeta toi = new TableOutputMeta();
            toi.setDatabase(targetDBInfo);
            toi.setTablename(targetTableName);
            toi.setCommitSize(200);
            toi.setTruncateTable(true);

            String tostepid = steploader.getStepPluginID(toi);
            StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);
            tostep.setLocation(550, 100);
            tostep.setDraw(true);
            tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
            transMeta.addStep(tostep);

            //
            // Add a hop between the two steps...
            //
            TransHopMeta hi = new TransHopMeta(fromstep, tostep);
            transMeta.addTransHop(hi);

            // OK, if we're still here: overwrite the current transformation...
            return transMeta;
        }
        catch (Exception e)
        {
            throw new KettleException("An unexpected error occurred creating the new transformation", e);
        }
    }

    /**
     * 1) create a new transformation
     * 2) save the transformation as XML file
     * 3) generate the SQL for the target table
     * 4) execute the transformation
     * 5) drop the target table to make this program repeatable
     *
     * @param args
     */
    public static void main(String[] args) throws Exception
    {
        EnvUtil.environmentInit();

        // Init the logging...
        LogWriter log = LogWriter.getInstance("transbuilder.log", true, LogWriter.LOG_LEVEL_DETAILED);

        // Load the Kettle steps & plugins
        StepLoader stloader = StepLoader.getInstance();
        if (!stloader.read())
        {
            log.logError("transbuilder", "Error loading Kettle steps & plugins... stopping now!");
            return;
        }

        // The parameters we want, optionally this can be
        String fileName = "newtrans.xml";
        String transformationName = "test transformation";
        String sourceDatabaseName = "source";
        String sourceTableName = "testuser.source_table";
        String[] sourceFields = { "*" };

        String targetDatabaseName = "target";
        String targetTableName = "testuser.target_table";
        String[] targetFields = { "*" };

        // Generate the transformation.
        TransMeta transMeta = TransBuilder.buildCopyTable(
                transformationName,
                sourceDatabaseName,
                sourceTableName,
                sourceFields,
                targetDatabaseName,
                targetTableName,
                targetFields
                );

        // Save it as a file:
        String xml = transMeta.getXML();
        DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
        dos.write(xml.getBytes("UTF-8"));
        dos.close();
        System.out.println("Saved transformation to file: " + fileName);

        // OK, what's the SQL we need to execute to generate the target table?
        String sql = transMeta.getSQLStatementsString();

        // Execute the SQL on the target table:
        Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
        targetDatabase.connect();
        targetDatabase.execStatements(sql);

        // Now execute the transformation...
        Trans trans = new Trans(log, transMeta);
        trans.execute(null);
        trans.waitUntilFinished();

        // For testing/repeatability, we drop the target table again.
        // Modified by chq (www.chq.name) on 2006.07.20: no need to drop the table, so the statement is commented out.
        //targetDatabase.execStatement("DROP TABLE " + targetTableName);
        targetDatabase.disconnect();
    }
}
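  The loop in buildCopyTable() that builds the Table Input query is what makes the "*" trick in main() work: with sourceFields = {"*"} it emits SELECT followed by a single *. The standalone sketch below reproduces the same string logic so it can be tried without Kettle on the classpath. It assumes "\n" in place of Kettle's Const.CR line-ending constant, and the class name SelectSqlSketch is hypothetical.

```java
// Hypothetical standalone copy of the query-building loop in buildCopyTable(),
// using "\n" where the original uses Kettle's Const.CR.
public class SelectSqlSketch {
    public static String buildSelect(String[] fields, String table) {
        StringBuilder sql = new StringBuilder("SELECT \n");
        for (int i = 0; i < fields.length; i++) {
            // The first field is indented; every later field is prefixed with a comma.
            sql.append(i > 0 ? ", " : "  ");
            sql.append(fields[i]).append('\n');
        }
        sql.append("FROM ").append(table);
        return sql.toString();
    }

    public static void main(String[] args) {
        // With a single "*" the result is a plain SELECT * query.
        System.out.println(buildSelect(new String[] { "*" }, "testuser.source_table"));
        // With explicit names the fields are listed one per line.
        System.out.println(buildSelect(new String[] { "id", "name" }, "testuser.source_table"));
    }
}
```

  Because the rename step is commented out, targetFields is no longer used anywhere in the listing above, so passing "*" for it has no effect on the generated transformation.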



 
