Kettle构建Hadoop ETL实践(三):Kettle对Hadoop的支持

目录java

1、Hadoop相关的步骤与做业项node

2、链接Hadoopmysql

1. 链接Hadoop集群web

(1)开始前准备正则表达式

(2)配置步骤sql

2. 链接Hive数据库

3. 链接Impalaexpress

4. 创建MySQL数据库链接apache

3、导入导出Hadoop集群数据编程

1. 向HDFS导入数据

2. 向Hive导入数据

3. 从HDFS抽取数据到MySQL

4. 从Hive抽取数据到MySQL

4、执行HiveQL语句

5、执行MapReduce

1. 生成聚合数据集

(1)准备文件与目录

(2)创建一个用于Mapper的转换

(4)创建一个调用MapReduce步骤的做业

(5)执行做业并验证输出

2. 格式化原始web日志

(1)准备文件与目录

(2)创建一个用于Mapper的转换

(3)创建一个调用MapReduce步骤的做业

(4)执行做业并验证输出

6、提交Spark做业

1. 在Kettle主机上安装Spark客户端

2. 为Kettle配置Spark

(1)备份原始配置文件

(2)编辑spark-defaults.conf文件

(3)编辑spark-env.sh文件

(4)编辑core-site.xml文件

3. 提交Spark做业

(1)修改Kettle自带的Spark例子

(2)保存行执行做业

7、小结


        本篇演示使用Kettle操做Hadoop上的数据。首先概要介绍Kettle对大数据的支持,而后用示例说明Kettle如何链接Hadoop,如何导入导出Hadoop集群上的数据,如何用Kettle执行Hive的HiveQL语句,还会用一个典型的MapReduce转换,说明Kettle在实际应用中是怎样利用Hadoop分布式计算框架的。本篇最后介绍如何在Kettle中提交Spark做业。

1、Hadoop相关的步骤与做业项

        在“ETL与Kettle”(https://wxy0327.blog.csdn.net/article/details/107985148)的小结中曾提到,Kettle具备完备的转换步骤与做业项,使它可以支持几乎全部常见数据源。一样Kettle对大数据也提供了强大的支持,这体如今转换步骤与做业项的“Big Data”分类中。本例使用的Kettle 8.3版本中所包含的大数据相关步骤有19个,做业项有10个。表3-1和表3-2分别对这些步骤和做业项进行了简单描述。

步骤名称

描述

Avro input

读取Avro格式文件

Avro output

写入Avro格式文件

Cassandra input

从一个Cassandra column family中读取数据

Cassandra output

向一个Cassandra column family中写入数据

CouchDB input

获取CouchDB数据库一个设计文档中给定视图所包含的全部文档

HBase input

从HBase column family中读取数据

HBase output

向HBase column family中写入数据

HBase row decoder

对HBase的键/值对进行编码

Hadoop file input

读取存储在Hadoop集群中的文本型文件

Hadoop file output

向存储在Hadoop集群中的文本型文件中写数据

MapReduce input

向MapReduce输入键值对

MapReduce output

从MapReduce输出键值对

MongoDB input

读取MongoDB中一个指定数据库集合的全部记录

MongoDB output

将数据写入MongoDB的集合中

ORC input

读取ORC格式文件

ORC output

写入ORC格式文件

Parquet input

读取Parquet格式文件

Parquet output

写入Parquet格式文件

SSTable output

做为Cassandra SSTable写入一个文件系统目录

表3-1 Kettle转换中的大数据相关步骤

做业项名称

描述

Amazon EMR job executor

在Amazon EMR中执行MapReduce做业

Amazon Hive job executor

在Amazon EMR中执行Hive做业

Hadoop copy files

将本地文件上传到HDFS,或者在HDFS上复制文件

Hadoop job executor

在Hadoop节点上执行包含在JAR文件中的MapReduce做业

Oozie job executor

执行Oozie工做流

Pentaho MapReduce

在Hadoop中执行基于MapReduce的转换

Pig script executor

在Hadoop集群上执行Pig脚本

Spark submit

提交Spark做业

Sqoop export

使用Sqoop将HDFS上的数据导出到一个关系数据库中

Sqoop import

使用Sqoop将一个关系数据库中的数据导入到HDFS上

表3-2 Kettle做业中的大数据相关做业项

        Kettle的设计很独特,它既能够在Hadoop集群外部执行,也能够在Hadoop集群内的节点上执行。在外部执行时,Kettle可以从HDFS、Hive和HBase抽取数据,或者向它们中装载数据。在Hadoop集群内部执行时,Kettle转换能够做为Mapper或Reducer任务执行,并容许将Pentaho MapReduce做业项做为MapReduce的可视化编程工具来使用。后面咱们会用示例演示这些功能。关于Hadoop及其组件的基本概念和功能特性不是本专题所讨论的范畴,可参考其它资源。

2、链接Hadoop

        Kettle能够与Hadoop协同工做。经过提交适当的参数,Kettle能够链接Hadoop的HDFS、MapReduce、Zookeeper、Oozie、Sqoop和Spark服务。在数据库链接类型中支持Hive和Impala。在本示例中配置Kettle链接HDFS、Hive和Impala。为了给本专题后面实践中建立的转换或做业使用,咱们还将定义一个普通的mysql数据库链接对象。

1. 链接Hadoop集群

        要使Kettle链接Hadoop集群,须要两个操做:设置一个Active Shim;创建并测试链接。Shim是Pentaho开发的插件,功能有点相似于一个适配器,帮助用户链接Hadoop。Pentaho按期发布Shim,能够从sourceforge网站下载与Kettle版本对应的Shim安装包。使用Shim可以链接不一样的Hadoop发行版本,如CDH、HDP、MapR、Amazon EMR等。当在Kettle中执行一个大数据的转换或做业时,缺省会使用设置的Active Shim。初始安装Kettle时,并无Active Shim,所以在尝试链接Hadoop集群前,首先要作的就是选择一个Active Shim,选择的同时也就激活了此Active Shim。设置好Active Shim后,再通过必定的配置,就能够测试链接了。Kettle内建的工具能够为完成这些工做提供帮助。

(1)开始前准备

        在配置链接前,要确认Kettle具备访问HDFS相关目录的权限,访问的目录一般包括用户主目录以及工做须要的其它目录。Hadoop管理员应该已经配置了容许Kettle所在主机对Hadoop集群的访问。除权限外,还须要确认如下信息:

        本例中已经安装好4个节点的CDH 6.3.1集群,IP地址及主机名以下:
172.16.1.124 manager
172.16.1.125 node1
172.16.1.126 node2
172.16.1.127 node3



        启动的Hadoop服务如图3-1所示,全部服务都使用缺省端口。关于CDH集群的安装与卸载,能够参见个人博客“基于Hadoop生态圈的数据仓库实践 —— 环境搭建(二)”和“一键式彻底删除CDH 6.3.1”。

图3-1 Hadoop集群服务

        为了用主机名访问Hadoop相关服务,在Kettle主机(172.16.1.101)的/etc/hosts文件中添加了Hadoop集群四个节点的IP与主机名。

(2)配置步骤

1. 在Kettle中配置Hadoop客户端文件
        在浏览器中登陆Cloudera Manager,选择Hive服务,点击“操做”->“下载客户端配置”。在获得的hive-clientconfig.zip压缩包中包括了当前Hadoop客户端的12个配置文件。将其中的core-site.xml、hdfs-site.xml、hive-site.xml、yarn-site.xml、mapred-site.xml 5个文件复制到Kettle根目录下的plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61/目录下,覆盖原来Kettle自带的这些文件。

2. 选择Active Shim
        在Spoon界面中,选择主菜单“工具” -> “Hadoop Distribution...”,在对话框中选择“Cloudera CDH 6.1.0”,如图3-2所示,点击OK按钮肯定后重启Spoon。

图3-2 选择Active Shim

3. 在Spoon中建立Hadoop clusters对象
        新建一个转换,在工做区左侧的树的“主对象树”标签中,选择 Hadoop clusters -> 右键New Cluster,对话框中输入如图3-3所示的属性值。

图3-3 Hadoop集群链接配置

        上图的Hadoop集群配置窗口中的选项及定义说明以下:

  • Cluster Name:定义要链接的集群名称,这里为CDH631。
  • Hostname(HDFS段):Hadoop集群中NameNode节点的主机名。因为本例中的CDH配置了HDFS HA,这里用HDFS NameNode服务名替代了主机名。
  • Port(HDFS段):Hadoop集群中NameNode节点的端口号,HA不须要填写。 
  • Username(HDFS段):HDFS的用户名,经过宿主操做系统给出,不用填。
  • Password(HDFS段):HDFS的密码,经过宿主操做系统给出,不用填。
  • Hostname(JobTracker段):Hadoop集群中JobTracker节点的主机名。若是有独立的JobTracker节点,在此输入,不然使用HDFS的主机名。
  • Port(JobTracker段):Hadoop集群中JobTracker节点的端口号,不能与HDFS的端口号相同。
  • Hostname(ZooKeeper段):Hadoop集群中Zookeeper节点的主机名,只有在链接Zookeeper服务时才须要。
  • Port(ZooKeeper段):Hadoop集群中Zookeeper节点的端口号,只有在链接Zookeeper服务时才须要。
  • URL(Oozie段):Oozie WebUI的地址,只有在链接Oozie服务时才须要。

        这是本例CDH的配置,你应该按本身的状况进行相应修改。而后点击“Test”按钮,测试结果如图3-4所示。正常状况下此时除了一个Kafka链接失败的警告外,其它都应该经过测试。Kafka链接失败,缘由是没有配置Kafka的Bootstrap servers。咱们在CDH中并无启动Kafka服务,所以忽略此警告。

图3-4 测试经过

        关闭“Hadoop Cluster Test”窗口后,点击“Hadoop cluster”窗口的“肯定”按钮,至此就创建了一个Kettle能够链接的Hadoop集群。

        若是是首次配置Kettle链接Hadoop,不免会出现这样那样的问题,Pentaho文档中列出了配置过程当中的常见问题及其通用解决方法,如表3-3所示。但愿这能对Kettle或Hadoop新手有所帮助。

症状

一般缘由

通用解决方法

Shim和配置问题

No shim

  • 没有选择shim。
  • shim安装位置错误。
  • plugin.properties 文件中没有正确的shim名称。
  • 检查plugin.properties文件中active.hadoop.configuration参数的值是否与pentaho-big-data-plugin/hadoop-configurations下的目录名相匹配。
  • 确认shim安装在正确的位置(缺省安装在Kettle安装目录的plugins/pentaho-big-data-plugin子目录下)。
  • 参考Pentaho “Set Up Pentaho to Connect to a Hadoop Cluster”文档,确认shim插件的名称和安装目录。

Shim doesn't load

  • 没有安装许可证。
  • Kettle版本不支持装载的shim。
  • 若是选择的是MapR shim,客户端可能没有正确安装。
  • 配置文件改变致使错误。
  • 参考Pentaho “required licenses are installed”文档,验证许可证安装,而且确认许可证没有过时。
  • 参考Pentaho “Components Reference”文档,验证使用的Kettle版本所支持的shim。
  • 参考Pentaho “Set Up Pentaho to Connect to an Apache Hadoop Cluster”文档,检查配置文件。
  • 若是链接的是MapR,检查客户端安装,而后重启Kettle后再测试链接。
  • 若是该错误持续发生,文件可能损坏,须要从Pentaho官网下载新的shim文件。

The file system's URL does not match the URL in the configuration file

*-site.xml文件配置错误

参考Pentaho “Set Up Pentaho to Connect to an Apache Hadoop Cluster”文档,检查配置文件,主要是core-site.xml文件是否配置正确。

Sqoop Unsupported major.minor version Error

在pentaho6.0中,Hadoop集群上的Java版本比Pentaho使用的Java版本旧。

  • 验证JDK是否知足受支持组件列表中的要求。
  • 验证Pentaho服务器上的JDK是否与Hadoop集群上的JDK主版本相同。

链接问题

Hostname does not resolve

  • 没有指定主机名。
  • 主机名/IP地址错误。
  • 主机名没有正确解析。
  • 验证主机名/IP地址是否正确。
  • 检查DNS或hosts文件,确认主机名解析正确。

Port name is incorrect

  • 没有指定端口号。
  • 端口号错误。
  • 验证端口号是否正确。
  • 确认Hadoop集群是否启用了HA,若是是,则不须要指定端口号。

Can't connect

  • 被防火墙阻止。
  • 其它网络问题。
  • 检查防火墙配置,并确认没有其它网络问题。

目录访问或权限问题

Can't access directory

  • 认证或权限问题。
  • 目录不在集群上。
  • 确认链接使用的用户对被访问的目录有读、写、或执行权限。
  • 检查集群的安全设置(如dfs.permissions等)是否容许shim访问。
  • 验证HDFS的主机名和端口号是否正确。

Can't create, read, update, or delete files or directories

认证或权限问题。

  • 确认用户已经被授予目录的执行权限
  • 检查集群的安全设置(如dfs.permissions等)是否容许shim访问。
  • 验证HDFS的主机名和端口号是否正确。

Test file cannot be overwritten

Pentaho测试文件已在目录中。

测试已运行,但未删除测试文件。须要手动删除测试文件。检查Kettle根目录下logs目录下的spoon.log文件中记录的测试文件名。测试文件用于验证用户能够在其主目录中建立、写入和删除。

表3-3 Kettle链接Hadoop时的常见问题

2. 链接Hive

        Kettle把Hive看成一个数据库,支持链接Hive Server和Hive Server 2/3,数据库链接类型的名字分别为Hadoop Hive和Hadoop Hive 2/3。这里演示在Kettle中创建一个Hadoop Hive 2/3类型的数据库链接。

        Hive Server有两个明显的问题,一是不够稳定,常常会莫名奇妙假死,致使客户端全部的链接都被挂起。二是并发性支持很差,若是一个用户在链接中设置了一些环境变量,绑定到一个Thrift工做线程,当该用户断开链接,另外一个用户建立了一个链接,他有可能也被分配到以前的线程,复用以前的配置。这是由于Thrift不支持检测客户端是否断开链接,也就没法清除会话的状态信息。Hive Server 2的稳定性更高,而且已经完美支持了会话。从长远来看都会以Hive Server 2做为首选。

        在工做区左侧的“主对象树”标签中,选择 “DB链接” -> 右键“新建”,对话框中输入如图3-5所示的属性值。

图3-5 Hive链接配置

        上图的数据库链接配置窗口中的选项及定义说明以下:

  • Connection Name:定义链接名称,这里为hive_cdh631。
  • Connection Type:链接类型选择Hadoop Hive 2/3。
  • Host Name:输入HiveServer2对应的主机名。在Cloudera Manager中,从Hive服务的“实例”标签中能够找到。
  • Datebase Name:这里输入的rds是Hive里已经存在的一个数据库名称。
  • Port Number:端口号输入hive.server2.thrift.port参数的值。
  • User Name:用户名,这里为空。
  • Password:密码,这里为空。

        点击“测试”,应该弹出成功链接窗口,显示内容以下:

正确链接到数据库[hive_cdh631] 
主机名         : node2
端口           : 10000
数据库名       :rds

        为了让其它转换或做业可以使用此数据库链接对象,须要将它设置为共享。选择 “DB链接” -> hive_cdh631 -> 右键“共享”,而后保存转换。

3. 链接Impala

        Impala是一个运行在Hadoop之上的大规模并行处理(Massively Parallel Processing,MPP)查询引擎,提供对Hadoop集群数据的高性能、低延迟的SQL查询,使用HDFS做为底层存储。对查询的快速响应使交互式查询和对分析查询的调优成为可能,而这些在针对处理长时间批处理做业的SQL-on-Hadoop传统技术上是难以完成的。Impala是Cloudera公司基于Google Dremel的开源实现。Cloudera公司宣称除Impala外的其它组件都将移植到Spark框架,并坚信Impala是大数据上SQL解决方案的将来,可见其对Impala的重视程度。

        经过将Impala与Hive元数据存储数据库相结合,可以在Impala与Hive这两个组件之间共享数据库表。而且Impala与HiveQL的语法兼容,所以既可使用Impala也可使用Hive进行创建表、发布查询、装载数据等操做。Impala能够在已经存在的Hive表上执行交互式实时查询。

        建立Impala链接的过程与Hive相似。在工做区左侧的“主对象树”标签中,选择“DB链接” -> 右键“新建”,对话框中输入如图3-6所示的属性值。

图3-6 Impala链接配置

        上图的数据库链接配置窗口中的选项及定义说明以下:

  • Connection Name:定义链接名称,这里为impala_cdh631。
  • Connection Type:链接类型选择Impala。
  • Host Name:输入任一Impala Daemon对应的主机名。在Cloudera Manager中,从Impala服务的“实例”标签中能够找到。
  • Datebase Name:这里输入的rds是Hive里已经存在的一个数据库名称。
  • Port Number:端口号输入Impala Daemon HiveServer2端口参数的值。
  • User Name:用户名,这里为空。
  • Password:密码,这里为空。

        点击“测试”,应该弹出成功链接窗口,显示内容以下:

正确链接到数据库[impala_cdh631] 
主机名         : node3
端口           : 21050
数据库名       :rds

        同hive_cdh631同样,将impala_cdh631数据库链接共享,而后保存转换。

4. 创建MySQL数据库链接

        Kettle中建立数据库链接的方法都相似,区别只是在“链接类型”中选择不一样的数据库,而后输入相关的属性,“链接方式”一般选择Native(JDBC)。例如MySQL链接配置如图3-7所示。

图3-7 MySQL链接配置

        这里的链接名称为mysql_node3。配置MySQL数据库链接须要注意的一点是,须要事先将对应版本的MySQL JDBC驱动程序拷贝到Kettle根目录的lib目录下,不然在测试链接时可能出现以下错误:

org.pentaho.di.core.exception.KettleDatabaseException: 
Error occurred while trying to connect to the database

Driver class 'org.gjt.mm.mysql.Driver' could not be found, make sure the 'MySQL' driver (jar file) is installed.
org.gjt.mm.mysql.Driver

        本例中链接的MySQL服务器版本为5.6.14,所以使用下面的命令拷贝JDBC驱动,而后重启Spoon以从新加载全部驱动。

cp mysql-connector-java-5.1.38-bin.jar /root/pdi-ce-8.3.0.0-371/lib/

        至此成功建立了一个Hadoop集群对象CDH631,,以及三个数据库链接对象hive_cdh63一、impala_cdh631和mysql_node3。

3、导入导出Hadoop集群数据

        本节用四个示例演示如何使用Kettle导出导入Hadoop数据。这四个示例是:向HDFS导入数据;向Hive导入数据;从HDFS抽取数据到MySQL;从Hive抽取数据到MySQL。

1. 向HDFS导入数据

        用Kettle将本地文件导入HDFS很是简单,只须要一个“Hadoop copy files”做业项就能够实现。它执行的效果同 hdfs dfs -put 命令是相同的。从下面的地址下载Pentaho提供的web日志示例文件,将解压缩后的weblogs_rebuild.txt文件放到Kettle所在主机的本地目录下。

http://wiki.pentaho.com/download/attachments/23530622/weblogs_rebuild.txt.zip?version=1&modificationDate=1327069200000

        在Spoon中新建一个只包含“Start”和“Hadoop copy files”两个做业项的做业,如图3-8所示。

图3-8 向HDFS导入数据的做业

 

        双击“Hadoop Copy Files”做业项,编辑属性以下:

  • Source Environment:选择“Local”。
  • 源文件/目录:选择本地文件,本例为“file:///root/kettle_hadoop/3/weblogs_rebuild.txt”
  • 通配符:空。
  • Destination Environment:选择“CDH631”,这是咱们以前已经创建好的Hadoop Clusters对象。
  • Destination File/Folder:选择HDFS上的目录,本例为/user/root。

        保存并成功执行做业后,查看HDFS目录,结果以下。能够看到,weblogs_rebuild.txt文件已从本地导入HDFS的/user/root目录中。每次执行做业会覆盖HDFS中已存在的同名文件。

[hdfs@manager~]$hdfs dfs -ls /user/root
Found 1 items
-rw-r--r--   3 root supergroup   77908174 2020-08-28 08:53 /user/root/weblogs_rebuild.txt
[hdfs@manager~]$

2. 向Hive导入数据

        Hive缺省是不能进行行级插入的,也就是说缺省时不能使用insert into ... values这种SQL语句向Hive插入数据。一般Hive表数据导入方式有如下两种:

  • 从本地文件系统中导入数据到Hive表,使用的语句是:
    load data local inpath 目录或文件 into table 表名;

     

  • 从HDFS上导入数据到Hive表,使用的语句是:
    load data inpath 目录或文件 into table 表名;

     

        再有数据一旦导入Hive表,缺省是不能进行更新和删除的,只能向表中追加数据或者用新数据总体覆盖原来的数据。要删除表数据只能执行truncate或者drop table操做,这其实是删除了表所对应的HDFS上的数据文件或目录。

        Kettle做业中的“Hadoop Copy Files”做业项能够将本地文件上传至HDFS,所以只要将前面的做业稍加修改,将Destination File/Folder选择为hive表所在的HDFS目录便可,做业执行的效果与load data local inpath语句相同。

        首先从下面的地址下载Pentaho提供的格式化后的web日志示例文件,将解压缩后的weblogs_parse.txt文件放到Kettle所在主机的本地目录下。

http://wiki.pentaho.com/download/attachments/23530622/weblogs_parse.txt.zip?version=1&modificationDate=1327068013000

        而后执行下面的HiveQL创建一个Hive表,表结构与weblogs_parse.txt文件的结构相匹配。

create table test.weblogs (
client_ip         string,
full_request_date string,
day               string,
month             string,
month_num         int,
year              string,
hour              string,
minute            string,
second            string,
timezone          string,
http_verb         string,
uri               string,
http_status_code  string,
bytes_returned    string,
referrer          string,
user_agent        string)
row format delimited fields terminated by '\t';

        建立和前例相同的做业,只是修改如下两个做业项属性:

  • 源文件/目录:file:///root/kettle_hadoop/3/weblogs_parse.txt
  • Destination File/Folder:/user/hive/warehouse/test.db/weblogs

        保存并成功执行做业后,查询test.weblogs表的记录与weblogs_parse.txt文件内容相同。

3. 从HDFS抽取数据到MySQL

        从下面的地址下载文件
http://wiki.pentaho.com/download/attachments/23530622/weblogs_aggregate.txt.zip?version=1&modificationDate=1327067858000

        这是Pentaho提供的一个压缩文件,其中包含一个名为weblogs_aggregate.txt的文本文件,文件中有36616行记录,每行记录有4列,分别表示IP地址、年份、月份、访问页面数,前5行记录以下。咱们使用这个文件做为最初的原始数据。

0.308.86.81    2012    07    1
0.32.48.676    2012    01    3
0.32.85.668    2012    07    8
0.45.305.7    2012    01    1
0.45.305.7    2012    02    1

        用下面的命令把解压缩后的weblogs_aggregate.txt文件上传到HDFS的/user/root目录下。

hdfs dfs -put weblogs_aggregate.txt /user/root/

        在Spoon中新建一个如图3-9的转换。转换中只包含“Hadoop File Input”和“表输出” 两个步骤。

图3-9 从HDFS抽取数据到MySQL的转换

        编辑“Hadoop File Input”步骤属性以下:
(1)“文件”标签

  • Environment:选择“CDH631”。
  • File/Folder:选择“/user/root/weblogs_aggregate.txt”

(2)“内容”标签

  • 文件类型:CVS
  • 分隔符:删除分号,点击“Insert TAB”按钮插入TAB分隔符。
  • 头部:勾掉。
  • 格式:选择“Unix”。
  • 本地日期格式:选择“en_US”

(3)“字段”标签
        输入如表3-4所示。

名称

类型

格式

长度

去除空字符串方式

重复

client_ip

String

 

20

不去掉空格

year

Integer

#

15

不去掉空格

month_num

Integer

#

15

不去掉空格

pageviews

Integer

#

15

不去掉空格

表3-4 weblogs_aggregate.txt对应的字段

        编辑“表输出”步骤属性以下:

  • 数据库链接:选择“mysql_node3”。
  • 目标表:输入“aggregate_hdfs”。
  • 剪裁表:勾选。

        mysql_node3是链接Hadoop时已经建好的一个MySQL数据库链接。“主选项”和“数据库字段”标签下的属性都不须要设置,“表字段”和“流字段”会自动映射。

        下面执行SQL创建mysql的表:

use test;
create table aggregate_hdfs (
    client_ip varchar(15),
    year smallint,
    month_num tinyint,
    pageviews bigint
);

        保存并执行转换,而后查询aggregate_hdfs表,结果以下:

mysql> select count(*) from test.aggregate_hdfs;
+----------+
| count(*) |
+----------+
|    36616 |
+----------+
1 row in set (0.03 sec)

mysql> select * from test.aggregate_hdfs limit 5;
+-------------+------+-----------+-----------+
| client_ip   | year | month_num | pageviews |
+-------------+------+-----------+-----------+
| 0.308.86.81 | 2012 |         7 |         1 |
| 0.32.48.676 | 2012 |         1 |         3 |
| 0.32.85.668 | 2012 |         7 |         8 |
| 0.45.305.7  | 2012 |         1 |         1 |
| 0.45.305.7  | 2012 |         2 |         1 |
+-------------+------+-----------+-----------+
5 rows in set (0.00 sec)

4. 从Hive抽取数据到MySQL

        在Spoon中新建一个如图3-10的转换。转换中只包含“表输入”和“表输出” 两个步骤。

图3-10 从Hive抽取数据到MySQL的转换

        编辑“表输入”步骤属性以下:

  • 数据库链接:选择“hive_cdh631”。
  • SQL:输入下面的SQL语句:
    select client_ip, year, month, month_num, count(*) as pageviews
      from test.weblogs
     group by client_ip, year, month, month_num

     

        hive_cdh631是链接Hadoop时已经建好的一个Hive数据库链接。

        编辑“表输出”步骤属性以下:

  • 数据库链接:选择“mysql_node3”。
  • 目标表:输入“aggregate_hive”。
  • 剪裁表:勾选。

        下面执行SQL创建mysql的表:

use test;
create table aggregate_hive (
    client_ip varchar(15),
    year varchar(4),
    month varchar(10),
    month_num tinyint,
    pageviews bigint
);

        保存并执行转换,而后查询aggregate_hive表,结果以下:

mysql> select count(*) from test.aggregate_hive;
+----------+
| count(*) |
+----------+
|    36616 |
+----------+
1 row in set (0.03 sec)

mysql> select * from test.aggregate_hive limit 5;
+---------------+------+-------+-----------+-----------+
| client_ip     | year | month | month_num | pageviews |
+---------------+------+-------+-----------+-----------+
| 0.45.305.7    | 2012 | Feb   |         2 |         1 |
| 0.48.322.75   | 2012 | Jul   |         7 |         1 |
| 0.638.50.46   | 2011 | Dec   |        12 |         8 |
| 01.660.68.623 | 2012 | Jun   |         6 |         1 |
| 01.660.70.74  | 2012 | Jul   |         7 |         1 |
+---------------+------+-------+-----------+-----------+
5 rows in set (0.00 sec)

4、执行HiveQL语句

        在这个示例中演示如何用Kettle执行Hive的HiveQL语句。咱们在“向Hive导入数据”一节创建的weblogs表上执行聚合查询,同时创建一个新表保存查询结果。新建一个Kettle做业,只有“START”和“SQL”两个做业项,如图3-11所示。
 

图3-11 执行Hive HiveQL语句的做业

        编辑“SQL”做业项属性以下:

  • 数据库链接:选择“hive_cdh631”。
  • SQL脚本:
    create table test.weblogs_agg
    as
    select client_ip, year, month, month_num, count(*)
      from test.weblogs
     group by client_ip, year, month, month_num;

     

        保存并成功执行做业后检查hive表,结果以下:

hive> select count(*) from test.weblogs_agg;
...
36616

        能够看到weblogs_agg表中已经保存了所有的聚合数据。

5、执行MapReduce

1. 生成聚合数据集

        “执行HiveQL语句”示例只用一句HiveQL就生成了聚合数据,本示例使用“Pentaho MapReduce”做业项完成类似的功能,把细节数据汇总成聚合数据集。当给一个关系型数据仓库或数据集市准备待抽取的数据时,这是一个常见的使用场景。咱们把weblogs_parse.txt文件做为细节数据,目标是生成聚合数据文件,其中包含按IP和年月分组统计的PV数。

(1)准备文件与目录

# 建立格式化文件所在目录
hdfs dfs -mkdir /user/root/parse/
# 上传格式化文件
hdfs dfs -put -f weblogs_parse.txt /user/root/parse/
# 修改读写权限
hdfs dfs -chmod -R 777 /user/root/

(2)创建一个用于Mapper的转换

图3-12 生成聚合数据Mapper转换

        如图3-12所示的转换由“MapReduce Input”、“拆分字段”、“利用Janino计算Java表达式”、“MapReduce Output”四个步骤组成。

        编辑“MapReduce Input”步骤以下:

  • Key field:“Type”选择“String”,定义 Hadoop MapReduce 键的数据类型。
  • Value field:“Type”选择“String”,定义 Hadoop MapReduce 值的数据类型。

        该步骤输出两个字段,名称是固定的key和value,也就是Map阶段输入的键值对。

        编辑“拆分字段”步骤以下:

  • 须要拆分的字段:选择“value”。
  • 分隔符:输入“$[09]”,以TAB做为分隔符。
  • 字段:新的字段名以下,类型均为String。
    client_ip
    full_request_date
    day
    month
    month_num
    year
    hour
    minute
    second
    timezone
    http_verb
    uri
    http_status_code
    bytes_returned
    referrer
    user_agent

     

        该步骤将输入的value字段拆分红16个字段,输出17个字段(key字段没变,文本文件每行的key是文件起始位置到每行的字节偏移量)。

        编辑“利用Janino计算Java表达式”步骤如表3-5所示。

New field

Java expression

Value type

new_key

client_ip + '\t' + year + '\t' + month_num

String

new_value

1

Integer

表3-5 聚合数据转换中的“利用Janino计算Java表达式”步骤

        该步骤为数据流中增长两个新的字段,名称分别定义为new_key和new_value。new_key字段的值定义为client_ip + '\t' + year + '\t' + month_num,将IP地址、年份、月份和字段间的两个TAB符拼接成一个字符串。new_value字段的值为1,数据类型是整数。该步骤输出19个字段。

        编辑“MapReduce Output”步骤以下:

  • Key field:选择“new_key”。
  • Value field:选择“new_value”。

该步骤输出“new_key”和“new_value”两个字段,即Map阶段输出的键值对。

将转换保存为aggregate_mapper.ktr。

(3)创建一个用于Reducer的转换

图3-13 生成聚合数据Reducer转换

如图3-13所示的转换由“MapReduce Input”、“分组”、“MapReduce Output”三个步骤组成。

编辑“MapReduce Input”步骤以下:
. Key field:“Type”选择“String”。
. Value field:“Type”选择“Integer”。

        该步骤输出两个字段,名称是固定的key和value,key对应Mapper转换的new_key输出字段,value对应Mapper转换的new_value输出字段。

        编辑“分组”步骤以下:

  • 构成分组的字段:选择“key”。
  • 聚合:名称、Subject、类型三列的值分别是new_value、value、求和。

        该步骤按key字段分组(key字段的值就是client_ip + '\t' + year + '\t' + month_num),对每一个分组的value求和,每组的合计值定义为一个新的字段new_value。注意,此处的new_value和Mapper转换输出的new_value字段含义是不一样的。Mapper转换输出的new_value字段对应这里的Subject字段值。

        编辑“MapReduce Output”步骤以下:

  • Key field:选择“key”。
  • Value field:选择“new_value”。

        输出Reducer处理后的键值对,这就是咱们想要的结果。

        将转换保存为aggregate_reducer.ktr。

(4)创建一个调用MapReduce步骤的做业

图3-14 聚合数据Pentaho MapReduce做业

        如图3-14所示的做业使用mapper和reducer转换。须要编辑Pentaho MapReduce做业项的Mapper、Reducer、job Setup、Cluster四个标签页,每一个标签页上的选项及定义。

        Mapper标签:

  • Transformation:选择第(1)步创建的Mapper转换,这里为“/root/kettle_hadoop/3/aggregate_mapper.ktr”。
  • Input step name:输入“MapReduce Input”。这是接收mapping数据的步骤名,必须是一个MapReduce Input步骤的名称。
  • Output step name:输入“MapReduce Output”。这是mapping输出步骤名,必须是一个MapReduce Output步骤的名称。

        Reducer标签:

  • Transformation:选择第(2)步创建的Reducer转换,这里为“/root/kettle_hadoop/3/aggregate_mapper.ktr”。
  • Input step name:输入“MapReduce Input”。这是接收reducing数据的步骤名,必须是一个MapReduce Input步骤的名称。
  • Output step name:输入“MapReduce Output”。这是reducing输出步骤名,必须是一个MapReduce Output步骤的名称。

        Job Setup标签:

  • Input path:输入“/user/root/parse/”。一个以逗号分隔的HDFS目录列表,目录中存储的是MapReduce要处理的源数据文件。
  • Output path:输入“/user/root/aggregate_mr”。存储MapReduce做业输出数据的HDFS目录。
  • Remove output path before job:勾选。执行做业时先删除输出目录。
  • Input format:输入“org.apache.hadoop.mapred.TextInputFormat”,为输入格式的类名。
  • Output format:输入“org.apache.hadoop.mapred.TextOutputFormat”,为输出格式的类名。

        Cluster标签:

  • Hadoop job name:输入“aggregate”。
  • Hadoop cluster:选择“CDH631”,为一个已经定义的Hadoop集群。
  • Number of mapper tasks:1。分配的mapper任务数,由输入的数据量所决定。典型的值在10-100之间。非CPU密集型的任务能够指定更高的值。
  • Number of reduce tasks:1。分配的reducer任务数。通常来讲,该值设置的越小,reduce操做启动的越快,设置的越大,reduce操做完成的更快。加大该值会增长Hadoop框架的开销,但可以使负载更加均衡。若是设置为0,则不执行reduce操做,mapper的输出将做为整个MapReduce做业的输出。
  • Logging interval:60。日志消息间隔的秒数。
  • Enable blocking:勾选。若是选中,做业将等待每个做业项完成后再继续下一个做业项,这是Kettle感知Hadoop做业状态的惟一方式。若是不选,MapReduce做业会本身执行,而Kettle在提交MapReduce做业后当即会执行下一个做业项。除非选中该项,不然Kettle的错误处理在这里将没法工做。

        将做业保存为aggregate_mr.kjb。

(5)执行做业并验证输出

[hdfs@node3~]$hdfs dfs -ls /user/root/aggregate_mr/
Found 2 items
-rw-r--r--   3 root supergroup          0 2020-08-31 13:46 /user/root/aggregate_mr/_SUCCESS
-rw-r--r--   3 root supergroup     890709 2020-08-31 13:46 /user/root/aggregate_mr/part-00000
[hdfs@node3~]$hdfs dfs -cat /user/root/aggregate_mr/part-00000 | head -10
0.308.86.81    2012    07    1
0.32.48.676    2012    01    3
0.32.85.668    2012    07    8
0.45.305.7    2012    01    1
0.45.305.7    2012    02    1
0.46.386.626    2011    11    1
0.48.322.75    2012    07    1
0.638.50.46    2011    12    8
0.87.36.333    2012    08    7
01.660.68.623    2012    06    1
cat: Unable to write to output stream.
[hdfs@node3~]$

        能够看到,/user/root/aggregate_mr/目录下生成了名为part-00000输出文件,文件中包含按IP和年月分组的PV数。

2. 格式化原始web日志

        本示例说明如何使用Pentaho MapReduce把原始web日志解析成格式化的记录。

(1)准备文件与目录

# 建立原始文件所在目录
hdfs dfs -mkdir /user/root/raw
# 修改读写权限
hdfs dfs -chmod -R 777 /user/root/

        而后用Hadoop copy files做业项将weblogs_rebuild.txt文件放到HDFS的/user/root/raw目录下,具体操做参见前面“向HDFS导入数据”。

(2)创建一个用于Mapper的转换

图3-15 格式化文件Mapper转换

        编辑“MapReduce Input”步骤以下:

  • Key field:“Type”选择“String”。
  • Value field:“Type”选择“String”。

        编辑“正则表达式”步骤以下:

  • 要匹配的字段:输入“value”。
  • Result field name:输入“is_match”
  • 为每一个捕获组(capture group)建立一个字段:勾选。
  • Replace previous fields:勾选。
  • 正则表达式:
    ^([^\s]{7,15})\s            # client_ip
    -\s                         # unused IDENT field
    -\s                         # unused USER field
    \[((\d{2})/(\w{3})/(\d{4})  # request date dd/MMM/yyyy
    :(\d{2}):(\d{2}):(\d{2})\s([-+ ]\d{4}))\]
                                # request time :HH:mm:ss -0800
    \s"(GET|POST)\s             # HTTP verb
    ([^\s]*)                     # HTTP URI
    \sHTTP/1\.[01]"\s           # HTTP version
     
    (\d{3})\s                   # HTTP status code
    (\d+)\s                     # bytes returned
    "([^"]+)"\s                 # referrer field
     
    "                           # User agent parsing, always quoted.
    "?                          # Sometimes if the user spoofs the user_agent, they incorrectly quote it.
    (                           # The UA string
      [^"]*?                    # Uninteresting bits
      (?:
        (?:
         rv:                    # Beginning of the gecko engine version token
         (?=[^;)]{3,15}[;)])    # ensure version string size
         (                      # Whole gecko version
           (\d{1,2})                   # version_component_major
           \.(\d{1,2}[^.;)]{0,8})      # version_component_minor
           (?:\.(\d{1,2}[^.;)]{0,8}))? # version_component_a
           (?:\.(\d{1,2}[^.;)]{0,8}))? # version_component_b
         )
         [^"]*                  # More uninteresting bits
        )
       |
        [^"]*                   # More uninteresting bits
      )
    )                           # End of UA string
    "?
    "

     

  •  捕获组(Capture Group)字段:以下所示,全部字段都是String类型。
    client_ip
    full_request_date
    day
    month
    year
    hour
    minute
    second
    timezone
    http_verb
    uri
    http_status_code
    bytes_returned
    referrer
    user_agent
    firefox_gecko_version
    firefox_gecko_version_major
    firefox_gecko_version_minor
    firefox_gecko_version_a
    firefox_gecko_version_b

     

        编辑“过滤记录”步骤以下:

  • 发送true数据给步骤:选择“值映射”。
  • 发送false数据给步骤:选择“空操做(什么也不作)”
  • 条件:选择“is_match = Y”

        编辑“值映射”步骤以下:

  • 使用的字段名:选择“month”。
  • 目标字段名(空=覆盖):输入“month_num”。
  • 不匹配时的默认值:输入“00”。
  • 字段值:源值与目标值输入以下。
    Jan 01
    Feb 02
    Mar 03
    Apr 04
    May 05
    Jun 06
    Jul 07
    Aug 08
    Sep 09
    Oct 10
    Nov 11
    Dec 12

     

        编辑“利用Janino计算Java表达式”步骤以下:

  • New field:输入“output_value”。
  • Java expression:输入以下。
    client_ip + '\t' + full_request_date + '\t' + day + '\t' + month + '\t' + month_num + '\t' + year + '\t' + hour + '\t' + minute + '\t' + second + '\t' + timezone + '\t' + http_verb + '\t' + uri + '\t' + http_status_code + '\t' + bytes_returned + '\t' + referrer + '\t' + user_agent

     

  • Value type:选择“String”。

        编辑“MapReduce Output”步骤以下:

  • Key field:选择“key”。
  • Value field:选择“output_value”。

        将转换保存为weblog_parse_mapper.ktr。

(3)创建一个调用MapReduce步骤的做业

图3-16 格式化文件Pentaho MapReduce做业

        编辑“Pentaho MapReduce”做业项以下。
        Mapper标签:

  • Transformation:选择上一步创建的转换,这里为“/root/kettle_hadoop/3/weblogs_parse_mapper.ktr”。
  • Input step name:输入“MapReduce Input”。
  • Output step name:输入“MapReduce Output”。

        Job Setup标签:

  • Input path:输入“/user/root/raw”。
  • Output path:输入“/user/root/parse1”。
  • Remove output path before job:勾选。
  • Input format:输入“org.apache.hadoop.mapred.TextInputFormat”。
  • Output format:输入“org.apache.hadoop.mapred.TextOutputFormat”。

        Cluster标签:

  • Hadoop job name:输入“Web Log Parse”。
  • Hadoop cluster:选择“CDH631”。
  • Number of mapper tasks:2
  • Number of reduce tasks:0
  • Logging interval:60
  • Enable blocking:勾选。

        将做业保存为weblogs_parse_mr.kjb。

(4)执行做业并验证输出

        做业成功执行后检查HDFS的输出文件,结果以下。

[hdfs@node3~]$hdfs dfs -ls /user/root/parse1
Found 3 items
-rw-r--r--   3 root supergroup          0 2020-08-31 10:59 /user/root/parse1/_SUCCESS
-rw-r--r--   3 root supergroup   42601640 2020-08-31 10:59 /user/root/parse1/part-00000
-rw-r--r--   3 root supergroup   42810160 2020-08-31 10:59 /user/root/parse1/part-00001
[hdfs@node3~]$hdfs dfs -get /user/root/parse1/part-00000
[hdfs@node3~]$head -5 part-00000 
0    323.81.303.680    25/Oct/2011:01:41:00 -0500    25    Oct    10    2011    01    41    00    -0500    GET    /download/download6.zip    200    0    -    Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.19) Gecko/2010031422 Firefox/3.0.19
193    668.667.44.3    25/Oct/2011:07:38:30 -0500    25    Oct    10    2011    07    38    30    -0500    GET    /download/download3.zip    200    0    -    Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.8.0.12) Gecko/20070719 CentOS/1.5.0.12-3.el5.centos Firefox/1.5.0.12
405    13.386.648.380    25/Oct/2011:17:06:00 -0500    25    Oct    10    2011    17    06    00    -0500    GET    /download/download6.zip    200    0    -    Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; GTB6.3; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; InfoPath.2)
651    06.670.03.40    26/Oct/2011:13:24:00 -0500    26    Oct    10    2011    13    24    00    -0500    GET    /product/demos/product2    200    0    -    Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3
838    18.656.618.46    26/Oct/2011:17:15:30 -0500    26    Oct    10    2011    17    15    30    -0500    GET    /download/download4.zip    200    0    -    Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_3; en-us) AppleWebKit/531.22.7 (KHTML, like Gecko) Version/4.0.5 Safari/531.22.7
[hdfs@node3~]$

        能够看到,/user/root/parse1目录下生成了名为part-00000和part-00001的两个输出文件(由于使用了两个mapper),内容已经被格式化。

6、提交Spark做业

        Kettle不但支持MapReduce做业,还能够经过“Spark Submit”做业项,向CDH 5.3以上、HDP 2.3以上、Amazon EMR 3.10以上的Hadoop平台提交Spark做业。在本示例中,咱们先为Kettle配置Spark,而后修改并执行Kettle安装包中自带的Spark PI做业例子,说明如何在Kettle中提交Spark做业。

1. 在Kettle主机上安装Spark客户端

        使用Kettle执行Spark做业,须要在Kettle主机安装Spark客户端。只要将CDH中Spark的库文件复制到Kettle所在主机便可。

-- 在172.16.1.127上执行
cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib
scp -r spark 172.16.1.101:/root/

2. 为Kettle配置Spark

        如下操做均在172.16.1.101以root用户执行。

(1)备份原始配置文件

cd /root/spark/conf/
cp spark-defaults.conf spark-defaults.conf.bak
cp spark-env.sh spark-env.sh.bak

(2)编辑spark-defaults.conf文件

vim /root/spark/conf/spark-defaults.conf

        内容以下:

# 使用spark.yarn.archive减小任务启动时间
spark.yarn.archive=hdfs://nameservice1/user/spark/lib/spark_jars.zip
# 解决和yarn相关Jersey包冲突,避免spark on yarn启动spark-submit时出现java.lang.NoClassDefFoundError错误
spark.hadoop.yarn.timeline-service.enabled=false
# 记录Spark事件,用于应用程序在完成后重构WebUI
spark.eventLog.enabled=true
# 记录Spark事件的目录
spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory
# spark on yarn的history server地址
spark.yarn.historyServer.address=http://node3:18088

(3)编辑spark-env.sh文件

vim /root/spark/conf/spark-env.sh

        内容以下:

#!/usr/bin/env bash

# hadoop配置文件所在目录 
HADOOP_CONF_DIR=/root/pdi-ce-8.3.0.0-371/plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61
# spark主目录
SPARK_HOME=/root/spark

(4)编辑core-site.xml文件

vim /root/pdi-ce-8.3.0.0-371/plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61/core-site.xml

        去掉下面这段的注释:

<property>
  <name>net.topology.script.file.name</name>
  <value>/etc/hadoop/conf.cloudera.yarn/topology.py</value>
</property>

3. 提交Spark做业

(1)修改Kettle自带的Spark例子

cp /root/pdi-ce-8.3.0.0-371/samples/jobs/Spark\ Submit/Spark\ submit.kjb /root/kettle_hadoop/3/spark_submit.kjb

        在Spoon中打开/root/kettle_hadoop/spark_submit.kjb文件,如图3-17所示。

图3-17 Kettle自带的Spark例子

        编辑Spark PI做业项以下:

  • Spark Submit Utility:选择Spark提交程序,本例为“/root/spark/bin/spark-submit”。
  • Master URL:由于yarn运行在CDH集群,而不是Kettle主机上,因此这里选择“yarn-cluster”。
  • Files标签的Application Jar:选择“/root/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.3.1.jar”。

(2)保存行执行做业

        Spark History Server Web UI如图3-18所示。

图3-18 Spark UI看到提交的Spark做业

7、小结

        本篇以Kettle 8.3和CDH 6.3.1为例,介绍Kettle对Hadoop的支持。经过提交适当的参数,Kettle能够链接Hadoop的HDFS、MapReduce、Zookeeper、Oozie和Spark服务。Kettle的数据库链接类型中支持Hive、Hive 2/3和Impala。可使用Kettle导出导入Hadoop集群中(HDFS、Hive等)的数据,执行Hive的HiveQL语句。Kettle支持在Hadoop中执行基于MapReduce的Kettle转换,还支持向Spark集群提交做业。这里演示的例子都是Pentaho官方提供示例。从下一篇开始,咱们将创建一个模拟的Hadoop数据仓库,并用使用Kettle完成其上的ETL操做。  

相关文章
相关标签/搜索