目录java
2、链接Hadoopmysql
(1)开始前准备正则表达式
(2)配置步骤sql
2. 链接Hive数据库
3. 链接Impalaexpress
4. 创建MySQL数据库链接apache
本篇演示使用Kettle操做Hadoop上的数据。首先概要介绍Kettle对大数据的支持,而后用示例说明Kettle如何链接Hadoop,如何导入导出Hadoop集群上的数据,如何用Kettle执行Hive的HiveQL语句,还会用一个典型的MapReduce转换,说明Kettle在实际应用中是怎样利用Hadoop分布式计算框架的。本篇最后介绍如何在Kettle中提交Spark做业。
1、Hadoop相关的步骤与做业项
在“ETL与Kettle”(https://wxy0327.blog.csdn.net/article/details/107985148)的小结中曾提到,Kettle具备完备的转换步骤与做业项,使它可以支持几乎全部常见数据源。一样Kettle对大数据也提供了强大的支持,这体如今转换步骤与做业项的“Big Data”分类中。本例使用的Kettle 8.3版本中所包含的大数据相关步骤有19个,做业项有10个。表3-1和表3-2分别对这些步骤和做业项进行了简单描述。
步骤名称 |
描述 |
Avro input |
读取Avro格式文件 |
Avro output |
写入Avro格式文件 |
Cassandra input |
从一个Cassandra column family中读取数据 |
Cassandra output |
向一个Cassandra column family中写入数据 |
CouchDB input |
获取CouchDB数据库一个设计文档中给定视图所包含的全部文档 |
HBase input |
从HBase column family中读取数据 |
HBase output |
向HBase column family中写入数据 |
HBase row decoder |
对HBase的键/值对进行编码 |
Hadoop file input |
读取存储在Hadoop集群中的文本型文件 |
Hadoop file output |
向存储在Hadoop集群中的文本型文件中写数据 |
MapReduce input |
向MapReduce输入键值对 |
MapReduce output |
从MapReduce输出键值对 |
MongoDB input |
读取MongoDB中一个指定数据库集合的全部记录 |
MongoDB output |
将数据写入MongoDB的集合中 |
ORC input |
读取ORC格式文件 |
ORC output |
写入ORC格式文件 |
Parquet input |
读取Parquet格式文件 |
Parquet output |
写入Parquet格式文件 |
SSTable output |
做为Cassandra SSTable写入一个文件系统目录 |
表3-1 Kettle转换中的大数据相关步骤
做业项名称 |
描述 |
Amazon EMR job executor |
在Amazon EMR中执行MapReduce做业 |
Amazon Hive job executor |
在Amazon EMR中执行Hive做业 |
Hadoop copy files |
将本地文件上传到HDFS,或者在HDFS上复制文件 |
Hadoop job executor |
在Hadoop节点上执行包含在JAR文件中的MapReduce做业 |
Oozie job executor |
执行Oozie工做流 |
Pentaho MapReduce |
在Hadoop中执行基于MapReduce的转换 |
Pig script executor |
在Hadoop集群上执行Pig脚本 |
Spark submit |
提交Spark做业 |
Sqoop export |
使用Sqoop将HDFS上的数据导出到一个关系数据库中 |
Sqoop import |
使用Sqoop将一个关系数据库中的数据导入到HDFS上 |
表3-2 Kettle做业中的大数据相关做业项
Kettle的设计很独特,它既能够在Hadoop集群外部执行,也能够在Hadoop集群内的节点上执行。在外部执行时,Kettle可以从HDFS、Hive和HBase抽取数据,或者向它们中装载数据。在Hadoop集群内部执行时,Kettle转换能够做为Mapper或Reducer任务执行,并容许将Pentaho MapReduce做业项做为MapReduce的可视化编程工具来使用。后面咱们会用示例演示这些功能。关于Hadoop及其组件的基本概念和功能特性不是本专题所讨论的范畴,可参考其它资源。
2、链接Hadoop
Kettle能够与Hadoop协同工做。经过提交适当的参数,Kettle能够链接Hadoop的HDFS、MapReduce、Zookeeper、Oozie、Sqoop和Spark服务。在数据库链接类型中支持Hive和Impala。在本示例中配置Kettle链接HDFS、Hive和Impala。为了给本专题后面实践中建立的转换或做业使用,咱们还将定义一个普通的mysql数据库链接对象。
1. 链接Hadoop集群
要使Kettle链接Hadoop集群,须要两个操做:设置一个Active Shim;创建并测试链接。Shim是Pentaho开发的插件,功能有点相似于一个适配器,帮助用户链接Hadoop。Pentaho按期发布Shim,能够从sourceforge网站下载与Kettle版本对应的Shim安装包。使用Shim可以链接不一样的Hadoop发行版本,如CDH、HDP、MapR、Amazon EMR等。当在Kettle中执行一个大数据的转换或做业时,缺省会使用设置的Active Shim。初始安装Kettle时,并无Active Shim,所以在尝试链接Hadoop集群前,首先要作的就是选择一个Active Shim,选择的同时也就激活了此Active Shim。设置好Active Shim后,再通过必定的配置,就能够测试链接了。Kettle内建的工具能够为完成这些工做提供帮助。
(1)开始前准备
在配置链接前,要确认Kettle具备访问HDFS相关目录的权限,访问的目录一般包括用户主目录以及工做须要的其它目录。Hadoop管理员应该已经配置了容许Kettle所在主机对Hadoop集群的访问。除权限外,还须要确认如下信息:
- Hadoop集群的发行版本。Kettle与Hadoop版本要匹配,本例使用的Kettle 8.3所对应的大数据支持矩阵详见“https://help.pentaho.com/Documentation/8.3/Setup/Components_Reference”。
- HDFS、MapReduce或Zookeeper服务的IP地址和端口号。
- 若是要使用Oozie,须要知道Oozie服务的URL。
本例中已经安装好4个节点的CDH 6.3.1集群,IP地址及主机名以下:
172.16.1.124 manager
172.16.1.125 node1
172.16.1.126 node2
172.16.1.127 node3
启动的Hadoop服务如图3-1所示,全部服务都使用缺省端口。关于CDH集群的安装与卸载,能够参见个人博客“基于Hadoop生态圈的数据仓库实践 —— 环境搭建(二)”和“一键式彻底删除CDH 6.3.1”。

为了用主机名访问Hadoop相关服务,在Kettle主机(172.16.1.101)的/etc/hosts文件中添加了Hadoop集群四个节点的IP与主机名。
(2)配置步骤
1. 在Kettle中配置Hadoop客户端文件
在浏览器中登陆Cloudera Manager,选择Hive服务,点击“操做”->“下载客户端配置”。在获得的hive-clientconfig.zip压缩包中包括了当前Hadoop客户端的12个配置文件。将其中的core-site.xml、hdfs-site.xml、hive-site.xml、yarn-site.xml、mapred-site.xml 5个文件复制到Kettle根目录下的plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61/目录下,覆盖原来Kettle自带的这些文件。
2. 选择Active Shim
在Spoon界面中,选择主菜单“工具” -> “Hadoop Distribution...”,在对话框中选择“Cloudera CDH 6.1.0”,如图3-2所示,点击OK按钮肯定后重启Spoon。

3. 在Spoon中建立Hadoop clusters对象
新建一个转换,在工做区左侧的树的“主对象树”标签中,选择 Hadoop clusters -> 右键New Cluster,对话框中输入如图3-3所示的属性值。

上图的Hadoop集群配置窗口中的选项及定义说明以下:
- Cluster Name:定义要链接的集群名称,这里为CDH631。
- Hostname(HDFS段):Hadoop集群中NameNode节点的主机名。因为本例中的CDH配置了HDFS HA,这里用HDFS NameNode服务名替代了主机名。
- Port(HDFS段):Hadoop集群中NameNode节点的端口号,HA不须要填写。
- Username(HDFS段):HDFS的用户名,经过宿主操做系统给出,不用填。
- Password(HDFS段):HDFS的密码,经过宿主操做系统给出,不用填。
- Hostname(JobTracker段):Hadoop集群中JobTracker节点的主机名。若是有独立的JobTracker节点,在此输入,不然使用HDFS的主机名。
- Port(JobTracker段):Hadoop集群中JobTracker节点的端口号,不能与HDFS的端口号相同。
- Hostname(ZooKeeper段):Hadoop集群中Zookeeper节点的主机名,只有在链接Zookeeper服务时才须要。
- Port(ZooKeeper段):Hadoop集群中Zookeeper节点的端口号,只有在链接Zookeeper服务时才须要。
- URL(Oozie段):Oozie WebUI的地址,只有在链接Oozie服务时才须要。
这是本例CDH的配置,你应该按本身的状况进行相应修改。而后点击“Test”按钮,测试结果如图3-4所示。正常状况下此时除了一个Kafka链接失败的警告外,其它都应该经过测试。Kafka链接失败,缘由是没有配置Kafka的Bootstrap servers。咱们在CDH中并无启动Kafka服务,所以忽略此警告。

关闭“Hadoop Cluster Test”窗口后,点击“Hadoop cluster”窗口的“肯定”按钮,至此就创建了一个Kettle能够链接的Hadoop集群。
若是是首次配置Kettle链接Hadoop,不免会出现这样那样的问题,Pentaho文档中列出了配置过程当中的常见问题及其通用解决方法,如表3-3所示。但愿这能对Kettle或Hadoop新手有所帮助。
症状 |
一般缘由 |
通用解决方法 |
Shim和配置问题 |
||
No shim |
|
|
Shim doesn't load |
|
|
The file system's URL does not match the URL in the configuration file |
*-site.xml文件配置错误 |
参考Pentaho “Set Up Pentaho to Connect to an Apache Hadoop Cluster”文档,检查配置文件,主要是core-site.xml文件是否配置正确。 |
Sqoop Unsupported major.minor version Error |
在pentaho6.0中,Hadoop集群上的Java版本比Pentaho使用的Java版本旧。 |
|
链接问题 |
||
Hostname does not resolve |
|
|
Port name is incorrect |
|
|
Can't connect |
|
|
目录访问或权限问题 |
||
Can't access directory |
|
|
Can't create, read, update, or delete files or directories |
认证或权限问题。 |
|
Test file cannot be overwritten |
Pentaho测试文件已在目录中。 |
测试已运行,但未删除测试文件。须要手动删除测试文件。检查Kettle根目录下logs目录下的spoon.log文件中记录的测试文件名。测试文件用于验证用户能够在其主目录中建立、写入和删除。 |
表3-3 Kettle链接Hadoop时的常见问题
2. 链接Hive
Kettle把Hive看成一个数据库,支持链接Hive Server和Hive Server 2/3,数据库链接类型的名字分别为Hadoop Hive和Hadoop Hive 2/3。这里演示在Kettle中创建一个Hadoop Hive 2/3类型的数据库链接。
Hive Server有两个明显的问题,一是不够稳定,常常会莫名奇妙假死,致使客户端全部的链接都被挂起。二是并发性支持很差,若是一个用户在链接中设置了一些环境变量,绑定到一个Thrift工做线程,当该用户断开链接,另外一个用户建立了一个链接,他有可能也被分配到以前的线程,复用以前的配置。这是由于Thrift不支持检测客户端是否断开链接,也就没法清除会话的状态信息。Hive Server 2的稳定性更高,而且已经完美支持了会话。从长远来看都会以Hive Server 2做为首选。
在工做区左侧的“主对象树”标签中,选择 “DB链接” -> 右键“新建”,对话框中输入如图3-5所示的属性值。

上图的数据库链接配置窗口中的选项及定义说明以下:
- Connection Name:定义链接名称,这里为hive_cdh631。
- Connection Type:链接类型选择Hadoop Hive 2/3。
- Host Name:输入HiveServer2对应的主机名。在Cloudera Manager中,从Hive服务的“实例”标签中能够找到。
- Datebase Name:这里输入的rds是Hive里已经存在的一个数据库名称。
- Port Number:端口号输入hive.server2.thrift.port参数的值。
- User Name:用户名,这里为空。
- Password:密码,这里为空。
点击“测试”,应该弹出成功链接窗口,显示内容以下:
正确链接到数据库[hive_cdh631] 主机名 : node2 端口 : 10000 数据库名 :rds
为了让其它转换或做业可以使用此数据库链接对象,须要将它设置为共享。选择 “DB链接” -> hive_cdh631 -> 右键“共享”,而后保存转换。
3. 链接Impala
Impala是一个运行在Hadoop之上的大规模并行处理(Massively Parallel Processing,MPP)查询引擎,提供对Hadoop集群数据的高性能、低延迟的SQL查询,使用HDFS做为底层存储。对查询的快速响应使交互式查询和对分析查询的调优成为可能,而这些在针对处理长时间批处理做业的SQL-on-Hadoop传统技术上是难以完成的。Impala是Cloudera公司基于Google Dremel的开源实现。Cloudera公司宣称除Impala外的其它组件都将移植到Spark框架,并坚信Impala是大数据上SQL解决方案的将来,可见其对Impala的重视程度。
经过将Impala与Hive元数据存储数据库相结合,可以在Impala与Hive这两个组件之间共享数据库表。而且Impala与HiveQL的语法兼容,所以既可使用Impala也可使用Hive进行创建表、发布查询、装载数据等操做。Impala能够在已经存在的Hive表上执行交互式实时查询。
建立Impala链接的过程与Hive相似。在工做区左侧的“主对象树”标签中,选择“DB链接” -> 右键“新建”,对话框中输入如图3-6所示的属性值。

上图的数据库链接配置窗口中的选项及定义说明以下:
- Connection Name:定义链接名称,这里为impala_cdh631。
- Connection Type:链接类型选择Impala。
- Host Name:输入任一Impala Daemon对应的主机名。在Cloudera Manager中,从Impala服务的“实例”标签中能够找到。
- Datebase Name:这里输入的rds是Hive里已经存在的一个数据库名称。
- Port Number:端口号输入Impala Daemon HiveServer2端口参数的值。
- User Name:用户名,这里为空。
- Password:密码,这里为空。
点击“测试”,应该弹出成功链接窗口,显示内容以下:
正确链接到数据库[impala_cdh631] 主机名 : node3 端口 : 21050 数据库名 :rds
同hive_cdh631同样,将impala_cdh631数据库链接共享,而后保存转换。
4. 创建MySQL数据库链接
Kettle中建立数据库链接的方法都相似,区别只是在“链接类型”中选择不一样的数据库,而后输入相关的属性,“链接方式”一般选择Native(JDBC)。例如MySQL链接配置如图3-7所示。

这里的链接名称为mysql_node3。配置MySQL数据库链接须要注意的一点是,须要事先将对应版本的MySQL JDBC驱动程序拷贝到Kettle根目录的lib目录下,不然在测试链接时可能出现以下错误:
org.pentaho.di.core.exception.KettleDatabaseException: Error occurred while trying to connect to the database Driver class 'org.gjt.mm.mysql.Driver' could not be found, make sure the 'MySQL' driver (jar file) is installed. org.gjt.mm.mysql.Driver
本例中链接的MySQL服务器版本为5.6.14,所以使用下面的命令拷贝JDBC驱动,而后重启Spoon以从新加载全部驱动。
cp mysql-connector-java-5.1.38-bin.jar /root/pdi-ce-8.3.0.0-371/lib/
至此成功建立了一个Hadoop集群对象CDH631,,以及三个数据库链接对象hive_cdh63一、impala_cdh631和mysql_node3。
3、导入导出Hadoop集群数据
本节用四个示例演示如何使用Kettle导出导入Hadoop数据。这四个示例是:向HDFS导入数据;向Hive导入数据;从HDFS抽取数据到MySQL;从Hive抽取数据到MySQL。
1. 向HDFS导入数据
用Kettle将本地文件导入HDFS很是简单,只须要一个“Hadoop copy files”做业项就能够实现。它执行的效果同 hdfs dfs -put 命令是相同的。从下面的地址下载Pentaho提供的web日志示例文件,将解压缩后的weblogs_rebuild.txt文件放到Kettle所在主机的本地目录下。
在Spoon中新建一个只包含“Start”和“Hadoop copy files”两个做业项的做业,如图3-8所示。

双击“Hadoop Copy Files”做业项,编辑属性以下:
- Source Environment:选择“Local”。
- 源文件/目录:选择本地文件,本例为“file:///root/kettle_hadoop/3/weblogs_rebuild.txt”
- 通配符:空。
- Destination Environment:选择“CDH631”,这是咱们以前已经创建好的Hadoop Clusters对象。
- Destination File/Folder:选择HDFS上的目录,本例为/user/root。
保存并成功执行做业后,查看HDFS目录,结果以下。能够看到,weblogs_rebuild.txt文件已从本地导入HDFS的/user/root目录中。每次执行做业会覆盖HDFS中已存在的同名文件。
[hdfs@manager~]$hdfs dfs -ls /user/root Found 1 items -rw-r--r-- 3 root supergroup 77908174 2020-08-28 08:53 /user/root/weblogs_rebuild.txt [hdfs@manager~]$
2. 向Hive导入数据
Hive缺省是不能进行行级插入的,也就是说缺省时不能使用insert into ... values这种SQL语句向Hive插入数据。一般Hive表数据导入方式有如下两种:
- 从本地文件系统中导入数据到Hive表,使用的语句是:
load data local inpath 目录或文件 into table 表名;
- 从HDFS上导入数据到Hive表,使用的语句是:
load data inpath 目录或文件 into table 表名;
再有数据一旦导入Hive表,缺省是不能进行更新和删除的,只能向表中追加数据或者用新数据总体覆盖原来的数据。要删除表数据只能执行truncate或者drop table操做,这其实是删除了表所对应的HDFS上的数据文件或目录。
Kettle做业中的“Hadoop Copy Files”做业项能够将本地文件上传至HDFS,所以只要将前面的做业稍加修改,将Destination File/Folder选择为hive表所在的HDFS目录便可,做业执行的效果与load data local inpath语句相同。
首先从下面的地址下载Pentaho提供的格式化后的web日志示例文件,将解压缩后的weblogs_parse.txt文件放到Kettle所在主机的本地目录下。
而后执行下面的HiveQL创建一个Hive表,表结构与weblogs_parse.txt文件的结构相匹配。
create table test.weblogs ( client_ip string, full_request_date string, day string, month string, month_num int, year string, hour string, minute string, second string, timezone string, http_verb string, uri string, http_status_code string, bytes_returned string, referrer string, user_agent string) row format delimited fields terminated by '\t';
建立和前例相同的做业,只是修改如下两个做业项属性:
- 源文件/目录:file:///root/kettle_hadoop/3/weblogs_parse.txt
- Destination File/Folder:/user/hive/warehouse/test.db/weblogs
保存并成功执行做业后,查询test.weblogs表的记录与weblogs_parse.txt文件内容相同。
3. 从HDFS抽取数据到MySQL
这是Pentaho提供的一个压缩文件,其中包含一个名为weblogs_aggregate.txt的文本文件,文件中有36616行记录,每行记录有4列,分别表示IP地址、年份、月份、访问页面数,前5行记录以下。咱们使用这个文件做为最初的原始数据。
0.308.86.81 2012 07 1 0.32.48.676 2012 01 3 0.32.85.668 2012 07 8 0.45.305.7 2012 01 1 0.45.305.7 2012 02 1
用下面的命令把解压缩后的weblogs_aggregate.txt文件上传到HDFS的/user/root目录下。
hdfs dfs -put weblogs_aggregate.txt /user/root/
在Spoon中新建一个如图3-9的转换。转换中只包含“Hadoop File Input”和“表输出” 两个步骤。

编辑“Hadoop File Input”步骤属性以下:
(1)“文件”标签
- Environment:选择“CDH631”。
- File/Folder:选择“/user/root/weblogs_aggregate.txt”
(2)“内容”标签
- 文件类型:CVS
- 分隔符:删除分号,点击“Insert TAB”按钮插入TAB分隔符。
- 头部:勾掉。
- 格式:选择“Unix”。
- 本地日期格式:选择“en_US”
(3)“字段”标签
输入如表3-4所示。
名称 |
类型 |
格式 |
长度 |
去除空字符串方式 |
重复 |
client_ip |
String |
|
20 |
不去掉空格 |
否 |
year |
Integer |
# |
15 |
不去掉空格 |
否 |
month_num |
Integer |
# |
15 |
不去掉空格 |
否 |
pageviews |
Integer |
# |
15 |
不去掉空格 |
否 |
表3-4 weblogs_aggregate.txt对应的字段
编辑“表输出”步骤属性以下:
- 数据库链接:选择“mysql_node3”。
- 目标表:输入“aggregate_hdfs”。
- 剪裁表:勾选。
mysql_node3是链接Hadoop时已经建好的一个MySQL数据库链接。“主选项”和“数据库字段”标签下的属性都不须要设置,“表字段”和“流字段”会自动映射。
下面执行SQL创建mysql的表:
use test; create table aggregate_hdfs ( client_ip varchar(15), year smallint, month_num tinyint, pageviews bigint );
保存并执行转换,而后查询aggregate_hdfs表,结果以下:
mysql> select count(*) from test.aggregate_hdfs; +----------+ | count(*) | +----------+ | 36616 | +----------+ 1 row in set (0.03 sec) mysql> select * from test.aggregate_hdfs limit 5; +-------------+------+-----------+-----------+ | client_ip | year | month_num | pageviews | +-------------+------+-----------+-----------+ | 0.308.86.81 | 2012 | 7 | 1 | | 0.32.48.676 | 2012 | 1 | 3 | | 0.32.85.668 | 2012 | 7 | 8 | | 0.45.305.7 | 2012 | 1 | 1 | | 0.45.305.7 | 2012 | 2 | 1 | +-------------+------+-----------+-----------+ 5 rows in set (0.00 sec)
4. 从Hive抽取数据到MySQL
在Spoon中新建一个如图3-10的转换。转换中只包含“表输入”和“表输出” 两个步骤。

编辑“表输入”步骤属性以下:
- 数据库链接:选择“hive_cdh631”。
- SQL:输入下面的SQL语句:
select client_ip, year, month, month_num, count(*) as pageviews from test.weblogs group by client_ip, year, month, month_num
hive_cdh631是链接Hadoop时已经建好的一个Hive数据库链接。
编辑“表输出”步骤属性以下:
- 数据库链接:选择“mysql_node3”。
- 目标表:输入“aggregate_hive”。
- 剪裁表:勾选。
下面执行SQL创建mysql的表:
use test; create table aggregate_hive ( client_ip varchar(15), year varchar(4), month varchar(10), month_num tinyint, pageviews bigint );
保存并执行转换,而后查询aggregate_hive表,结果以下:
mysql> select count(*) from test.aggregate_hive; +----------+ | count(*) | +----------+ | 36616 | +----------+ 1 row in set (0.03 sec) mysql> select * from test.aggregate_hive limit 5; +---------------+------+-------+-----------+-----------+ | client_ip | year | month | month_num | pageviews | +---------------+------+-------+-----------+-----------+ | 0.45.305.7 | 2012 | Feb | 2 | 1 | | 0.48.322.75 | 2012 | Jul | 7 | 1 | | 0.638.50.46 | 2011 | Dec | 12 | 8 | | 01.660.68.623 | 2012 | Jun | 6 | 1 | | 01.660.70.74 | 2012 | Jul | 7 | 1 | +---------------+------+-------+-----------+-----------+ 5 rows in set (0.00 sec)
4、执行HiveQL语句
在这个示例中演示如何用Kettle执行Hive的HiveQL语句。咱们在“向Hive导入数据”一节创建的weblogs表上执行聚合查询,同时创建一个新表保存查询结果。新建一个Kettle做业,只有“START”和“SQL”两个做业项,如图3-11所示。

编辑“SQL”做业项属性以下:
- 数据库链接:选择“hive_cdh631”。
- SQL脚本:
create table test.weblogs_agg as select client_ip, year, month, month_num, count(*) from test.weblogs group by client_ip, year, month, month_num;
保存并成功执行做业后检查hive表,结果以下:
hive> select count(*) from test.weblogs_agg; ... 36616
能够看到weblogs_agg表中已经保存了所有的聚合数据。
5、执行MapReduce
1. 生成聚合数据集
“执行HiveQL语句”示例只用一句HiveQL就生成了聚合数据,本示例使用“Pentaho MapReduce”做业项完成类似的功能,把细节数据汇总成聚合数据集。当给一个关系型数据仓库或数据集市准备待抽取的数据时,这是一个常见的使用场景。咱们把weblogs_parse.txt文件做为细节数据,目标是生成聚合数据文件,其中包含按IP和年月分组统计的PV数。
(1)准备文件与目录
# 建立格式化文件所在目录 hdfs dfs -mkdir /user/root/parse/ # 上传格式化文件 hdfs dfs -put -f weblogs_parse.txt /user/root/parse/ # 修改读写权限 hdfs dfs -chmod -R 777 /user/root/
(2)创建一个用于Mapper的转换

如图3-12所示的转换由“MapReduce Input”、“拆分字段”、“利用Janino计算Java表达式”、“MapReduce Output”四个步骤组成。
编辑“MapReduce Input”步骤以下:
- Key field:“Type”选择“String”,定义 Hadoop MapReduce 键的数据类型。
- Value field:“Type”选择“String”,定义 Hadoop MapReduce 值的数据类型。
该步骤输出两个字段,名称是固定的key和value,也就是Map阶段输入的键值对。
编辑“拆分字段”步骤以下:
- 须要拆分的字段:选择“value”。
- 分隔符:输入“$[09]”,以TAB做为分隔符。
- 字段:新的字段名以下,类型均为String。
client_ip full_request_date day month month_num year hour minute second timezone http_verb uri http_status_code bytes_returned referrer user_agent
该步骤将输入的value字段拆分红16个字段,输出17个字段(key字段没变,文本文件每行的key是文件起始位置到每行的字节偏移量)。
编辑“利用Janino计算Java表达式”步骤如表3-5所示。
New field |
Java expression |
Value type |
new_key |
client_ip + '\t' + year + '\t' + month_num |
String |
new_value |
1 |
Integer |
表3-5 聚合数据转换中的“利用Janino计算Java表达式”步骤
该步骤为数据流中增长两个新的字段,名称分别定义为new_key和new_value。new_key字段的值定义为client_ip + '\t' + year + '\t' + month_num,将IP地址、年份、月份和字段间的两个TAB符拼接成一个字符串。new_value字段的值为1,数据类型是整数。该步骤输出19个字段。
编辑“MapReduce Output”步骤以下:
- Key field:选择“new_key”。
- Value field:选择“new_value”。
该步骤输出“new_key”和“new_value”两个字段,即Map阶段输出的键值对。
将转换保存为aggregate_mapper.ktr。
(3)创建一个用于Reducer的转换
图3-13 生成聚合数据Reducer转换
如图3-13所示的转换由“MapReduce Input”、“分组”、“MapReduce Output”三个步骤组成。
编辑“MapReduce Input”步骤以下:
. Key field:“Type”选择“String”。
. Value field:“Type”选择“Integer”。
该步骤输出两个字段,名称是固定的key和value,key对应Mapper转换的new_key输出字段,value对应Mapper转换的new_value输出字段。
编辑“分组”步骤以下:
- 构成分组的字段:选择“key”。
- 聚合:名称、Subject、类型三列的值分别是new_value、value、求和。
该步骤按key字段分组(key字段的值就是client_ip + '\t' + year + '\t' + month_num),对每一个分组的value求和,每组的合计值定义为一个新的字段new_value。注意,此处的new_value和Mapper转换输出的new_value字段含义是不一样的。Mapper转换输出的new_value字段对应这里的Subject字段值。
编辑“MapReduce Output”步骤以下:
- Key field:选择“key”。
- Value field:选择“new_value”。
输出Reducer处理后的键值对,这就是咱们想要的结果。
将转换保存为aggregate_reducer.ktr。
(4)创建一个调用MapReduce步骤的做业

如图3-14所示的做业使用mapper和reducer转换。须要编辑Pentaho MapReduce做业项的Mapper、Reducer、job Setup、Cluster四个标签页,每一个标签页上的选项及定义。
Mapper标签:
- Transformation:选择第(1)步创建的Mapper转换,这里为“/root/kettle_hadoop/3/aggregate_mapper.ktr”。
- Input step name:输入“MapReduce Input”。这是接收mapping数据的步骤名,必须是一个MapReduce Input步骤的名称。
- Output step name:输入“MapReduce Output”。这是mapping输出步骤名,必须是一个MapReduce Output步骤的名称。
Reducer标签:
- Transformation:选择第(2)步创建的Reducer转换,这里为“/root/kettle_hadoop/3/aggregate_mapper.ktr”。
- Input step name:输入“MapReduce Input”。这是接收reducing数据的步骤名,必须是一个MapReduce Input步骤的名称。
- Output step name:输入“MapReduce Output”。这是reducing输出步骤名,必须是一个MapReduce Output步骤的名称。
Job Setup标签:
- Input path:输入“/user/root/parse/”。一个以逗号分隔的HDFS目录列表,目录中存储的是MapReduce要处理的源数据文件。
- Output path:输入“/user/root/aggregate_mr”。存储MapReduce做业输出数据的HDFS目录。
- Remove output path before job:勾选。执行做业时先删除输出目录。
- Input format:输入“org.apache.hadoop.mapred.TextInputFormat”,为输入格式的类名。
- Output format:输入“org.apache.hadoop.mapred.TextOutputFormat”,为输出格式的类名。
Cluster标签:
- Hadoop job name:输入“aggregate”。
- Hadoop cluster:选择“CDH631”,为一个已经定义的Hadoop集群。
- Number of mapper tasks:1。分配的mapper任务数,由输入的数据量所决定。典型的值在10-100之间。非CPU密集型的任务能够指定更高的值。
- Number of reduce tasks:1。分配的reducer任务数。通常来讲,该值设置的越小,reduce操做启动的越快,设置的越大,reduce操做完成的更快。加大该值会增长Hadoop框架的开销,但可以使负载更加均衡。若是设置为0,则不执行reduce操做,mapper的输出将做为整个MapReduce做业的输出。
- Logging interval:60。日志消息间隔的秒数。
- Enable blocking:勾选。若是选中,做业将等待每个做业项完成后再继续下一个做业项,这是Kettle感知Hadoop做业状态的惟一方式。若是不选,MapReduce做业会本身执行,而Kettle在提交MapReduce做业后当即会执行下一个做业项。除非选中该项,不然Kettle的错误处理在这里将没法工做。
将做业保存为aggregate_mr.kjb。
(5)执行做业并验证输出
[hdfs@node3~]$hdfs dfs -ls /user/root/aggregate_mr/ Found 2 items -rw-r--r-- 3 root supergroup 0 2020-08-31 13:46 /user/root/aggregate_mr/_SUCCESS -rw-r--r-- 3 root supergroup 890709 2020-08-31 13:46 /user/root/aggregate_mr/part-00000 [hdfs@node3~]$hdfs dfs -cat /user/root/aggregate_mr/part-00000 | head -10 0.308.86.81 2012 07 1 0.32.48.676 2012 01 3 0.32.85.668 2012 07 8 0.45.305.7 2012 01 1 0.45.305.7 2012 02 1 0.46.386.626 2011 11 1 0.48.322.75 2012 07 1 0.638.50.46 2011 12 8 0.87.36.333 2012 08 7 01.660.68.623 2012 06 1 cat: Unable to write to output stream. [hdfs@node3~]$
能够看到,/user/root/aggregate_mr/目录下生成了名为part-00000输出文件,文件中包含按IP和年月分组的PV数。
2. 格式化原始web日志
本示例说明如何使用Pentaho MapReduce把原始web日志解析成格式化的记录。
(1)准备文件与目录
# 建立原始文件所在目录 hdfs dfs -mkdir /user/root/raw # 修改读写权限 hdfs dfs -chmod -R 777 /user/root/
而后用Hadoop copy files做业项将weblogs_rebuild.txt文件放到HDFS的/user/root/raw目录下,具体操做参见前面“向HDFS导入数据”。
(2)创建一个用于Mapper的转换

编辑“MapReduce Input”步骤以下:
- Key field:“Type”选择“String”。
- Value field:“Type”选择“String”。
编辑“正则表达式”步骤以下:
- 要匹配的字段:输入“value”。
- Result field name:输入“is_match”
- 为每一个捕获组(capture group)建立一个字段:勾选。
- Replace previous fields:勾选。
- 正则表达式:
^([^\s]{7,15})\s # client_ip -\s # unused IDENT field -\s # unused USER field \[((\d{2})/(\w{3})/(\d{4}) # request date dd/MMM/yyyy :(\d{2}):(\d{2}):(\d{2})\s([-+ ]\d{4}))\] # request time :HH:mm:ss -0800 \s"(GET|POST)\s # HTTP verb ([^\s]*) # HTTP URI \sHTTP/1\.[01]"\s # HTTP version (\d{3})\s # HTTP status code (\d+)\s # bytes returned "([^"]+)"\s # referrer field " # User agent parsing, always quoted. "? # Sometimes if the user spoofs the user_agent, they incorrectly quote it. ( # The UA string [^"]*? # Uninteresting bits (?: (?: rv: # Beginning of the gecko engine version token (?=[^;)]{3,15}[;)]) # ensure version string size ( # Whole gecko version (\d{1,2}) # version_component_major \.(\d{1,2}[^.;)]{0,8}) # version_component_minor (?:\.(\d{1,2}[^.;)]{0,8}))? # version_component_a (?:\.(\d{1,2}[^.;)]{0,8}))? # version_component_b ) [^"]* # More uninteresting bits ) | [^"]* # More uninteresting bits ) ) # End of UA string "? "
- 捕获组(Capture Group)字段:以下所示,全部字段都是String类型。
client_ip full_request_date day month year hour minute second timezone http_verb uri http_status_code bytes_returned referrer user_agent firefox_gecko_version firefox_gecko_version_major firefox_gecko_version_minor firefox_gecko_version_a firefox_gecko_version_b
编辑“过滤记录”步骤以下:
- 发送true数据给步骤:选择“值映射”。
- 发送false数据给步骤:选择“空操做(什么也不作)”
- 条件:选择“is_match = Y”
编辑“值映射”步骤以下:
- 使用的字段名:选择“month”。
- 目标字段名(空=覆盖):输入“month_num”。
- 不匹配时的默认值:输入“00”。
- 字段值:源值与目标值输入以下。
Jan 01 Feb 02 Mar 03 Apr 04 May 05 Jun 06 Jul 07 Aug 08 Sep 09 Oct 10 Nov 11 Dec 12
编辑“利用Janino计算Java表达式”步骤以下:
- New field:输入“output_value”。
- Java expression:输入以下。
client_ip + '\t' + full_request_date + '\t' + day + '\t' + month + '\t' + month_num + '\t' + year + '\t' + hour + '\t' + minute + '\t' + second + '\t' + timezone + '\t' + http_verb + '\t' + uri + '\t' + http_status_code + '\t' + bytes_returned + '\t' + referrer + '\t' + user_agent
- Value type:选择“String”。
编辑“MapReduce Output”步骤以下:
- Key field:选择“key”。
- Value field:选择“output_value”。
将转换保存为weblog_parse_mapper.ktr。
(3)创建一个调用MapReduce步骤的做业

编辑“Pentaho MapReduce”做业项以下。
Mapper标签:
- Transformation:选择上一步创建的转换,这里为“/root/kettle_hadoop/3/weblogs_parse_mapper.ktr”。
- Input step name:输入“MapReduce Input”。
- Output step name:输入“MapReduce Output”。
Job Setup标签:
- Input path:输入“/user/root/raw”。
- Output path:输入“/user/root/parse1”。
- Remove output path before job:勾选。
- Input format:输入“org.apache.hadoop.mapred.TextInputFormat”。
- Output format:输入“org.apache.hadoop.mapred.TextOutputFormat”。
Cluster标签:
- Hadoop job name:输入“Web Log Parse”。
- Hadoop cluster:选择“CDH631”。
- Number of mapper tasks:2
- Number of reduce tasks:0
- Logging interval:60
- Enable blocking:勾选。
将做业保存为weblogs_parse_mr.kjb。
(4)执行做业并验证输出
做业成功执行后检查HDFS的输出文件,结果以下。
[hdfs@node3~]$hdfs dfs -ls /user/root/parse1 Found 3 items -rw-r--r-- 3 root supergroup 0 2020-08-31 10:59 /user/root/parse1/_SUCCESS -rw-r--r-- 3 root supergroup 42601640 2020-08-31 10:59 /user/root/parse1/part-00000 -rw-r--r-- 3 root supergroup 42810160 2020-08-31 10:59 /user/root/parse1/part-00001 [hdfs@node3~]$hdfs dfs -get /user/root/parse1/part-00000 [hdfs@node3~]$head -5 part-00000 0 323.81.303.680 25/Oct/2011:01:41:00 -0500 25 Oct 10 2011 01 41 00 -0500 GET /download/download6.zip 200 0 - Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.19) Gecko/2010031422 Firefox/3.0.19 193 668.667.44.3 25/Oct/2011:07:38:30 -0500 25 Oct 10 2011 07 38 30 -0500 GET /download/download3.zip 200 0 - Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.8.0.12) Gecko/20070719 CentOS/1.5.0.12-3.el5.centos Firefox/1.5.0.12 405 13.386.648.380 25/Oct/2011:17:06:00 -0500 25 Oct 10 2011 17 06 00 -0500 GET /download/download6.zip 200 0 - Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; GTB6.3; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; InfoPath.2) 651 06.670.03.40 26/Oct/2011:13:24:00 -0500 26 Oct 10 2011 13 24 00 -0500 GET /product/demos/product2 200 0 - Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3 838 18.656.618.46 26/Oct/2011:17:15:30 -0500 26 Oct 10 2011 17 15 30 -0500 GET /download/download4.zip 200 0 - Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_3; en-us) AppleWebKit/531.22.7 (KHTML, like Gecko) Version/4.0.5 Safari/531.22.7 [hdfs@node3~]$
能够看到,/user/root/parse1目录下生成了名为part-00000和part-00001的两个输出文件(由于使用了两个mapper),内容已经被格式化。
6、提交Spark做业
Kettle不但支持MapReduce做业,还能够经过“Spark Submit”做业项,向CDH 5.3以上、HDP 2.3以上、Amazon EMR 3.10以上的Hadoop平台提交Spark做业。在本示例中,咱们先为Kettle配置Spark,而后修改并执行Kettle安装包中自带的Spark PI做业例子,说明如何在Kettle中提交Spark做业。
1. 在Kettle主机上安装Spark客户端
使用Kettle执行Spark做业,须要在Kettle主机安装Spark客户端。只要将CDH中Spark的库文件复制到Kettle所在主机便可。
-- 在172.16.1.127上执行 cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib scp -r spark 172.16.1.101:/root/
2. 为Kettle配置Spark
如下操做均在172.16.1.101以root用户执行。
(1)备份原始配置文件
cd /root/spark/conf/ cp spark-defaults.conf spark-defaults.conf.bak cp spark-env.sh spark-env.sh.bak
(2)编辑spark-defaults.conf文件
vim /root/spark/conf/spark-defaults.conf
内容以下:
# 使用spark.yarn.archive减小任务启动时间 spark.yarn.archive=hdfs://nameservice1/user/spark/lib/spark_jars.zip # 解决和yarn相关Jersey包冲突,避免spark on yarn启动spark-submit时出现java.lang.NoClassDefFoundError错误 spark.hadoop.yarn.timeline-service.enabled=false # 记录Spark事件,用于应用程序在完成后重构WebUI spark.eventLog.enabled=true # 记录Spark事件的目录 spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory # spark on yarn的history server地址 spark.yarn.historyServer.address=http://node3:18088
(3)编辑spark-env.sh文件
vim /root/spark/conf/spark-env.sh
内容以下:
#!/usr/bin/env bash # hadoop配置文件所在目录 HADOOP_CONF_DIR=/root/pdi-ce-8.3.0.0-371/plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61 # spark主目录 SPARK_HOME=/root/spark
(4)编辑core-site.xml文件
vim /root/pdi-ce-8.3.0.0-371/plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61/core-site.xml
去掉下面这段的注释:
<property> <name>net.topology.script.file.name</name> <value>/etc/hadoop/conf.cloudera.yarn/topology.py</value> </property>
3. 提交Spark做业
(1)修改Kettle自带的Spark例子
cp /root/pdi-ce-8.3.0.0-371/samples/jobs/Spark\ Submit/Spark\ submit.kjb /root/kettle_hadoop/3/spark_submit.kjb
在Spoon中打开/root/kettle_hadoop/spark_submit.kjb文件,如图3-17所示。

编辑Spark PI做业项以下:
- Spark Submit Utility:选择Spark提交程序,本例为“/root/spark/bin/spark-submit”。
- Master URL:由于yarn运行在CDH集群,而不是Kettle主机上,因此这里选择“yarn-cluster”。
- Files标签的Application Jar:选择“/root/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.3.1.jar”。
(2)保存行执行做业
Spark History Server Web UI如图3-18所示。

7、小结
本篇以Kettle 8.3和CDH 6.3.1为例,介绍Kettle对Hadoop的支持。经过提交适当的参数,Kettle能够链接Hadoop的HDFS、MapReduce、Zookeeper、Oozie和Spark服务。Kettle的数据库链接类型中支持Hive、Hive 2/3和Impala。可使用Kettle导出导入Hadoop集群中(HDFS、Hive等)的数据,执行Hive的HiveQL语句。Kettle支持在Hadoop中执行基于MapReduce的Kettle转换,还支持向Spark集群提交做业。这里演示的例子都是Pentaho官方提供示例。从下一篇开始,咱们将创建一个模拟的Hadoop数据仓库,并用使用Kettle完成其上的ETL操做。