极客时间已完结课程限时免费阅读

46丨数据集成:如何对各种数据库进行集成和转换?

46丨数据集成:如何对各种数据库进行集成和转换?-极客时间

46丨数据集成:如何对各种数据库进行集成和转换?

讲述:陈旸

时长12:18大小14.09M

我们的数据可能分散在不同的数据源中,如果想要对这些数据分析,就需要先对这些数据进行集成。同时因为不同的来源,这些数据可能会存在各种问题,比如这些数据源采用了不同的 DBMS,数据之间存在冗余的情况,比如某一条数据在不同的数据源中都有记录,那么在数据集成中我们只保留其中的一条就可以了。除此以外,这些不同的数据源还可能字段标识不统一,再或者我们需要将数据转换成我们想要的格式要求进行输出。
数据集成是数据分析之前非常重要的工作,它将不同来源、不同规范以及不同质量的数据进行统一收集和整理,为后续数据分析提供统一的数据源。
好了,关于这部分内容,今天我们一起来学习下:
我们将数据从 OLTP 系统中转换加载到 OLAP 数据仓库中,这中间重要的步骤就是 ETL。那什么是 ETL 呢?
认识 Kettle 工具。在 Kettle 中有两个重要的脚本,分别是 Transformation 转换和 Job 作业,它们分别代表什么?
完成两个实例项目。通过使用 Kettle 完成 MySQL 数据表的数据同步,以及根据我们的需求将银行客户转账的记录导出到目标文件中。

什么是 ETL

在使用数据的时候,根据需求,我们可以分成 OLTP 和 OLAP 两种场景。OLTP 更注重数据的实时性,而 OLAP 更注重数据的分析能力,对时效性要求不高。在这个过程中,我们的数据源来自于 OLTP 系统,而最终得到的数据仓库则应用在 OLAP 系统中,中间的转换过程就是 ETL,如下图所示:
ETL 是英文 Extract、Transform 和 Load 的缩写,也就是将数据从不同的数据源进行抽取,然后通过交互转换,最终加载到目的地的过程。
在 Extract 数据抽取这个过程中,需要做大量的工作,我们需要了解企业分散在不同地方的数据源都采用了哪种 DBMS,还需要了解这些数据源存放的数据结构等,是结构化数据,还是非结构化数据。在抽取中,我们也可以采用全量抽取和增量抽取两种方式。相比于全量抽取,增量抽取使用得更为广泛,它可以帮我们动态捕捉数据源的数据变化,并进行同步更新。
在 Transform 数据转换的过程中,我们可以使用一些数据转换的组件,比如说数据字段的映射、数据清洗、数据验证和数据过滤等,这些模块可以像是在流水线上进行作业一样,帮我们完成各种数据转换的需求,从而将不同质量,不同规范的数据进行统一。
在 Load 数据加载的过程中,我们可以将转换之后的数据加载到目的地,如果目标是 RDBMS,我们可以直接通过 SQL 进行加载,或者使用批量加载的方式进行加载。

认识 Kettle 工具

Kettle 可以帮助我们完成 ETL 工作,它的设计师希望它能像水壶一样,可以从将不同的数据通过 Kettle 水壶按照指定的格式流出来。
相比于其他商业软件来说,Kettle 是 Java 开发的免费开源工具,可以运行在多个操作系统中。因此在使用之前,你需要安装 Java 运行环境(JRE)。Kettle 的下载地址在这里。
在 Kettle 中有 3 个重要的组件:
Spoon(勺子),它是一个图形界面,帮我们启动作业和转换设计。
Pan(锅),通过命令行方式完成转换执行(Transformation)。
Kitchen(厨房),通过命令行方式完成作业执行(Job)。
通过 Spoon,我们可以采用可视化的方式对 Kettle 中的两种脚本进行操作,这两种脚本分别是 Transformation(转换)和 Job(作业)。
Transformation(转换)对应的是.ktr 文件,它相当于一个容器,对数据操作进行了定义。数据操作就是数据从输入到输出的一个过程,Tranformation 可以帮我们完成数据的基础转换。
Job(作业)对应的是.kjb 文件,Job 帮我们完成整个工作流的控制。相比于 Transformation 来说,它是个更大的容器,负责将 Transformation 组织起来完成某项作业。
你可以把 Transformation 理解成比 Job 粒度更小的容器。在通常的工作中,我们会把任务分解成为不同的 Job,然后再把 Job 分解成多个 Transformation。

Kettle 使用实例

我们刚才对 Kettle 有了大致的了解,Kettle 工具包含的内容非常多,下面我们通过两个实例更深入地了解一下。

实例 1:将 test1 数据库中的 heros 数据表同步到 test2 数据库中

数据准备:
首先我们在 MySQL 中创建好 test1 和 test2 两个数据库,在 test1 中存储了我们之前已有的 heros 数据表(包括表结构和表数据),然后在 test2 数据库中,同样保存一个 heros 数据表,注意 test2 数据库中只需要有 heros 表结构即可。数据同步是需要我们使用 Kettle 工具来完成的。
你可以点击这里下载 heros 数据表结构及数据。
下面我们来使用 Kettle 来完成这个工作,只需要 3 步即可。
第一步,创建表输入组件,并对 test1 数据库中的 heros 数据表进行查询。
在 Kettle 左侧的面板中找到“核心对象”,输入“表输入”找到表输入组件,然后拖拽到中间的工作区。
双击表输入,选择 Wizard 来对数据库连接进行配置。这里我们可以创建一个数据库连接 test1,然后选择 MySQL。然后输入我们的服务器主机地址以及数据库名称,最后输入 MySQL 的用户名和密码完成数据库连接的创建工作。
创建好连接之后,我们来对 heros 数据表进行查询,输入 SQL 语句:SELECT * FROM heros,你也可以通过获取 SQL 查询语句的功能自动选择想要查询的字段。
然后点击确定完成表输入组件的创建。
第二步,创建插入 / 更新组件,配置 test2 数据库中的 heros 数据表的更新字段。
如果我们想要对目标表进行插入或者更新,这里需要使用“插入 / 更新”组件。我们在 Kettle 左侧的面板中找到这个组件,然后将它拖拽到中间的工作区。
在配置“插入 / 更新”组件之前,我们先创建一个从“表输入”到“插入 / 更新”的连接,这里可以将鼠标移动到“表输入”控件上,然后按住 Shift 键用鼠标从“表输入”拉一个箭头到“插入更新”。这样我们就可以在“插入 / 更新”组件中看到表输入的数据了。
然后我们对“插入 / 更新”组件进行配置,双击该组件,这里同样需要先创建数据库连接 test2,来完成对数据库 test2 的连接,原理与创建数据库连接 test1 一样。
接着,我们选择目标表,这里点击浏览按钮,在 test2 连接中找到我们的数据表 heros 选中并确定。
然后我们可以在下面指定查询关键字,这里指定表字段为 id,比较符为(=),数据流里的字段 1 为 id,这样我们就可以通过 id 来进行查询关联。
然后对于目的表中缺失的数据,我们需要对相应的字段进行更新(插入),这里可以直接通过“获取和更新字段”来完成全部更新字段的自动获取。
然后点击“确定”完成“插入 / 更新”组件的创建。
第三步,点击启动,开始执行转换。
这时 Kettle 就会自动根据数据流中的组件顺序来完成相应的转换,我们可以在 MySQL 中的 test2 数据库中看到更新的 heros 数据表。
我将转换保存为 test1.ktr 上传到了GitHub上,你可以下载一下。

实例 2:导入用户交易流水

刚才我们完成了一个简单的 Kettle 使用实例,现在我们来做一个稍微复杂一些的数据转换。
首先准备数据库。在数据库创建 account 和 trade 两张表。其中 account 表为客户表,字段含义如下:
trade 表为客户交易表,字段含义如下:
你可以点击这里下载 account 和 trade 数据表结构及数据下载。
现在我们希望将客户的交易流水导入到 txt 文件中,输出内容包括 4 个字段 account_id1、account_id2、amount 和 value。其中 value 为新增的字段,表示转账的类型,当转账对象 account_id2 为个人账户,则输出“对私客户发生的交易”,为公司账户时则输出“对公客户发生的交易”。
实际上我们在模拟从多个数据源中导出我们想要的数据,针对这个例子,我们想要输出的数据内容为打款账户,收款账户,转账金额,以及交易类型。
下面我们来看下如何使用 Kettle 来完成这个工作。
第一步,创建表输入组件,并对数据库中的 trade 数据表进行查询。
这里我们创建数据库连接,然后在 SQL 查询中输入:SELECT * FROM trade,相当于对 trade 进行全量获取。
第二步,创建数据库查询,对 account 表进行查询。
在 Kettle 左侧面板找到“数据库查询”控件,拖拽到中间的工作区,命名为“account 表查询”,然后将“表输入”控件和“account 表查询”之间进行连接。这样我们就可以得到“表输入”控件中的数据流。
然后我们对“account 表查询”控件进行配置,这里我们需要连接上 account 数据表,然后对 account 数据表中的 account_id 与 trade 数据表中的 account_id2 进行关联,查询返回值设置为 customer_type。
这样我们可以通过 account 数据表得到 trade.account_id2 所对应的 customer_type。也就是收款账户的账户类型(个人 / 公司)。
第三步,创建过滤记录,对不同的 customer_type 进行类型修改。
我们需要根据收款账户的类型,来进行交易类型的判断,因此这里我们可以根据前面得到的 customer_type 来进行记录的过滤。
这里在 Kettle 左侧的面板选择“过滤记录”,拖拽到中间的工作区,然后创建从“account 表查询”到“过滤记录”的连接。
在“过滤记录”中,我们设置判断条件为 customer_type = 1。
然后在从 Kettle 左侧面板中拖拽两个“JavaScript 代码”控件到工作区,分别将步骤名称设置为“对公类型修改”和“对私类型修改”
然后在对应的 Script 脚本处,设置变量 customer_type_cn。
然后我们在 Kettle 左侧面板拖拽两个“增加常量”控件,到中间的工作区,分别命名为“增加对公常量”和“增加对私常量”。然后将刚才的设置的两个“Javascript 代码”控件与“增加常量”控件进行连接。
在增加常量控件中,我们可以设置输出的常量:
这样我们就可以把得到的常量在后面结果中进行输出。
第四步,创建文本文件输出,将数据表导入到文件中。
刚才我们已经设置了从 trade 数据表中进行查询,然后通过 account 表关联查询得到 customer_type,然后再根据 customer_type 来判断输出的常量,是对公客户发生的交易,还是对私客户发生的交易。这里如果我们想要导入到一个文本文件中,可以从 Kettle 左侧面板中拖拽相应的控件到中间的工作区。然后将刚才设置的两个控件“增加对公常量”和“增加对私常量”设置连接到“文本文件输出”控件中。
然后在“文本文件输出”控件中,找到字段选项,设置我们想要导入的字段,你可以通过“获取字段”来帮你辅助完成这个操作。这里我们选择了 account_id1、account_id2、amount 以及刚才配置的常量 value。
然后我们点击确定就完成了所有配置工作,如下图所示。
当我们开启转换后,整个数据流就会从表输入开始自动完成转换和判断操作。将其导入到我们的文本文件中,导出的结果如下:
account_id1;account_id2;amout;value
322202020312335;622202020312337;200.0;对私客户发生的交易
322202020312335;322202020312336;100.0;对公客户发生的交易
622202020312336;322202020312337;300.0;对公客户发生的交易
622202020312337;322202020312335;400.0;对公客户发生的交易
我将上述过程的转换保存为 test2.ktr 上传到了GitHub上,你可以下载看一下。

总结

今天我们讲解了数据集成的作用,以及 ETL 的原理。在实际工作中,因为数据源可能是不同的 DBMS,因此我们往往会使用第三方工具来帮我们完成数据集成的工作,Kettle 作为免费开源的工作,在 ETL 工作中被经常使用到。它支持多种 RDBMS 和非关系型数据库,比如 MySQL、Oracle、SQLServer、DB2、PostgreSQL、MongoDB 等。不仅如此,Kettle 易于配置和使用,通过可视化界面我们可以设置好想要进行转换的数据源,并且还可以通过 JOB 作业进行定时,这样就可以按照每周每日等频率进行数据集成。
通过今天的两个 Kettle 实例,相信你对 Kettle 使用有一定的了解,你之前都用过哪些 ETL 工具,不妨说说你的经历?
第二个实例中,我们将交易类型分成了“对公客户发生的交易”以及“对私客户发生的交易”。如果我们的需求是分成 4 种交易类型,包括“公对公交易”、“公对私交易”、“私对公交易”以及“私对私交易”,那么该如何使用 Kettle 完成这个转换呢?
欢迎你在评论区写下你的思考,也欢迎把这篇文章分享给你的朋友或者同事,一起交流一下。
分享给需要的人,Ta购买本课程,你将得20
生成海报并分享

赞 8

提建议

上一篇
45丨数据清洗:如何使用SQL对数据进行清洗?
下一篇
47丨如何利用SQL对零售数据进行分析?
unpreview
 写留言

精选留言(10)

  • Coool
    2019-10-30
    用Java查询sql返回结果: package bw.jg.HelloMySql; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; public class SelectInfo { public static void main(String[] args){ try { /* * 数据库连接 */ String url = "jdbc:mysql://localhost:3306/test?characterEncoding=utf-8"; final String user = "root"; final String password = ""; Class.forName("com.mysql.jdbc.Driver"); Connection conn = DriverManager.getConnection(url, user, password); System.out.println("连接成功"); Statement stmt=conn.createStatement();//创建一个Statement对象 String sql="SELECT t.*,CASE CONCAT( a1.customer_type, a2.customer_type ) WHEN 10 THEN \'公对私交易\' WHEN 01 THEN \'私对公交易\' WHEN 11 THEN \'公对公交易\' WHEN 00 THEN \'私对私交易\' END AS trade_type FROM trade t LEFT JOIN account a1 ON t.account_id1 = a1.account_id LEFT JOIN account a2 ON t.account_id2 = a2.account_id";//生成一条sql语句 ResultSet rs=stmt.executeQuery(sql);//执行查询,把查询结果赋值给结果集对象 int id = 0; String account_id1 = null,account_id2 = null,trade_type=null; float amount = 0; System.out.println("id\t 转出账户\t\t\t 转入账户\t\t\t 金额 \t 交易类型"); while(rs.next()) { id=rs.getInt(1); account_id1=rs.getString(2); account_id2=rs.getString(3); amount=rs.getFloat(4); trade_type=rs.getString(5); System.out.println(id+"\t"+account_id1+"\t"+account_id2+"\t"+amount+"\t"+trade_type+"\t"); } System.out.println("获得查询结果集"); conn.close(); System.out.println("关闭数据库连接对象"); } catch (Exception e) { // TODO Auto-generated catch block System.out.println("引擎失败"); e.printStackTrace(); } } }
    展开

    作者回复: 多谢分享

    8
  • mickey
    2019-09-25
    很早以前我们做ETL用的是informatica的powercenter产品,每天从铁通省库拖地市库到本地服务器。
    3
  • 风轻扬
    2020-01-13
    老师,有没有专门讲kettle使用的书籍或者网站之类的。想系统学习一下kettle
    2
  • FengX
    2019-09-25
    老师,请问“JavaScript 代码”控件里的代码有什么作用?似乎只要“增加常量”控件就可以了。

    作者回复: 首先Kettle中可以支持Java或者JavaScript代码。你可以使用JavaScript 来做一些复杂的运算。在使用时,经常会用来创建JavaScript变量,对前面数据流中的数据进行计算,它会自动在上游数据流中添加新的列,从而为下面的数据流提供使用

    2
  • 小虾米
    2020-11-12
    我们主要是用informatica搭建pipeline,从各种source比如SAP,Salesforce 提取数据到Oracle DB。接着用SSIS 做ETL,负责整合到stage DB;然后就是modeling的过程,最后输出的fact table 和 dimension table做成cube。要分析的话可以用SSAS做MDX query或者excel 的power query,当然也可以搭入PowerBI或者Tableau或者SSRS做可视化,甚至可以用Einstein做可控分析。很多变可控。
    1
  • 湮汐
    2020-05-18
    我们使用的是 Datax 去进行抽数,但是它有一个问题:单点问题,要是抽到一般发生崩溃,那就没办法了
    1
  • Coool
    2019-10-28
    老师,能否讲一讲怎么用eclipse怎么连接MYSQL以及如何对表进行操作?
    共 2 条评论
    1
  • mickey
    2019-09-25
    流程如下: 表输入脚本:select t.*,a1.customer_type c1,a2.customer_type as c2 from trade t left join account a1 on t.account_id1=a1.account_id left join account a2 on t.account_id2=a2.account_id 表输入 --> 对公? --> 企业JS代码 ---> 公对公? ---> 公对公JS代码 ---> 公对公常量--->文本输出 | |--------->公对私JS代码 ----> 公对私常量---| | N | | ------> 个人JS代码 ---> 公对公? ---> 私对公JS代码 ---> 私对公常量 ---| |---------> 私对私JS代码 ---> 私对私常量 ---| 输出结果: account_id1;account_id2;amout;value 322202020312335;622202020312337;200.0;【公对私】客户发送的交易 622202020312337;322202020312335;400.0;【私对公】客户发送的交易 622202020312336;322202020312337;300.0;【私对公】客户发送的交易 322202020312335;322202020312336;100.0;【公对公】客户发送的交易
    展开
    1
  • mickey
    2019-09-25
    SQL脚本里,刘备的账号有点问题:“622202020311237”改为“622202020312337”
    1
  • piboye
    2022-10-11 来自广东
    Flink Sql 是不是更适合了?