hive中shuffle的优化java
压缩
压缩可使磁盘上存储的数据量变小,经过下降I/O来提升查询速度。mysql
对hive产生的一系列MR中间过程启用压缩linux
set hive.exec.compress.intermediate=true; set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
对最终输出结果压缩(写到hdfs、本地磁盘的文件)git
set hive.exec.compress.output=true; set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
join优化正则表达式
若是关联查询两张表中有一张小表默认map join,将小表加入内存 hive.mapjoin.smalltable.filesize=25000000 默认大小 hive.auto.convert.join=true 默认开启 若是没有开启使用mapjoin,使用语句制定小表使用mapjoin ```sql select /*+ MAPJOIN(time_dim) */ count(1) from store_sales join time_dim on (ss_sold_time_sk = t_time_sk) ``` 2. smb join Sort-Merge-Bucket join 解决大表与大表join速度慢问题 经过分桶字段的的hash值对桶的个数取余进行分桶 3. 倾斜链接 ```xml <!-- hive.optimize.skewjoin:是否为链接表中的倾斜键建立单独的执行计划。它基于存储在元数据中的倾斜键。在编译时,Hive为倾斜键和其余键值生成各自的查询计 划。 --> <property> <name>hive.optimize.skewjoin</name> <value>true</value> </property> <property> <!-- hive.skewjoin.key:决定如何肯定链接中的倾斜键。在链接操做中,若是同一键值所对应的数据行数超过该参数值,则认为该键是一个倾斜链接键。 --> <name>hive.skewjoin.key</name> <value>100000</value> </property> <!-- hive.skewjoin.mapjoin.map.tasks:指定倾斜链接中,用于Map链接做业的任务数。该参数应该与hive.skewjoin.mapjoin.min.split一块儿使用,执行细粒度的控制。 --> <property> <name>hive.skewjoin.mapjoin.map.tasks</name> <value>10000</value> </property> <!-- hive.skewjoin.mapjoin.min.split:经过指定最小split的大小,肯定Map链接做业的任务数。该参数应该与hive.skewjoin.mapjoin.map.tasks一块儿使用,执行细粒度的控制。 --> <property> <name>hive.skewjoin.mapjoin.min.split</name> <value>33554432</value> </property> ```
本质缘由:key的分布不均致使的sql
Map 端部分聚合,至关于Combinershell
hive.map.aggr=true
有数据倾斜的时候进行负载均衡数据库
hive.groupby.skewindata=true
当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每一个 Reduce 作部分聚合操做,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不一样的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程能够保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操做。express
全量导入apache
[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \ > --connect jdbc:mysql://linux03.ibf.com:3306/mydb \ > --username root \ > --password 123456 \ > --table user
增量导入
bin/sqoop import \ --connect jdbc:mysql://linux03.ibf.com:3306/mydb \ --username root \ --password 123456 \ --table user \ --fields-terminated-by '\t' \ --target-dir /sqoop/incremental \ -m 1 \ --direct \ --check-column id \ --incremental append \ --last-value 3
hive致使数据倾斜的可能性(哪些操做会致使) -->分桶 join key分布不均匀 大量空值致使如何解决?
根据key操做到时结果分布不均均可能致使数据倾斜,如group by key
order by 使用全局排序最终只会在一个reducer上运行全部数据,致使数据倾斜
大量NULL
hive的NULL有时候是必须的:
1)hive中insert语句必须列数匹配,不支持不写入,没有值的列必须使用null占位。
2)hive表的数据文件中按分隔符区分各个列。空列会保存NULL(n)来保留列位置。但外部表加载某些数据时若是列不够,如表13列,文件数据只有2列,则在表查询时表中的末尾剩余列无数据对应,自动显示为NULL。
因此,NULL转化为空字符串,能够节省磁盘空间,实现方法有几种
1)建表时直接指定(两种方式)
a、用语句 ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe’ with serdeproperties('serialization.null.format' = '') 实现,注意二者必须一块儿使用,如
CREATE TABLE hive_tb (id int,name STRING) PARTITIONED BY ( `day` string,`type` tinyint COMMENT '0 as bid, 1 as win, 2 as ck', `hour` tinyint) ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe’ WITH SERDEPROPERTIES ( ‘field.delim’='/t’, ‘escape.delim’='//’, ‘serialization.null.format'='' ) STORED AS TEXTFILE;
b、或者经过ROW FORMAT DELIMITED NULL DEFINED AS '' 如
CREATE TABLE hive_tb (id int,name STRING) PARTITIONED BY ( `day` string,`type` tinyint COMMENT '0 as bid, 1 as win, 2 as ck', `hour` tinyint) ROW FORMAT DELIMITED NULL DEFINED AS '' STORED AS TEXTFILE;
2)修改已存在的表
alter table hive_tb set serdeproperties('serialization.null.format' = '');
hive中如何增长一列数据?
新增一列
hive > alter table log_messages add coloumns( app_name string comment 'Application name', session_id long comment 'The current session id' ); -- 增长列的表的最后一个字段以后,在分区字段以前添加。
若是在表中新增一列new_column,则在原表上直插入new_column这一列数据不可行
若是新增一列是分区,则能够新增数据到该分区下
insert into table clear partition(date='20150828',hour='18') select id,url,guid from tracklogs where date='20150828' and hour='18';
有没有hive处理过json?有哪些函数?
建表时制定jar包处理json数据
ADD JAR hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar; 2. 建表 ``` hive (default)> ADD JAR hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar; Added [hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar] to class path Added resources: [hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar] hive (default)> create table spark_people_json( > > `name` string, > > `age` int) > > ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' > > STORED AS TEXTFILE; OK Time taken: 4.445 seconds ``` 2. 记录下若是只是某个字段为json,想要获取里面的某个值怎么操做? 1. get_json_object() 只能获取一个字段 ```sql select get_json_object('{"shop":{"book":[{"price":43.3,"type":"art"},{"price":30,"type":"technology"}],"clothes":{"price":19.951,"type":"shirt"}},"name":"jane","age":"23"}', '$.shop.book[0].type'); ``` 2. json_tuple() 能够获取多个字段 ```sql select json_tuple('{"name":"jack","server":"www.qq.com"}','server','name') ``` 3. 自行编写UDF
sparkstreaming正在运行的程序如何去停止?怎么安全中止?代码作了更新,如何让正在运行的和更新后的代码作一个交替?
升级应用程序代码
若是须要使用新的应用程序代码升级正在运行的Spark Streaming应用程序,则有两种可能的机制。
使用直接链接方式
消息语义有几种?
sparkstreaming和kafka集成有几种方式?
初始化 StreamingContext
经过建立输入DStreams来定义输入源。
经过将转换和输出操做应用于DStream来定义流式计算。
开始接收数据并使用它进行处理streamingContext.start()。
等待处理中止(手动或因为任何错误)使用streamingContext.awaitTermination()。
可使用手动中止处理streamingContext.stop()。
hive中的分析函数?
LEAD 能够选择指定要引导的行数。若是未指定要引导的行数,则前导是一行。 当当前行的前导超出窗口末尾时返回null。 ``` hive (default)> desc function lead; OK tab_name LEAD (scalar_expression [,offset] [,default]) OVER ([query_partition_clause] order_by_clause); The LEAD function is used to return data from the next row. ``` LAG 能够选择指定滞后的行数。若是未指定滞后行数,则滞后为一行。 当当前行的延迟在窗口开始以前延伸时,返回null。 ``` hive (default)> desc function lag; OK tab_name LAG (scalar_expression [,offset] [,default]) OVER ([query_partition_clause] order_by_clause); The LAG function is used to access data from a previous row. ``` FIRST_VALUE 这最多须要两个参数。第一个参数是您想要第一个值的列,第二个(可选)参数必须是false默认的布尔值。若是设置为true,则跳过空值。 LAST_VALUE 这最多须要两个参数。第一个参数是您想要最后一个值的列,第二个(可选)参数必须是false默认的布尔值。若是设置为true,则跳过空值。 2. OVER字句 OVER标准聚合: COUNT、SUM、MIN、MAX、AVG 使用带有任何原始数据类型的一个或多个分区列的PARTITION BY语句。 使用PARTITION BY和ORDER BY 与任何数据类型的一个或多个分区和/或排序列。 带有窗口的over具体说明。Windows能够在WINDOW子句中单独定义。窗口规范支持如下格式: ``` (ROWS | RANGE) BETWEEN (UNBOUNDED | [num]) PRECEDING AND ([num] PRECEDING | CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING) (ROWS | RANGE) BETWEEN CURRENT ROW AND (CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING) (ROWS | RANGE) BETWEEN [num] FOLLOWING AND (UNBOUNDED | [num]) FOLLOWING ``` 当指定ORDER BY时缺乏WINDOW子句,WINDOW规范默认为 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. 当缺乏ORDER BY和WINDOW子句时,WINDOW规范默认为 ROW BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. OVER子句支持如下函数,但它不支持带有它们的窗口(参见HIVE-4797): Ranking functions: Rank, NTile, DenseRank, CumeDist, PercentRank. Lead and Lag functions. 3. 分析函数 RANK ROW_NUMBER DENSE_RANK CUME_DIST:CUME_DIST 小于等于当前值的行数/分组内总行数 PERCENT_RANK NTILE ```sql select s_id, NTILE(2) over(partition by c_id order by s_score) from score ``` 4. Hive 2.1.0及更高版本中支持Distinct (参见HIVE-9534) 聚合函数支持Distinct,包括SUM,COUNT和AVG,它们聚合在每一个分区内的不一样值上。当前实现具备如下限制:出于性能缘由,在分区子句中不能支持ORDER BY或窗口规范。支持的语法以下。 ```sql COUNT(DISTINCT a) OVER (PARTITION BY c) ``` Hive 2.2.0中支持ORDER BY和窗口规范(参见HIVE-13453)。一个例子以下。 ```sql COUNT(DISTINCT a) OVER (PARTITION BY c ORDER BY d ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) ``` 5. Hive 2.1.0及更高版本中OVER子句支持中的聚合函数(参见 HIVE-13475) 添加了对OVER子句中引用聚合函数的支持。例如,目前咱们能够在OVER子句中使用SUM聚合函数,以下所示。 ```sql SELECT rank() OVER (ORDER BY sum(b)) FROM T GROUP BY a; ```
常见的字符串用哪些函数?
hive (practice)> select concat_ws('|','abc','def','gh'); abc|def|gh
substr(string A, int start),substring(string A, int start)
substr(string A, int start, int len),substring(string A, int start, int len)
语法: regexp_replace(string A, string B, string C)
返回值: string
说明:将字符串A中的符合java正则表达式B的部分替换为C。注意,在有些状况下要使用转义字符,相似oracle中的regexp_replace函数。
语法: regexp_extract(string subject, string pattern, int index)
返回值: string
说明:将字符串subject按照pattern正则表达式的规则拆分,返回index指定的字符。
hive (practice)> select repeat('abc',5); abcabcabcabcabc
hive中如何去统计每周一,每月的第一天的pv?
获取指定日期月份的第一天、年份的第一天
select trunc('2019-02-24', 'YYYY'); select trunc('2019-02-24', 'MM');
指定日期下周的指定周几
select next_day('2019-02-24', 'TU');
按指定格式返回指定日期增长几个月后的日期
select add_months('2019-02-28', 1); select add_months('2019-02-24 21:15:16', 2, 'YYYY-MM-dd HH:mm:ss');
select count(guid) from table group by trunc(date, 'MM')
select count(guid) from table group by next_day('2019-06-08', 'MONDAY');
Spark2.x以后,官方已经将 ( DataFrame ) /Dataset (数据集)API的进行了 统一 ,DataFrame仅 是Dataset中每一个元素为Row类型的时候
不一样之处在于 Dataset 是 strongly typed (强类型的) ,而dataframe则是 untypedrel (弱类型的)
项目中hive的元数据在哪儿保存?
元数据怎么保证他的安全性?
修改元数据所用的用户名和密码
<property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>123456</value> </property>
在mysql端设置metastore数据库的访问权限
sqoop导入导出有几种方式?增量导出?
导入
全量导入 ``` [hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \ > --connect jdbc:mysql://linux03.ibf.com:3306/test_db \ > --username root \ > --password root \ > --table toHdfs \ > --target-dir /toHdfs \ > --direct \ > --delete-target-dir \ > --fields-terminated-by '\t' \ > -m 1 ``` 增量导入append ```sh bin/sqoop import \ --connect jdbc:mysql://linux03.ibf.com:3306/mydb \ --username root \ --password 123456 \ --table user \ --fields-terminated-by '\t' \ --target-dir /sqoop/incremental \ -m 1 \ --direct \ --check-column id \ --incremental append \ --last-value 3 ``` 增量导入lastmodified 表中必须有一列指示时间 ``` sqoop import \ --connect jdbc:mysql://master:3306/test \ --username hive \ --password 123456 \ --table customertest \ --check-column last_mod \ --incremental lastmodified \ --last-value "2016-12-15 15:47:29" \ -m 1 \ --append ```
导出
插入 默认状况下,sqoop-export将新行添加到表中;每行输入记录都被转换成一条INSERT语句,将此行记录添加到目标数据库表中。若是数据库中的表具备约束条件(例如,其值必须惟一的主键列)而且已有数据存在,则必须注意避免插入违反这些约束条件的记录。若是INSERT语句失败,导出过程将失败。此模式主要用于将记录导出到能够接收这些结果的空表中。 更新 若是指定了--update-key参数,则Sqoop将改成修改数据库中表中现有的数据。每一个输入记录都将转化为UPDATE语句修改现有数据。语句修改的行取决于--update-key指定的列名,若是数据库中的表中不存在的数据,那么也不会插入。 根据目标数据库,若是要更新已存在于数据库中的行,或者若是行尚不存在则插入行,则还能够--update-mode 使用allowinsert模式指定参数
使用窗口函数,指定足够长的窗口处理数据,总而使数据量足够大(最好在一个block大小左右),完成后使用foreachRDD将数据写出到HDFS
hive中的负责数据类型有哪些?
TINYINT
SMALLINT
INT/INTEGER
BIGINT
FLOAT
DOUBLE
DECIMAL
string
varchar
char
TIMESTAMP
DATE
复杂类型
ARRAY<data_type>
create table hive_array_test (name string, stu_id_list array<INT>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ':' ; -- 'FIELDS TERMINATED BY' :字段与字段之间的分隔符 -- 'COLLECTION ITEMS TERMINATED BY' :一个字段各个 item 的分隔符 [chen@centos01 ~]$ vi hive_array.txt 0601,1:2:3:4 0602,5:6 0603,7:8:9:10 0604,11:12 load data local inpath '/home/chen/hive_array.txt' into table hive_array_test;
hive (default)> select * from hive_array_test; OK hive_array_test.name hive_array_test.stu_id_list 0601 [1,2,3,4] 0602 [5,6] 0603 [7,8,9,10] 0604 [11,12] Time taken: 0.9 seconds, Fetched: 4 row(s)
MAP<primitive_type, data_type>
create table hive_map_test (id int, unit map<string, int>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':'; ‘MAP KEYS TERMINATED BY’: key value 分隔符 [chen@centos01 ~]$ vi hive_map.txt 0 Chinese:100,English:80,math:59 1 Chinese:80,English:90 2 Chinese:100,English:100,math:60 load data local inpath '/home/chen/hive_map.txt' into table hive_map_test;
hive (default)> select * from hive_map_test; OK hive_map_test.id hive_map_test.unit 0 {"Chinese":100,"English":80,"math":59} 1 {"Chinese":80,"English":90} 2 {"Chinese":100,"English":100,"math":60} Time taken: 0.204 seconds, Fetched: 3 row(s) hive (default)> select id, unit['math'] from hive_map_test; OK id _c1 0 59 1 NULL 2 60 Time taken: 0.554 seconds, Fetched: 3 row(s)
STRUCT<col_name : data_type [COMMENT col_comment], ...>
create table hive_struct_test(id int, info struct<name:string, age:int, height:float>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ':'; [chen@centos01 ~]$ vi hive_struct.txt 0,zhao:18:178 1,qian:30:173 2,sun:20:180 3,li:23:183 load data local inpath '/home/chen/hive_struct.txt' into table hive_struct_test;
hive (default)> select * from hive_struct_test; OK hive_struct_test.id hive_struct_test.info 0 {"name":"zhao","age":18,"height":178.0} 1 {"name":"qian","age":30,"height":173.0} 2 {"name":"sun","age":20,"height":180.0} 3 {"name":"li","age":23,"height":183.0} Time taken: 0.153 seconds, Fetched: 4 row(s) hive (default)> select id, info.name from hive_struct_test; OK id name 0 zhao 1 qian 2 sun 3 li Time taken: 0.133 seconds, Fetched: 4 row(s)
导入:load data [local] inpath '路径' overwrite into table 表名
导出:insert overwrite [local] directory '/home/hadoop/data' select * from emp_p;
local:加local是从本地加载,不加local是从hdfs加载
RDD的建立方式?
scala> var data = Array(1, 2, 3, 4, 5) data: Array[Int] = Array(1, 2, 3, 4, 5) scala> val rdd = sc.parallelize(data) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26
scala> sc.textFile("student.log") res0: org.apache.spark.rdd.RDD[String] = student.log MapPartitionsRDD[1] at textFile at <console>:25
hadoop的压缩格式
bin/hadoop checknative -a
[chen@centos01 hadoop-2.6.0-cdh5.14.2]$ bin/hadoop checknative -a 19/06/05 19:15:45 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native 19/06/05 19:15:45 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library Native library checking: hadoop: true /opt/modules/hadoop-2.6.0-cdh5.14.2/lib/native/libhadoop.so.1.0.0 zlib: true /lib64/libz.so.1 snappy: true /opt/modules/hadoop-2.6.0-cdh5.14.2/lib/native/libsnappy.so.1 lz4: true revision:10301 bzip2: true /lib64/libbz2.so.1 openssl: true /usr/lib64/libcrypto.so
sparksql处理完的dataframe结果要保存在数据库中,具体应该怎么作?
spark .read .table("mydb.emp") .write .mode(SaveMode.Ignore) .jdbc("jdbc:mysql://centos01:3306/mydb", "emp", prop)
mapreduce过程当中 shuffle的优化?
shuffle过程:map端:环形缓冲区(到80%) --》 溢写(分区,排序)--》combiner --》 compress --》 reduce端:--》 merge --》 排序 --》 group combiner可选择开启,在map端进行一次小reduce compress可选择开区,将结果压缩,减小IO shuffle中分区时采用HashPartitioner,相同的key会进入同一个reduce,key分布不均会致使数据倾斜,参考数据倾斜优化过程
hive二次排序的问题?
order by:全局有序,最终数据会进入一个reduce中,不推荐使用
sort by:局部有序,每一个reduce中的数据局有序
distribute by
经过distribute by设置分区 ,使用 sort by设置分区内排序 distribute by 常常与 sort by 在一块儿使用
cluster by:distribute by 和sort by条件一致时 使用cluster by
二次排序在by后面加上字段名a, b, c ...,hive会先按a排序,若a相同按b排序,若b相同按c排序
select * from score order by score.s_id asc, score.s_score desc;
score.s_id score.c_id score.s_score 01 03 99 01 02 90 01 01 80 02 03 80 02 01 70 02 02 60 03 03 80 03 02 80 03 01 80 04 01 50 04 02 30 04 03 20 05 02 87 05 01 76 06 03 34 06 01 31 07 03 98 07 02 89 Time taken: 96.333 seconds, Fetched: 18 row(s)
能够知道为何不用order by排序了
总方针:避免由于hbase前置匹配机制致使数据所有进入一个regionserver或某几个regionserver而产生数据热点
rowkey:
必须惟一
不建议用随机数做为rowkey,要根据实际业务需求设计rowkey
不能设置过大致使存储空间变大和索引变大
sparkstreaming窗口函数 开窗怎么开的,开的多长
开启窗口函数须要制定两个参数:
窗口长度 - The duration of the window (3 in the figure). 滑动间隔 - The interval at which the window operation is performed (2 in the figure).
// 滑动窗口:两个窗口之间有重叠部分,滑动时间小于窗口时间 val wc = res.reduceByKeyAndWindow( // 同一个key在同一个窗口不一样批次数据的聚合操做 (a:Int, b:Int) => a + b, // 窗口宽度,表明一个窗口具体计算的数据量 Seconds(15), // 滑动时间间隔,表明多久时间计算一个窗口的数据 Seconds(10) )
通常窗口长度大于滑动间隔
增长窗口宽度,对一个窗口中的数据操做就能够作到用sparkstreaming跑批