Requirement: count the PV (page views) per hour
ETL describes the process of extracting data from a source, transforming it, and loading it into a destination
Field filtering
Field completion
Field formatting
Export the data
Open-sourced by Facebook to handle statistics over massive amounts of structured log data
Essence: Hive translates HQL into MapReduce jobs
Hive's data is really just directories and files on HDFS
Metadata is stored in the bundled Derby database
Only one connection is allowed
Mostly used for demos
Metadata is stored in a MySQL database
MySQL runs on the same physical machine as Hive
Mostly used for development and testing
Metadata is stored in a MySQL database
MySQL runs on a different physical machine from Hive
Used in real production environments
1) Uninstall the existing MySQL
$ rpm -qa | grep mysql
$ sudo rpm -e mysql-libs-5.1.71-1.el6.x86_64 --nodeps
2) Install
Optionally replace the yum cache first, then install: $ sudo cp -r /opt/software/x86_64/ /var/cache/yum/
$ sudo yum install -y mysql-server mysql mysql-devel
3) Start the MySQL service
$ sudo service mysqld start
4) Set the password
$ /usr/bin/mysqladmin -u root password '<new password>'
5) Start on boot
$ sudo chkconfig mysqld on
6) Grant privileges to root and enable remote login
Log in
$ mysql -u root -p
Grant privileges
mysql> grant all privileges on *.* to 'root'@'%' identified by '<password>';
mysql> grant all privileges on *.* to 'root'@'linux01' identified by '<password>';  -- this host-specific grant is still required, even though % covers all hosts
all privileges: all permissions
*.*: all tables in all databases
'root'@'%': log in as root from any host
'root'@'linux03.ibf.com': log in as root from host linux03
by 'root': use root as the password
7) Flush privileges
mysql> flush privileges;
8) Test whether you can log in from Windows
mysql -h linux03.ibf.com -u root -p
HDFS and YARN must be installed first
1) Install:
$ tar -zxvf /opt/software/hive-0.13.1-bin.tar.gz -C /opt/modules/
Rename the Hive directory
$ cd /opt/modules
$ mv apache-hive-0.13.1-bin/ hive-0.13.1/
2) Create the tmp directory and the Hive warehouse on HDFS
$ bin/hdfs dfs -mkdir -p /user/hive/warehouse
$ bin/hdfs dfs -mkdir /tmp    # may already exist
$ bin/hdfs dfs -chmod g+w /user/hive/warehouse
$ bin/hdfs dfs -chmod g+w /tmp
3) Edit the configuration
$ cd hive-0.13.1/
$ cp conf/hive-default.xml.template conf/hive-site.xml
$ cp conf/hive-log4j.properties.template conf/hive-log4j.properties
$ cp conf/hive-env.sh.template conf/hive-env.sh
3-1) Edit hive-env.sh
JAVA_HOME=/opt/modules/jdk1.7.0_67
# add:
HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HIVE_CONF_DIR=/opt/modules/hive-0.13.1/conf
3-2) Edit hive-site.xml
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://linux01:3306/metastore?createDatabaseIfNotExist=true</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>root</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>123456</value>
</property>
3-3) Edit the logging config hive-log4j.properties
hive.log.dir=/opt/modules/hive-0.13.1/logs
3-4) Copy the JDBC driver into Hive's lib directory
$ cp /opt/software/mysql-connector-java-5.1.34-bin.jar /opt/modules/hive-0.13.1/lib/
4) Make sure YARN and HDFS are running
$ jps
6468 ResourceManager
6911 Jps
6300 RunJar
6757 NodeManager
2029 NameNode
2153 DataNode
At this point you can enter the Hive CLI with bin/hive
Go to the Hive directory
$ cd /opt/modules/hive-0.13.1/
bin/hive
show databases;
create database mydb;
use mydb;
show tables;
create table student (
  id int comment 'id of student',
  name string comment 'name of student',
  age int comment 'age of student',
  gender string comment 'sex of student',
  addr string
)
comment 'this is a demo'
row format delimited fields terminated by '\t';
Tables are created under /user/hive/warehouse by default,
configured via hive.metastore.warehouse.dir.
desc student;            -- view the table's columns
or
desc formatted student;  -- view the table's metadata
At this point the metastore database in MySQL looks like this:
mysql> select * from TBLS;
+--------+-------------+-------+------------------+-------+-----------+-------+----------+---------------+--------------------+--------------------+
| TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER | RETENTION | SD_ID | TBL_NAME | TBL_TYPE      | VIEW_EXPANDED_TEXT | VIEW_ORIGINAL_TEXT |
+--------+-------------+-------+------------------+-------+-----------+-------+----------+---------------+--------------------+--------------------+
|      1 | 1556132119  |     6 |                0 | chen  |         0 |     1 | student  | MANAGED_TABLE | NULL               | NULL               |
+--------+-------------+-------+------------------+-------+-----------+-------+----------+---------------+--------------------+--------------------+
1 row in set (0.00 sec)
mysql> select * from COLUMNS_V2;
+-------+-----------------+-------------+-----------+-------------+
| CD_ID | COMMENT         | COLUMN_NAME | TYPE_NAME | INTEGER_IDX |
+-------+-----------------+-------------+-----------+-------------+
|     1 | NULL            | addr        | string    |           4 |
|     1 | age of student  | age         | int       |           2 |
|     1 | sex of student  | gender      | string    |           3 |
|     1 | id of student   | id          | int       |           0 |
|     1 | name of student | name        | string    |           1 |
+-------+-----------------+-------------+-----------+-------------+
5 rows in set (0.00 sec)
load data local inpath '/home/hadoop/student.log' into table student;
load data inpath '/input/student.data' into table student;
Session-level only (lost after restarting the CLI)
set hive.cli.print.header=true;      # print column headers
set hive.cli.print.current.db=true;  # show the current database name in the prompt
reset;   -- reset the settings
Persistent across restarts (set in hive-site.xml)
<property>
  <name>hive.cli.print.header</name>
  <value>true</value>
</property>
<property>
  <name>hive.cli.print.current.db</name>
  <value>true</value>
</property>
!ls
!pwd
dfs -ls /
dfs -mkdir /hive
-e  execute a SQL statement
-f  execute a SQL file
-S  silent mode
hive -e
$ bin/hive -e "select * from test_db.emp_p"
hive -f
$ bin/hive -S -f /home/hadoop/emp.sql > ~/result.txt
drop table user;
truncate table user;
create table emp(
  empId int,
  empString string,
  job string,
  salary float,
  deptId int
)
row format delimited fields terminated by '\t';
load data inpath '/input/dept.txt' into table dept;
# or load from the local filesystem: load data local inpath '/home/hadoop/dept.txt' into table dept;
create external table emp_ex (
  empId int,
  empName string,
  job string,
  salary float,
  deptId int
)
row format delimited fields terminated by '\t'
location '/hive/table/emp';
Move the data to the table's location
hive (mydb)> dfs -mv /input/emp.txt /hive/table/emp/emp.txt
Load from the local filesystem of the server
hive (mydb)> load data local inpath '/home/hadoop/emp.data' into table emp;
Or use the dfs command directly to move the data into the Hive table directory
hive (mydb)> dfs -put /home/hadoop/emp.data /hello/table/emp;
External tables must be created with the external keyword
Dropping an external table only deletes the table's metadata, not its data
Dropping a managed (internal) table deletes both the metadata and the data (see the sketch below)
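A minimal sketch of the difference; the table names emp_managed and emp_external are illustrative, not from the notes above:
-- managed (internal) table: drop removes both the metadata and the data in the warehouse directory
create table emp_managed (id int, name string)
row format delimited fields terminated by '\t';
-- external table: drop removes only the metadata; the files under location stay on HDFS
create external table emp_external (id int, name string)
row format delimited fields terminated by '\t'
location '/hive/table/emp_external';
drop table emp_managed;   -- its warehouse directory is deleted
drop table emp_external;  -- /hive/table/emp_external and its files remain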
create table emp_part(
  empno int,
  empname string,
  empjob string,
  mgrno int,
  birthday string,
  salary float,
  bonus float,
  deptno int
)
partitioned by (province string)
row format delimited fields terminated by '\t';
Load data into the partitioned table
Explicitly specify the partition value
load data local inpath '/home/user01/emp.txt' into table emp_part partition (province='CHICAGO');
show partitions emp_part;
alter table emp_part add partition (province='shanghai');
alter table emp_part drop partition (province='shanghai');
Load data into a partition
load data local inpath '<local path>' into table emp_part partition (province='shanghai');
Query a partition
select * from emp_part where province='henan';
create table emp_second(
  id int,
  name string,
  job string,
  salary float,
  dept int
)
partitioned by (day string, hour string)
row format delimited fields terminated by '\t';
alter table emp_second add partition (day='20180125',hour='16');
alter table emp_second drop partition (day='20180125');
load data local inpath '/home/hadoop/emp.log' into table emp_second partition (day='20180125',hour='17');
Joining two tables bucketed on the same column can be done with a map-side join
Makes sampling more efficient
Requires: set hive.enforce.bucketing=true
create table bucketed_users(id int, name string) clustered by (id) into 4 buckets;
Which bucket a row lands in is determined by the hash of the bucketing column modulo the number of buckets (see the sketch below)
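A short sketch of filling the bucketed table, assuming a plain staging table users_stage with the same columns (a name of my own choosing); a plain load would only move files and not redistribute rows into buckets, so an insert ... select is used:
set hive.enforce.bucketing=true;
-- rows are hashed on id and written out as 4 bucket files
insert overwrite table bucketed_users
select id, name from users_stage;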
load data local inpath '<local path>' into table <table name>
bin/hdfs dfs -put <local path> <hdfs path>   (the Hive table's location)
load data inpath '<hdfs path>' into table <table name>
load data inpath '<hdfs path>' overwrite into table <table name>
load data local inpath '<local path>' overwrite into table <table name>
Insert the result of a select into a table with an insert statement
insert into table test_tb select * from emp_p;
Load data at table-creation time
create external table test_tb (
  id int,
  name string
)
row format delimited fields terminated by '\t'
location '/hive/test_tb';
bin/hive -e "use test_db;select * from emp_p" > /home/hadoop/result.txt
bin/hive -f <path> >> /home/hadoop/result.txt
insert overwrite local directory '/home/hadoop/data' select * from emp_p;
insert overwrite local directory '/home/hadoop/data' row format delimited fields terminated by '^' select * from emp_p;
hive > insert overwrite directory '/data' select * from emp_p;
hive > export table emp_p to '/input/export' ;
hive > import table emp_imp from 'hdfs_path' ;
Wildcard * vs. specifying columns explicitly
select id,name from emp;
select * from emp_p where salary > 10000;
select * from emp_p where sal between 10000 and 15000;
select * from user where email is not null;
select * from emp_p where did in (1,2,3);
count max min sum avg
select count(1) personOfDept from emp_p group by job;
select sum(sal) from emp_p;
select distinct id from emp_part;
select distinct name, province from emp_part;
select eid,ename,salary ,did from emp where emp.did in (select did from dept where dname='人事部');
emp.eid  emp.ename  emp.salary  emp.did
1001     jack       10000.0     1
1002     tom        2000.0      2
1003     lily       20000.0     3
1004     aobama     10000.0     5
1005     yang       10000.0     6
dept.did  dept.dname  dept.dtel
1         人事部      021-456
2         财务部      021-234
3         技术部      021-345
4         BI部        021-31
5         产品部      021-232
select * from dept, emp;
select * from emp, dept where emp.did=dept.did;
select t1.eid, t1.ename, t1.salary,t2.did ,t2.dname from emp t1 join dept t2 on t1.did=t2.did;
left join
select eid,ename, salary,t2.did, t2.dname from emp t1 left join dept t2 on t1.did = t2.did;
right join
select eid,ename, salary,t2.did, t2.dname from emp t1 right join dept t2 on t1.did = t2.did;
select eid,ename, salary,t2.did, t2.dname from emp t1 full join dept t2 on t1.did = t2.did;
select * from emp_part order by salary;
Even with the number of reducers set to 3, there is still only one output file (order by is a global sort on a single reducer)
set mapreduce.job.reduces=3;
sort by: sorting is done within each reducer before the reduce function runs, so ordering is only guaranteed per reducer
Set the number of reducers
set mapreduce.job.reduces=2;
insert overwrite local directory '/home/hadoop/result' select * from emp_part sort by salary;  # with the default of 1 reducer this behaves the same as order by
set mapreduce.job.reduces=3;
Here rows are distributed by department and sorted by salary within each reducer
insert overwrite local directory '/home/hadoop/result' select * from emp_part distribute by deptno sort by salary;
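A closely related shorthand (my addition, not from the original notes): when the distribute by and sort by columns are the same, cluster by can be used instead, for example:
-- cluster by deptno is equivalent to: distribute by deptno sort by deptno (ascending only)
select * from emp_part cluster by deptno;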
Edit hive-site.xml
<property>
  <name>hive.server2.long.polling.timeout</name>
  <value>5000</value>
</property>
<property>
  <name>hive.server2.thrift.port</name>
  <value>10000</value>
</property>
<property>
  <name>hive.server2.thrift.bind.host</name>
  <value>bigdata.ibf.com</value>
</property>
1) Create the user
CREATE USER 'hadoop'@'centos01.bigdata.com' IDENTIFIED BY '123456';
2) Grant access (to the database that stores Hive's metastore)
GRANT ALL ON metastore.* TO 'hadoop'@'centos01.bigdata.com' IDENTIFIED BY '123456';
GRANT ALL ON metastore.* TO 'hadoop'@'%' IDENTIFIED BY '123456';
3) Flush privileges
flush privileges;
Start the service
$ bin/hiveserver2 &
or
$ bin/hive --service hiveserver2 &
Connect
$ bin/beeline
beeline> !connect jdbc:hive2://bigdata.ibf.com:10000
Enter the MySQL username, then the MySQL password
Purpose: import/export data between HDFS and an RDBMS
All "import" and "export" are from the point of view of HDFS
Data analysis workflow
Data collection: logs; RDBMS; use Sqoop to pull the data to be analyzed into HDFS
Data cleaning: field filtering, field completion, field formatting -> load the needed fields into HDFS
Data analysis: store the analyzed data on HDFS, then export the result data from HDFS to MySQL
Data presentation: read the data from the RDBMS
Sqoop supports: HDFS, Hive, HBase
How Sqoop works underneath
-> the sqoop command takes different arguments for different needs
-> Sqoop parses the arguments and passes them to an underlying MapReduce template
-> the resulting MapReduce job is packaged into a jar and submitted to YARN
-> this MapReduce job has only map tasks, no reduce tasks
Versions
-> sqoop1
-> sqoop2: adds a server component and a security mechanism
Installation and deployment
Download and unpack
tar -zxvf /opt/software/sqoop-1.4.5-cdh5.3.6.tar.gz -C /opt/cdh-5.3.6/
Edit the configuration file
$ pwd
/opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6
$ cp conf/sqoop-env-template.sh conf/sqoop-env.sh
Edit sqoop-env.sh
export HADOOP_COMMON_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6
#Set path to where hadoop-*-core.jar is available
export HADOOP_MAPRED_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6
#Set the path to where bin/hive is available
export HIVE_HOME=/opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6
Put the MySQL JDBC driver into Sqoop's lib directory
$ cp /opt/software/mysql-connector-java-5.1.34-bin.jar /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/lib/
Smoke test
[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop help
[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop list-databases \
--connect jdbc:mysql://linux03.ibf.com:3306 \
--username root \
--password 123456
With sqoop-1.4.6, the java-json jar must be added
$ cp /opt/software/java-json.jar /opt/cdh5.14.2/sqoop-1.4.6-cdh5.14.2/lib/
To fix Sqoop not finding the Hive warehouse
$ cp ${HIVE_HOME}/conf/hive-site.xml ${SQOOP_HOME}/conf/
Append Hive's dependencies to HADOOP_CLASSPATH
$ sudo vi /etc/profile
#HADOOP_CLASSPATH
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/cdh5.14.2/hive-1.1.0-cdh5.14.2/lib/*
source /etc/profile
bin/sqoop import --help    # show the command help
Source: a table in MySQL
Target: a path on HDFS
Create a test table in MySQL
Insert data in MySQL
use test_db;
create table user(
  id int primary key,
  name varchar(20) not null,
  salary float
)charset=utf8;
insert into user values(1,"张三",9000);
insert into user values(2,"李四",10000);
insert into user values(3,"王五",6000);
Import test_db.user from MySQL into HDFS; by default it lands under hdfs://linux01:8020/user/hadoop/
[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \
> --connect jdbc:mysql://linux03.ibf.com:3306/mydb \
> --username root \
> --password 123456 \
> --table user
With no reduce phase, there is one output file per map task
-> specify the HDFS output directory: --target-dir
-> specify the number of map tasks: -m
[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \
> --connect jdbc:mysql://linux03.ibf.com:3306/test_db \
> --username root \
> --password root \
> --table user \
> --target-dir /toHdfs \
> -m 1
-> change the output field delimiter: --fields-terminated-by
-> --direct makes the import faster
-> delete the output directory beforehand: --delete-target-dir
[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \
> --connect jdbc:mysql://linux03.ibf.com:3306/test_db \
> --username root \
> --password root \
> --table toHdfs \
> --target-dir /toHdfs \
> --direct \
> --delete-target-dir \
> --fields-terminated-by '\t' \
> -m 1
[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \
> --connect jdbc:mysql://linux03.ibf.com:3306/mydb \
> --username root \
> --password 123456 \
> --table user \
> --columns name,salary \
> --fields-terminated-by '-' \
> --target-dir /sqoop \
> --delete-target-dir \
> --direct \
> -m 1
Import the result of a SQL query with -e / --query
bin/sqoop import \
--connect jdbc:mysql://bigdata01.com:3306/test \
--username root \
--password 123456 \
-e 'select * from user where salary>9000 and $CONDITIONS' \
--target-dir /toHdfs \
--delete-target-dir \
-m 1
The query passed to -e must contain $CONDITIONS in its where clause;
to add your own condition, write it as: where salary>9000 and $CONDITIONS
A password file can be used instead (replace --password with --password-file)
Sqoop reads the entire password file, including spaces and newlines, so use echo -n
to generate it, e.g.: echo -n "secret" > password.file
$ echo -n 'root' > /home/hadoop/mysqlpasswd && chmod 400 /home/hadoop/mysqlpasswd
bin/sqoop import \
--connect jdbc:mysql://bigdata01.com:3306/test \
--username root \
--password-file file:///home/hadoop/mysqlpasswd \
-e 'select * from toHdfs where $CONDITIONS' \
--target-dir /sqoop \
--delete-target-dir \
-m 1
Import into Hive: if the table does not exist in the specified Hive database, it is created
bin/sqoop import \
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
--username root \
-P \
--table user \
--fields-terminated-by '\t' \
--delete-target-dir \
-m 1 \
--hive-import \
--hive-database test_db \
--hive-table user
Process:
MapReduce first imports the data into the HDFS user's home directory,
then the data is loaded from the home directory into the Hive table.
Incremental import:
append: continues from the last imported value of a given column
lastmodified: imports based on a timestamp column recording when rows were modified
--check-column <column>       Source column to check for incremental change
--incremental <import-type>   Define an incremental import of type 'append' or 'lastmodified'
--last-value <value>          Last imported value in the incremental check column
If the target does not yet exist on HDFS it is created
bin/sqoop import \
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
--username root \
--password 123456 \
--table user \
--fields-terminated-by '\t' \
--target-dir /sqoop/incremental \
-m 1 \
--direct \
--check-column id \
--incremental append \
--last-value 3
Create a Sqoop job to automate the incremental import (this errored in this setup)
There are two forms of the Sqoop job command:
bin/sqoop job
bin/sqoop-job
Either form works
Create a job:  --create
Delete a job:  --delete
Execute a job: --exec
Show a job:    --show
List jobs:     --list
bin/sqoop-job \
--create your-sync-job \
-- import \
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
--username root \
-P \
--table user \
-m 1 \
--target-dir /hive/incremental \
--incremental append \
--check-column id \
--last-value 1
bin/sqoop-job --show your-sync-job
bin/sqoop job --show your-sync-job
bin/sqoop job --exec your-sync-job
bin/sqoop job --list
bin/sqoop job --delete my-sync-job
Export data from Hive (i.e. files and directories on HDFS) or HDFS to MySQL
use mydb;
create table user_export(
  id int primary key,
  name varchar(20) not null,
  salary float
);
The table must be created in the target database first
bin/sqoop export \
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
--username root \
-P \
--table user_export \
--export-dir /hive/incremental \
--input-fields-terminated-by ',' \
-m 1
Using sqoop --options-file
Edit the file sqoopScript (in an options file, each option and each value goes on its own line)
export
--connect
jdbc:mysql://linux03.ibf.com:3306/test_db
--username
root
-P
--table
emp
-m
1
--export-dir
/input/export
--fields-terminated-by
"\t"
bin/sqoop --options-file ~/sqoopScript
Contents
A simple Hive case study: requirements analysis and exporting the results
Dynamic partitions: introduction and usage
Loading data into Hive tables dynamically with a script
Hive functions
1 Requirements and analysis
Requirement
Compute the PV and UV per hour for each day
Analysis
Create the source table
Create a partitioned table (day, hour) / load the data
Data cleaning
Create the Hive table
Field filtering
id url guid; field completion (none); field formatting (none)
Data analysis
pv: count(url)   uv: count(distinct guid)
Save the results
date (day) | hour | PV | UV
Export the results
Export to MySQL
2 Implementation
Source table
1) Create the source table
create database if not exists hive_db;
use hive_db;
create table tracklogs(
id string,
url string,
referer string,
keyword string,
type string,
guid string,
pageId string,
moduleId string,
linkId string,
attachedInfo string,
sessionId string,
trackerU string,
trackerType string,
ip string,
trackerSrc string,
cookie string,
orderCode string,
trackTime string,
endUserId string,
firstLink string,
sessionViewNo string,
productId string,
curMerchantId string,
provinceId string,
cityId string,
fee string,
edmActivity string,
edmEmail string,
edmJobId string,
ieVersion string,
platform string,
internalKeyword string,
resultSum string,
currentPage string,
linkPosition string,
buttonPosition string
)
partitioned by (date string,hour string)
row format delimited fields terminated by '\t';
2) Load the data
load data local inpath '/opt/datas/2015082818' into table tracklogs partition(date='20150828',hour='18');
load data local inpath '/opt/datas/2015082819' into table tracklogs partition(date='20150828',hour='19');
Analysis
1) Create the analysis table
create table clear (
id string,
url string,
guid string
)
partitioned by (date string, hour string)
row format delimited fields terminated by '\t';
2) Filter the data
insert into table clear partition(date='20150828',hour='18') select id,url,guid from tracklogs where date='20150828' and hour='18';
insert into table clear partition(date='20150828',hour='19') select id,url,guid from tracklogs where date='20150828' and hour='19';
3) Compute the metrics
pv : select date,hour,count(url) as pv from clear group by date,hour;
uv: select date,hour, count(distinct guid) as uv from clear group by date,hour;
Save the results into result
create table result as select date,hour, count(url) pv, count(distinct guid) as uv from clear group by date,hour;
If no delimiter is specified when creating a table, the default delimiter is '\001' (see the variant sketched below)
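A variant of my own (not in the original notes) that sets an explicit delimiter at creation time, so the later Sqoop export can use a plain '\t' instead of '\001':
create table result
row format delimited fields terminated by '\t'
as select date, hour, count(url) pv, count(distinct guid) as uv
from clear
group by date, hour;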
Export the results to MySQL
# create the table
create table result(
day varchar(30),
hour varchar(30),
pv varchar(30) not null,
uv varchar(30) not null,
primary key(day,hour)
);
# export the data
[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop export \
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
--username root \
--password root \
--table result \
--export-dir /user/hive/warehouse/hive_db.db/result \
--input-fields-terminated-by '\001' \
-m 1
Enable dynamic partitioning
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
With dynamic partitioning enabled, the mode can be strict or nonstrict: strict requires at least one static partition column, nonstrict has no such requirement (see the sketch below).
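A small sketch of a fully dynamic insert (both partition columns dynamic, which is why nonstrict mode is needed); it reuses the clear_dynamic and tracklogs tables defined around it:
set hive.exec.dynamic.partition.mode=nonstrict;
insert into table clear_dynamic partition(date, hour)
select id, url, guid, date, hour from tracklogs;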
Create the table
create table clear_dynamic (
id string,
url string,
guid string
)
partitioned by (date string, hour string)
row format delimited fields terminated by '\t';
Load data dynamically
Load all hours of 20180129 in one statement
insert into table clear_dynamic partition(date='20180129',hour) select id,url,guid,hour from tracklogs where date='20180129';
Partitions are created automatically based on hour
Previously it had to be written hour by hour:
insert into table clear partition(date='20150828',hour='18') select id,url,guid from tracklogs where date='20150828' and hour='18';
insert into table clear partition(date='20150828',hour='19') select id,url,guid from tracklogs where date='20150828' and hour='19';
20180129/
2018012900
2018012901
2018012902
2018012903
2018012904
2018012905
1) Write a shell script (using bin/hive -e "")
2) Test the script
show partitions tracklogs;    # list the partitions
alter table tracklogs drop partition(date='20150828',hour='18');    -- drop the partition
alter table tracklogs drop partition(date='20150828',hour='19');
select count(1) from tracklogs;    # count the records
3) Drive it from a shell script with bin/hive -f (a sketch of the SQL file follows after the test commands below)
4) Test
show partitions tracklogs;    # list the partitions
alter table tracklogs drop partition(date='20150828',hour='18');    -- drop the partition
alter table tracklogs drop partition(date='20150828',hour='19');
select count(1) from tracklogs;    # count the records
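A minimal sketch of what the SQL file used with bin/hive -f could look like; the file name load_tracklogs.sql and the hivevar names are assumptions of mine, with the shell script invoking it once per hour file, e.g. bin/hive --hivevar date=20150828 --hivevar hour=18 --hivevar file=/opt/datas/2015082818 -f load_tracklogs.sql:
-- load_tracklogs.sql (hypothetical): load one hour file into its partition
use hive_db;
load data local inpath '${hivevar:file}'
into table tracklogs partition(date='${hivevar:date}', hour='${hivevar:hour}');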
User-defined functions implement business logic that Hive cannot express natively
Types:
UDF: one row in, one row out
UDAF: many rows in, one out (sum, count, etc.)
UDTF: one row in, many out (e.g. row/column transformations)
Writing a UDF:
The class must extend UDF
It must implement at least one evaluate method
It must have a return type; returning null is allowed
Hadoop serialization types (Writable) are recommended
Requirement: date conversion
31/Aug/2015:00:04:37 +0800 --> 2015-08-31 00:04:37
Implementation steps
1) Write a class extending UDF
2) Package it into a jar (do not specify a main class)
3) Add it to Hive
In Maven, import the Hadoop and Hive dependencies
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>1.2.2</version>
</dependency>
Example implementation
package com.myudf;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class DateFormate extends UDF {

    SimpleDateFormat inputDate = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    SimpleDateFormat outDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // 31/Aug/2015:00:04:37 +0800  -->  2015-08-31 00:04:37
    public Text evaluate(Text str) {
        if (str == null) {
            return null;
        }
        if (StringUtils.isBlank(str.toString())) {
            return null;
        }
        Date date = null;
        String val = null;
        try {
            date = inputDate.parse(str.toString());
            val = outDate.format(date);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return new Text(val);
    }

    public static void main(String[] args) {
        Text val = new DateFormate().evaluate(new Text("31/Aug/2015:00:04:37 +0800"));
        System.out.println(val);
    }
}
hive (test_db)>add jar /home/hadoop/DDD.jar;
hive (test_db)> CREATE TEMPORARY FUNCTION removequote as 'com.myudf.date.RemoveQuoteUDF';
hive (test_db)> show functions;
bzip2, gzip, lzo, snappy等
Compression ratio: bzip2 > gzip > lzo (bzip2 highest)
Compression/decompression speed: lzo > gzip > bzip2 (lzo fastest)
bin/hadoop checknative -a
http://google.github.io/snappy/
mvn package -Pdist,native,docs -DskipTests -Dtar -Drequire.snappy
Stop the Hadoop processes
Unpack cdh5.xxx-snappy-lib-native.tar.gz into $HADOOP_HOME/lib
$ tar -zxvf native-hadoop-cdh5.14.2.tar.gz -C /opt/modules/hadoop-2.6.0-cdh5.14.2/lib
Verify that snappy is now supported: $ bin/hadoop checknative -a
mapred-site.xml configuration
<property>
  <name>mapreduce.map.output.compress</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.map.output.compress.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
Run the pi example: $ bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0-cdh5.3.6.jar pi 1 2
Check the compression settings in the job's configuration at <host>:19888 (JobHistory server)
Enable compression in the shuffle stage
set hive.exec.compress.output=true;
set mapred.output.compress=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
Compress the result files written by reduce
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
create table ( ... ) row format delimited fields terminated by '' STORED AS file_format
TEXTFILE
RCFILE
ORC
PARQUET
AVRO
INPUTFORMAT
| ORC -- (Note: Available in Hive 0.11.0 and later)
| PARQUET -- Parquet implements Dremel's data model and algorithms; the most common choice
Fast to write (row-oriented formats)
Fast to read (column-oriented formats)
Use the given log file (18.1 MB)
Store the same data in different storage formats and compare the file sizes
Enable compression in the MapReduce shuffle stage (compressing intermediate data reduces the amount transferred between map and reduce tasks, which speeds up IO-bound jobs)
set hive.exec.compress.intermediate=true;
set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
Compress the final output
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
Create the table file_text and load the data
create table if not exists file_text(
  t_time string,
  t_url string,
  t_uuid string,
  t_refered_url string,
  t_ip string,
  t_user string,
  t_city string
)
row format delimited fields terminated by '\t'
stored as textfile;
load data local inpath '/home/hadoop/page_views.data' into table file_text;
Compare data sizes: default format vs. file_orc_snappy
create table if not exists file_orc_snappy(
  t_time string,
  t_url string,
  t_uuid string,
  t_refered_url string,
  t_ip string,
  t_user string,
  t_city string
)
row format delimited fields terminated by '\t'
stored as ORC tblproperties("orc.compression"="Snappy");
insert into table file_orc_snappy select * from file_text;
-- cannot use load here: load is essentially an HDFS put and applies no compression; the data must go through insert (i.e. MapReduce) for the compression to take effect
Compare data sizes: default format vs. parquet
create table if not exists file_parquet(
  t_time string,
  t_url string,
  t_uuid string,
  t_refered_url string,
  t_ip string,
  t_user string,
  t_city string
)
row format delimited fields terminated by '\t'
stored as parquet;
insert into table file_parquet select * from file_text;
Compare data sizes: default format vs. parquet with snappy compression
create table if not exists file_parquet_snappy(
  t_time string,
  t_url string,
  t_uuid string,
  t_refered_url string,
  t_ip string,
  t_user string,
  t_city string
)
row format delimited fields terminated by '\t'
stored as parquet tblproperties("parquet.compression"="Snappy");
insert into table file_parquet_snappy select * from file_text;
hive (mydb)> dfs -du -s -h /user/hive/warehouse/mydb.db/file_parquet_snappy;
hive (mydb)> dfs -du -s -h /user/hive/warehouse/mydb.db/file_parquet;
Load log files in complex formats by regex matching
1 The regex
2 Load the data according to the log format
Sample log line
"27.38.5.159" "-" "31/Aug/2015:00:04:53 +0800" "GET /course/view.php?id=27 HTTP/1.1" "200" "7877" - "http://www.ibf.com/user.php?act=mycourse&testsession=1637" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36" "-" "learn.ibf.com"
Create the table
CREATE TABLE apachelog (
  remote_addr string,
  remote_user string,
  time_local string,
  request string,
  status string,
  body_bytes_set string,
  request_body string,
  http_referer string,
  http_user_agent string,
  http_x_forwarded_for string,
  host string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\]]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (\"-|[^ ]*\") (\"[^ ]*\")"
)
STORED AS TEXTFILE;
load data local inpath '/home/hadoop/moodle.ibf.access.log' into table apachelog;
//Whether to execute jobs in parallel
set hive.exec.parallel=true;
//How many jobs at most can be executed in parallel
set hive.exec.parallel.thread.number=8;   # can be increased to raise parallelism
set mapreduce.job.reduces=1
mapreduce.job.jvm.numtasks=1   (JVM reuse; the default is 1)
Hive setting, enabled (true) by default
set hive.mapred.reduce.tasks.speculative.execution=true;
hadoop
mapreduce.map.speculative true
mapreduce.reduce.speculative true
Size of merged files at the end of the job
Merge small files to avoid the performance hit of storing a large number of small files in HDFS
set hive.merge.size.per.task=256000000;
set hive.mapred.mode=strict;   -- the default is nonstrict
In strict mode:
queries on partitioned tables must include a filter on a partition column
order by must be used together with limit
Cartesian-product queries are rejected (joins written without on, using only where); an example query that passes all three checks follows below
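A small illustration of my own, reusing the emp_part and dept tables from earlier, of a query that satisfies all three strict-mode rules:
-- partition filter present, order by bounded by limit, join uses on
select e.empname, e.salary, d.dname
from emp_part e
join dept d on e.deptno = d.did
where e.province = 'CHICAGO'
order by e.salary desc
limit 10;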
map join
If one of the two tables in a join is small, a map join is used by default: the small table is loaded into memory
hive.mapjoin.smalltable.filesize=25000000   (default size threshold)
hive.auto.convert.join=true   (enabled by default)
If auto map join is not enabled, a hint in the statement can mark the small table for a map join
select /*+ MAPJOIN(time_dim) */ count(1) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
reduce join
Used to join two large tables
Rows are shuffled (grouped) by the join key
smb join
Sort-Merge-Bucket join
Addresses slow joins between two large tables
Rows are assigned to buckets by the hash of the bucketing column modulo the number of buckets
set hive.enforce.bucketing=true;
create table <table name> (
  <columns>
)
clustered by (<bucket column>) into <number of buckets> buckets;
For example
create table student(
id int,
age int,
name string
)
clustered by (id) into 4 buckets
row format delimited fields terminated by ',';
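A sketch of the settings commonly enabled alongside an SMB join (my addition; the property names come from stock Hive, so check them against your version), assuming both join tables are bucketed and sorted on the join key into the same number of buckets:
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
set hive.auto.convert.sortmerge.join=true;
-- the tables themselves would be declared e.g.: clustered by (id) sorted by (id) into 4 buckets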
// maximum input size per map (this determines how many files remain after merging)
set mapred.max.split.size=256000000;
// minimum split size on a single node (determines whether files on multiple DataNodes need to be merged)
set mapred.min.split.size.per.node=100000000;
// minimum split size within a single rack/switch (determines whether files across switches need to be merged)
set mapred.min.split.size.per.rack=100000000;
// merge small files before the map phase
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
// merge map-side output files, default true
set hive.merge.mapfiles = true
// merge reduce-side output files, default false
set hive.merge.mapredfiles = true
// target size of the merged files
set hive.merge.size.per.task = 256000000
// when the average output file size is below this value, start a merge job; this only takes effect when hive.merge.mapfiles or hive.merge.mapredfiles is set to true for the corresponding stage
set hive.merge.smallfiles.avgsize=16000000
Root cause: uneven distribution of keys (data skew)
Partial aggregation on the map side, equivalent to a Combiner
hive.map.aggr=true
Load balancing when the data is skewed
hive.groupby.skewindata=true
When this option is set to true, the query plan contains two MR jobs. In the first job, the map output is distributed randomly across the reducers and each reducer performs a partial aggregation; rows with the same Group By key may end up on different reducers, which balances the load. The second MR job then distributes the partially aggregated results by the Group By key (guaranteeing that identical keys go to the same reducer) and completes the final aggregation. A short sketch of applying both settings follows below.
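A compact sketch (the table and column names are placeholders of mine) of enabling both settings before a skewed aggregation:
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
-- skewed_key is heavily concentrated on a few values
select skewed_key, count(*) from big_table group by skewed_key;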
Terminology
1) UV: count(distinct guid)
One client machine visiting the site counts as one visitor; the same client is counted only once within 00:00-24:00.
2) PV: Page View --- count(url)
Page views or clicks; every refresh by a user is counted once.
3) Logged-in users:
Number of logged-in visitors (members); the number of records where endUserId has a value
4) Guests:
Number of visitors who are not logged in; the number of records where endUserId is empty
5) Average visit duration:
Average time a visitor stays on the site: trackTime --> max - min
6) Second-jump rate: visits with pv > 1 / total visits
Number of users who viewed at least two pages (pv > 1) / total number of users (distinct guid) (see the query sketch after the summary table below)
The idea: after a page has loaded, the user's first click on that page is called a "second jump"; the number of such clicks is the "second-jump volume". The ratio of second-jump volume to page views is the page's second-jump rate.
count(case when pv >= 2 then guid else null end) / count(distinct guid)
7) Unique IPs: count(distinct ip)
Unique IPs counts visits from computers with distinct IP addresses. It is easy to compute and fairly trustworthy, so most organizations treat it as a key traffic metric. For example, with ADSL dial-up you are assigned an IP each time you connect; visiting the site counts one IP. If you disconnect without clearing cookies, dial up again, get a new IP and visit again, a second IP is counted, but the UV (unique visitors) stays at one, because both visits were made by you.
date | uv | pv | logged-in users | guests | avg visit duration | second-jump rate | unique IPs |
---|---|---|---|---|---|---|---|
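A hedged sketch of computing the second-jump rate per day with the clear table from the case study (the per-guid inner aggregation is my own formulation of the formula above):
select t.date,
       count(case when t.pv >= 2 then t.guid end) / count(distinct t.guid) as second_jump_rate
from (
  select date, guid, count(url) as pv
  from clear
  group by date, guid
) t
group by t.date;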
Prepare the test data
hive (db_analogs)> create database ts;
hive (db_analogs)> use ts;
hive (ts)> create table testscore(gender string,satscore int, idnum int) row format delimited fields terminated by '\t';
hive (ts)> load data local inpath '/opt/datas/TESTSCORES.csv' into table testscore;
OVER with standard aggregates: COUNT, SUM, MIN/MAX, AVG
Requirement 1:
Group by gender, order by satscore descending, and show the group's highest score in the last column
Female 1000 37070397 1590
Female 970  60714297 1590
Female 910  30834797 1590
Male   1600 39196697 1600
Male   1360 44327297 1600
Male   1340 55983497 1600
Answer SQL:
hive (ts)> select gender,satscore,idnum,max(satscore) over(partition by gender order by satscore desc) maxs from testscore;
Note:
partition by defines the grouping
Requirement: top N
Group by gender, order by satscore descending, and show the rank within the group in the last column
Requirement 1:
Equal scores get different ranks; the rank keeps increasing with the row count
Female 1590 23573597 1
Female 1520 40177297 2
Female 1520 73461797 3
Female 1490 9589297  4
Female 1390 99108497 5
Female 1380 23048597 6   # same score
Female 1380 81994397 7   # same score
Requirement 2:
Equal scores get the same rank; later ranks jump according to the row count
Female 1590 23573597 1
Female 1520 40177297 2
Female 1520 73461797 2
Female 1490 9589297  4
Female 1390 99108497 5
Female 1380 23048597 6   # same score
Female 1380 81994397 6   # same score
Requirement 3:
Equal scores get the same rank; ranks increase consecutively (no gaps)
Female 1590 23573597 1
Female 1520 40177297 2
Female 1520 73461797 2
Female 1490 9589297  3
Female 1390 99108497 4
Female 1380 23048597 5
Female 1380 81994397 5
SQL
sql1
hive (ts)> select gender,satscore,idnum,row_number() over(partition by gender order by satscore desc) maxs from testscore;
-- ROW_NUMBER() starts at 1 and numbers the rows within the group sequentially
sql2
select gender,satscore,idnum,rank() over(partition by gender order by satscore desc) maxs from testscore;
-- RANK() gives the rank within the group; ties share a rank and leave gaps afterwards
sql3
select gender,satscore,idnum,dense_rank() over(partition by gender order by satscore desc) maxs from testscore;
-- DENSE_RANK() gives the rank within the group; ties share a rank and leave no gaps
# When there is an order by but no window clause, the window defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (from the start up to the current row)
# When there is neither an order by nor a window clause, the window defaults to ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING (from the start to the end)
UNBOUNDED PRECEDING
UNBOUNDED FOLLOWING
1 PRECEDING
1 FOLLOWING
CURRENT ROW
Window comparison
select gender,satscore,idnum,sum(satscore) over(partition by gender order by satscore desc RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) sums from testscore;
select gender,satscore,idnum,sum(satscore) over(partition by gender order by satscore desc RANGE BETWEEN UNBOUNDED PRECEDING AND unbounded following) sums from testscore;
select gender,satscore,idnum,sum(satscore) over(partition by gender order by satscore desc RANGE BETWEEN 1 PRECEDING AND CURRENT ROW) sums from testscore;
RANGE 1 PRECEDING: the window covers rows whose value lies within 1 of the current row's value (a value range, not a row count)
LAG: the lagging value (n rows back); with no offset given it defaults to one row back (with rows shown top to bottom, the lag is the value shown just before the current one)
Scenario: analyzing the order in which a user viewed pages
sql
hive (ts)> select gender,satscore,idnum, lag(satscore) over(partition by gender order by satscore desc) as lastvalue from testscore;
Expected result
gender satscore idnum    lastvalue
Female 1590     23573597 NULL   # null here; a default value could be supplied
Female 1520     40177297 1590   # the previous record's satscore
Female 1520     73461797 1520   # the previous record's satscore
Female 1490     9589297  1520
Female 1390     99108497 1490
LEAD is the opposite of LAG (n rows ahead); same usage; it defaults to one row ahead (with rows shown top to bottom, the lead is the value shown just after the current one)
sql
hive (ts)> select gender,satscore,idnum, lead(satscore, 1, 0) over(partition by gender order by satscore desc) as nextvalue from testscore;
Result
gender satscore idnum    nextvalue
...
Female 1060     59149297 1060
Female 1060     46028397 1000
Female 1000     37070397 970
Female 970      60714297 910
Female 910      30834797 0