The city information and product information tables live in MySQL, and the user product-click logs are stored as text files on HDFS. The task is to find, for each city area, the top-3 products by click count. The details are shown below.
mysql> show tables;
+---------------------------------+
| Tables_in_d7 |
+---------------------------------+
| city_info |
| product_info |
| result_product_area_clicks_top3 |
+---------------------------------+
3 rows in set (0.00 sec)
mysql> desc city_info;
+-----------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-----------+--------------+------+-----+---------+-------+
| city_id | int(11) | YES | | NULL | |
| city_name | varchar(255) | YES | | NULL | |
| area | varchar(255) | YES | | NULL | |
+-----------+--------------+------+-----+---------+-------+
3 rows in set (0.00 sec)
mysql> select * from city_info;
+---------+-----------+------+
| city_id | city_name | area |
+---------+-----------+------+
| 1 | BEIJING | NC |
| 2 | SHANGHAI | EC |
| 3 | NANJING | EC |
| 4 | GUANGZHOU | SC |
| 5 | SANYA | SC |
| 6 | WUHAN | CC |
| 7 | CHANGSHA | CC |
| 8 | XIAN | NW |
| 9 | CHENGDU | SW |
| 10 | HAERBIN | NE |
+---------+-----------+------+
10 rows in set (0.00 sec)
mysql> desc product_info;
+--------------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+--------------+------+-----+---------+-------+
| product_id | int(11) | YES | | NULL | |
| product_name | varchar(255) | YES | | NULL | |
| extend_info | varchar(255) | YES | | NULL | |
+--------------+--------------+------+-----+---------+-------+
3 rows in set (0.00 sec)
mysql> select * from product_info limit 10; <-- product_info has 100 rows in total
+------------+--------------+----------------------+
| product_id | product_name | extend_info |
+------------+--------------+----------------------+
| 1 | product1 | {"product_status":1} |
| 2 | product2 | {"product_status":1} |
| 3 | product3 | {"product_status":1} |
| 4 | product4 | {"product_status":1} |
| 5 | product5 | {"product_status":1} |
| 6 | product6 | {"product_status":1} |
| 7 | product7 | {"product_status":1} |
| 8 | product8 | {"product_status":1} |
| 9 | product9 | {"product_status":0} |
| 10 | product10 | {"product_status":1} |
+------------+--------------+----------------------+
10 rows in set (0.00 sec)
[hadoop@hadoop001 data]$ more user_click.txt
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:01:56,1(city_id),72(product_id)
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:52:26,1,68
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:17:03,1,40
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:32:07,1,21
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:26:06,1,63
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:03:11,1,60
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:43:43,1,30
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:09:58,1,96
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:18:45,1,71
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:42:39,1,8
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:24:30,1,6
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:29:49,1,26
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:24:12,1,83
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:07:50,1,62
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:19:31,1,61
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:40:51,1,46
....
[hadoop@hadoop001 data]$ wc -l user_click.txt
11448 user_click.txt
1) Import the city_info and product_info tables into Hive with Sqoop.
2) Join user_click with city_info and product_info in Hive.
3) Use a window function to compute the top-N within each group, then export the result to MySQL with Sqoop.
4) Wrap the whole pipeline in a shell script. One point worth mentioning: city_info/product_info change rarely, so they are imported by a separate script and are not part of this shell script, although I still show those steps below.
5) Trigger the script with crontab, running at 2 AM every day.
Notes:
a) Every temporary table must be dropped before it is recreated on each run (drop table if exists); statements that have to be safely re-runnable should use if not exists (see the sketch below).
b) The key steps must produce log output.
c) The shell script has to be idempotent: re-running a day must not duplicate data.
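To make note (a) concrete, here is a minimal sketch of the two idioms used later in this post (table and partition names are taken from the steps below):

-- Tables rebuilt on every run: drop first, then recreate
drop table if exists tmp_product_area_clicks;
-- Statements that must survive a re-run: guard them with IF NOT EXISTS
alter table user_click add if not exists partition(day='2016-05-05');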
How to deploy and use Sqoop was covered in the Sqoop deployment post, so I won't repeat it here; straight to the code.
# Table structure of city_info in Hive
hive (d7)> create table city_info(
city_id int,
city_name string,
area string
)
row format delimited fields terminated by '\t';
# Import city_info
[hadoop@hadoop001 ~]$ sqoop import \
--connect "jdbc:mysql://localhost:3306/d7" \
--username root \
--password root \
--table city_info \
--split-by 'city_id' \
--fields-terminated-by '\t' \
--hive-import \
--hive-database d7 \
--target-dir '/user/hive/warehouse/d7.db/city_info' \
--delete-target-dir \
-m 2
# Table structure of product_info in Hive
hive (d7)> create table product_info(
product_id int,
product_name string,
extend_info string
)
row format delimited fields terminated by '\t';
# Import product_info
[hadoop@hadoop001 ~]$ sqoop import \
--connect "jdbc:mysql://localhost:3306/d7" \
--username root \
--password root \
--table product_info \
--split-by 'product_id' \
--fields-terminated-by '\t' \
--hive-import \
--hive-database d7 \
--target-dir '/user/hive/warehouse/d7.db/product_info' \
--delete-target-dir \
-m 2
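Before moving on, a quick sanity check worth running (my own addition, not part of the original flow) to confirm that both imports actually landed in Hive:

# Row-count check after the two Sqoop imports
hive -e "use d7; select count(*) from city_info; select count(*) from product_info;"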
PS: if this is your first time using Sqoop, you will definitely hit two pitfalls here. I'll leave them for now and explain them in the next post.
In production, the user_click table in Hive grows continuously, so it must be a partitioned table. Generally, the previous day's cleaned data is placed directly under the table's HDFS path; for example, if the partitioned table lives at hdfs://hadoop001:9000/user/hive/warehouse/d7.db/user_click, the production job cleans the 2016-05-05 logs and creates the partition directory under that path. At that point, querying the partitioned table still does not return the new partition's data, so how do you refresh it into the table efficiently? See the code below.
# The user_click table structure
hive (d7)> create table user_click(
user_id int,
session_id string,
action_time string,
city_id int,
product_id int
)
partitioned by(day string)
row format delimited fields terminated by ',';
# Register the new partition; the other refresh method is not recommended, it is too brute-force
hive (d7)> alter table user_click add if not exists partition(day='2016-05-05');
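For context, a sketch of how the cleaned file is assumed to reach the partition directory before the ALTER TABLE above runs (the upload step itself is not shown in the original flow), plus the brute-force alternative, which is presumably MSCK REPAIR TABLE since it rescans every partition directory instead of registering a single one:

# Assumed upstream step: the cleaning job drops the day's file into the partition directory
hdfs dfs -mkdir -p /user/hive/warehouse/d7.db/user_click/day=2016-05-05
hdfs dfs -put user_click.txt /user/hive/warehouse/d7.db/user_click/day=2016-05-05/
# Brute-force refresh (rescans all partitions, hence not recommended here):
# hive -e "use d7; msck repair table user_click;"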
The temporary table has four fields: area, product id, product name, and click count.
hive (d7)> drop table if exists tmp_product_area_clicks;
hive (d7)> create table tmp_product_area_clicks as
> select b.area,c.product_id,c.product_name,count(1) as click_count from user_click a
> left join city_info b on a.city_id=b.city_id
> left join product_info c on a.product_id=c.product_id
> where a.day='2016-05-05'
> group by b.area,c.product_id,c.product_name;
Use the row_number() window function to rank products within each area.
hive (d7)> drop table if exists result_product_area_clicks_top3;
hive (d7)> create table result_product_area_clicks_top3
> row format delimited fields terminated by '\t' as
> select * from (
> select
> "2016-05-05" day,product_id,product_name,area,click_count, <-- 日期会在脚本中更改
> row_number() over(partition by area order by click_count desc) rank
> from tmp_product_area_clicks
> ) t where t.rank<=3;
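One caveat about row_number(): it assigns distinct ranks even when click counts tie, so a tie at third place silently drops one of the tied products. If tied products should all be kept, dense_rank() (or rank()) can be swapped in; this is a variation on the inner query, not the original statement:

hive (d7)> select * from (
         > select
         > "2016-05-05" day,product_id,product_name,area,click_count,
         > dense_rank() over(partition by area order by click_count desc) rank
         > from tmp_product_area_clicks
         > ) t where t.rank<=3;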
# Create the result table in MySQL ahead of time; its structure is below
create table result_product_area_clicks_top3(
day varchar(15),
product_id int(11),
product_name varchar(50),
area varchar(10),
click_count int(11),
rank int(10)
)
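A small caveat on this DDL (my note, not in the original): RANK became a reserved word in MySQL 8.0, so on newer servers the rank column needs quoting both here and in the Sqoop --columns list; on the 5.x-style versions this post appears to target, it works as written.

# Only needed on MySQL 8.0+, where RANK is reserved
create table result_product_area_clicks_top3(
day varchar(15),
product_id int(11),
product_name varchar(50),
area varchar(10),
click_count int(11),
`rank` int(10)
);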
# For idempotency, delete that day's rows from the MySQL result table first
# The date is parameterized in the script
mysql> delete from result_product_area_clicks_top3 where day='2016-05-05';
[hadoop@hadoop001 ~]$ sqoop export \
--connect jdbc:mysql://localhost:3306/d7 \
--password root \
--username root \
--table result_product_area_clicks_top3 \
--export-dir /user/hive/warehouse/d7.db/result_product_area_clicks_top3 \
--columns "day,product_id,product_name,area,click_count,rank" \
--fields-terminated-by '\t' \
-m 2
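One safeguard worth appending to the export above (my suggestion, not in the original command): Hive writes NULL as \N in its text files, and the two left joins earlier can yield a NULL area or product_name when a city_id or product_id has no match, so telling Sqoop how to interpret that marker avoids a failed or garbled export:

# Optional extra flags for the sqoop export command above
--input-null-string '\\N' \
--input-null-non-string '\\N'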
The offline Hive job runs once a day: at some point today it processes yesterday's data, so the shell script has to work out the previous day's date with date --date '1 day ago' +%Y-%m-%d. The shell script is below.
[hadoop@hadoop001 ~]$ vim top3.sh
#!/bin/bash
CURRENT=`date +%Y-%m-%d_%H:%M:%S`
USE_DAY=`date --date '1 day ago' +%Y-%m-%d`
echo "Using date: ${USE_DAY}"
echo "${CURRENT}, refreshing the partition"
HIVE_PARTITION_SQL="alter table d7.user_click add if not exists partition(day='${USE_DAY}');"
hive -e "${HIVE_PARTITION_SQL}"
echo "${CURRENT}, creating the temporary table: click counts per product per area"
HIVE_TMP_SQL="use d7; drop table if exists tmp_product_area_clicks; create table tmp_product_area_clicks as select b.area,c.product_id,c.product_name,count(1) as click_count from user_click a left join city_info b on a.city_id=b.city_id left join product_info c on a.product_id=c.product_id where a.day='${USE_DAY}' group by b.area,c.product_id,c.product_name;"
hive -e "${HIVE_TMP_SQL}"
echo "${CURRENT}, creating the result table: top-3 products by clicks per area"
HIVE_RESULT_SQL="use d7; drop table if exists result_product_area_clicks_top3; create table result_product_area_clicks_top3 row format delimited fields terminated by '\t' as select * from ( select '${USE_DAY}' day,product_id,product_name,area,click_count, row_number() over(partition by area order by click_count desc) rank from tmp_product_area_clicks ) t where t.rank<=3;"
hive -e "${HIVE_RESULT_SQL}"
echo "${CURRENT}, for idempotency, deleting the existing ${USE_DAY} rows from the MySQL result table"
MYSQL_DELETE_SQL="delete from result_product_area_clicks_top3 where day='${USE_DAY}';"
mysql -uroot -proot -D d7 -e "${MYSQL_DELETE_SQL}"
echo "${CURRENT}, exporting the Hive result table to MySQL"
sqoop export \
--connect jdbc:mysql://localhost:3306/d7 \
--password root \
--username root \
--table result_product_area_clicks_top3 \
--export-dir /user/hive/warehouse/d7.db/result_product_area_clicks_top3 \
--columns "day,product_id,product_name,area,click_count,rank" \
--fields-terminated-by '\t' \
-m 2
echo "${CURRENT}, whole pipeline finished; please check whether the data landed in MySQL"
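One refinement worth considering (my addition, not in the original script): accept an optional date argument so that a failed day can be re-run by hand, which pairs naturally with the delete-then-export idempotency above:

# Usage: ./top3.sh [yyyy-MM-dd]; falls back to yesterday when no argument is given
USE_DAY=${1:-$(date --date '1 day ago' +%Y-%m-%d)}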
Schedule it with crontab; see below.
[hadoop@hadoop001 ~]$ crontab -e
0 2 * * * /home/hadoop/top3.sh >> /tmp/top3_logs.log 2>&1