- create tablejava
- create external tablenode
● 在导入数据到外部表,数据并无移动到本身的数据仓库目录下,也就是说外部表中的数据并非由它本身来管理的!而表则不同;python
● 在删除表的时候,Hive将会把属于表的元数据和数据所有删掉;而删除外部表的时候,Hive仅仅删除外部表的元数据,数据是不会删除的!mysql
注意:sql
一、- create table 建立内部表,create external table 建立外部表apache
二、建议在工做中用外部表来建立缓存
● 在Hive中,表中的一个Partition对应于表下的一个目录,全部的Partition的数据都储存在对应的目录中安全
– 例如:pvs 表中包含 ds 和 city 两个 Partition,则 – 对应于 ds = 20090801, ctry = US 的 HDFS 子目录为:/wh/pvs/ds=20090801/ctry=US; – 对应于 ds = 20090801, ctry = CA 的 HDFS 子目录为;/wh/pvs/ds=20090801/ctry=CA
● Partition是辅助查询,缩小查询范围,加快数据的检索速度和对数据按照必定的规格和条件进行管理。服务器
• hive中table能够拆分红partition,table和partition能够经过‘CLUSTERED BY ’进一步分bucket,bucket中的数据能够经过‘SORT BY’排序。 • 'set hive.enforce.bucketing = true' 能够自动控制上一轮reduce的数量从而适 配bucket的个数,固然,用户也能够自主设置mapred.reduce.tasks去适配 bucket个数
• Bucket主要做用:
– 数据sampling,随机采样
– 提高某些查询操做效率,例如mapside join闭包
• 查看sampling数据: – hive> select * from student tablesample(bucket 1 out of 2 on id); – tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUT OF y) – y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32 时,抽取(64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。x表示从哪一个bucket开始抽 取。例如,table总bucket数为32,tablesample(bucket 3 out of 16),表示总共抽取(32/16=)2个bucket的数据 ,分别为第3个bucket和第(3+16=)19个bucket的数据
– 原生类型 • TINYINT • SMALLINT • INT • BIGINT • BOOLEAN • FLOAT • DOUBLE • STRING • BINARY(Hive 0.8.0以上才可用) • TIMESTAMP(Hive 0.8.0以上才可用)
– 复合类型 • Arrays:ARRAY<data_type> • Maps:MAP<primitive_type, data_type> ##复合类型 • Structs:STRUCT<col_name: data_type[COMMENT col_comment],……> • Union:UNIONTYPE<data_type, data_type,……>
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age
FROM page_view pv
JOIN user u
ON (pv.userid = u.userid);
SELECT pageid, age, count(1)
FROM pv_users
GROUP BY pageid, age;
– 做业会经过input的目录产生一个或者多个map任务。set dfs.block.size
– Map越多越好吗?是否是保证每一个map处理接近文件块的大小?
– 如何合并小文件,减小map数?
set mapred.max.split.size=100000000; #100M set mapred.min.split.size.per.node=100000000; set mapred.min.split.size.per.rack=100000000; set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
– 如何适当的增长map数?
set mapred.map.tasks=10;
– Map端聚合 hive.map.aggr=true 。 Mr中的Combiners.
• Reduce的优化: – hive.exec.reducers.bytes.per.reducer;reduce任务处理的数据量
– 调整reduce的个数: • 设置reduce处理的数据量 • set mapred.reduce.tasks=10
select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; 写成 select count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04';
Set mapred.reduce.tasks = 100 Create table a_standby_table as select * from a distribute by XXX
– Where中的分区条件,会提早生效,没必要特地作子查询,直接Join和GroupBy
– join的时候不加on条件或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积
– /*+ MAPJOIN(tablelist) */,必须是小表,不要超过1G,或者50万条记录
– 先作union all再作join或group by等操做能够有效减小MR过程,尽管是多个Select,最终只有一个
mr
Union:有去重操做,会消耗系统性能
Union all:没有去重操做,
– 从一份基础表中按照不一样的维度,一次组合出不一样的数据 – FROM from_statement – INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1)] select_statement1 group by key1 – INSERT OVERWRITE TABLE tablename2 [PARTITION(partcol2=val2 )] select_statement2 group by key2
– 当文件大小比阈值小时,hive会启动一个mr进行合并 – hive.merge.mapfiles = true 是否和并 Map 输出文件,默认为 True – hive.merge.mapredfiles = false 是否合并 Reduce 输出文件,默认为 False – hive.merge.size.per.task = 256*1000*1000 合并文件的大小
– 必须设置参数:set hive.groupby.skewindata=true; – select dt, count(distinct uniq_id), count(distinct ip) – from ods_log where dt=20170301 group by dt
• 一个MR job
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (a.key = c.key1)
• 生成多个MR job
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
• 按照JOIN顺序中的最后一个表应该尽可能是大表,由于JOIN前一阶段生成的数据会存在于
Reducer的buffer中,经过stream最后面的表,直接从Reducer的buffer中读取已经缓冲的中间
结果数据(这个中间结果数据多是JOIN顺序中,前面表链接的结果的Key,数据量相对较小,
内存开销就小),这样,与后面的大表进行链接时,只须要从buffer中读取缓存的Key,与大表
中的指定Key进行链接,速度会更快,也可能避免内存缓冲区溢出。
• 使用hint的方式启发JOIN操做
SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1); a表被视为大表
SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a JOIN b ON a.key = b.key; MAPJION会把小表所有读入内存中,在map阶段直接 拿另一个表的数据和内存中表数据作匹配,因为在 map是进行了join操做,省去了reduce运行的效率也 会高不少.
• 左链接时,左表中出现的JOIN字段都保留,右表没有链接上的都为空
SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key) WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'
SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key AND b.ds='2009-07-07' AND a.ds='2009-07-07')
• 执行顺序是,首先完成2表JOIN,而后再经过WHERE条件进行过滤,这样在JOIN过程当中可能会
输出大量结果,再对这些结果进行过滤,比较耗时。能够进行优化,将WHERE条件放在ON后
,在JOIN的过程当中,就对不知足条件的记录进行了预先过滤。
• 并行实行: – 同步执行hive的多个阶段,hive在执行过程,将一个查询转化成一个或者多个阶段。某个特 定的job可能包含众多的阶段,而这些阶段可能并不是彻底相互依赖的,也就是说能够并行执行 的,这样可能使得整个job的执行时间缩短。hive执行开启:set hive.exec.parallel=true
• Join
• Group by
• Count Distinct
• key分布不均致使的
• 人为的建表疏忽
• 业务数据特色
• 任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少许(1个或几个)reduce子任务未完成。
• 查看未完成的子任务,能够看到本地读写数据量积累很是大,一般超过10GB能够认定为发生数据倾斜。
• 平均记录数超过50w且最大记录数是超过平均记录数的4倍。
• 最长时长比平均时长超过4分钟,且最大时长超过平均时长的2倍。
• hive.groupby.skewindata=true
• 缘由
• Hive在进行join时,按照join的key进行分发,而在join左边的表的数据会首先读入内存,若是左边表的key相对
分散,读入内存的数据会比较小,join任务执行会比较快;而若是左边的表key比较集中,而这张表的数据量很大,
那么数据倾斜就会比较严重,而若是这张表是小表,则仍是应该把这张表放在join左边。
• 思路
• 将key相对分散,而且数据量小的表放在join的左边,这样能够有效减小内存溢出错误发生的概率
• 使用map join让小的维度表先进内存。
• 方法
• Small_table join big_table
• 缘由 • 日志中有一部分的userid是空或者是0的状况,致使在用user_id进行hash分桶的时候,会将日志中userid为0或者 空的数据分到一块儿,致使了过大的斜率。 • 思路 • 把空值的key变成一个字符串加上随机数,把倾斜的数据分到不一样的reduce上,因为null值关联不上,处理后并不 影响最终结果。 • 方法 • on case when (x.uid = '-' or x.uid = '0‘ or x.uid is null) then concat('dp_hive_search',rand()) else x.uid end = f.user_id
• 默认状况下,Hive的元数据信息存储在内置的Derby数据中。
• Hive支持将元数据存储在MySQL中
• 元数据存储配置:
– 【本地配置1】:默认
– 【本地配置2】:本地搭建mysql,经过localhost:Port方式访问
– 【远程配置】:远程搭建mysql,经过IP:Port方式访问
• 第一步:安装MySQL服务器端和MySQL客户端,并启动MySQL服务 • 安装: – yum install mysql – yum install mysql-server • 启动: – /etc/init.d/mysqld start • 设置用户名和密码: – mysqladmin -u root password '111111‘ • 测试登陆是否成功: – mysql -uroot -p111111
①下载apache-hive-0.13.0-bin.tgz,并解压:
②在conf目录下,建立hive-site.xml配置文件:
<configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>111111</value> </property> </configuration>
③ 修改profile,配置环境变量:
④将mysql-connector-java-5.1.41-bin.jar拷贝到hive home的lib目录下,以支
持hive对mysql的操做
注意:
测试hive的前提得打开hadoop集群,start-all.sh
⑤hive在建立表的过程当中,报以下错误处理:
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Got exception: org.apache.hadoop.ipc.RemoteException org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot create directory /user/hive/warehouse/w_a. Name node is in safe mode.
处理方法:
bin/hadoop dfsadmin -safemode leave
关闭Hadoop的安全模式
⑥测试hive
hive> create EXTERNAL TABLE w_a( usrid STRING, age STRING, sex STRING)ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'; #建立表 OK Time taken: 0.309 seconds hive> show tables; OK w_a Time taken: 0.049 seconds, Fetched: 1 row(s) hive> drop table w_a; #删除表 OK 导入数据: [root@master badou]# hive -f create_ex_table.sql #-f指定sql文件导入到hive中 Logging initialized using configuration in jar:file:/usr/local/src/apache-hive-0.13.0-bin/lib/hive-common-0.13.0.jar!/hive-log4j.properties OK Time taken: 1.168 seconds OK Time taken: 0.077 seconds 导入文本到hive: hive> LOAD DATA LOCAL INPATH '/home/ba/a.txt' OVERWRITE INTO TABLE w_a; #导入a.txt 文本到hive中 Copying data from file:/home/badou/a.txt Copying file: file:/home/badou/a.txt Failed with exception java.io.IOException: File /tmp/hive-root/hive_2019-04-28_05-20-33_879_4807090646149011006-1/-ext-10000/a.txt could only be replicated to 0 nodes, instead of 1
⑦
⑧
⑨
⑩
⑪
def father(name): print('from father %s' %name) def son(): print('from the son') def grandson(): print('from the grandson') grandson() son() father('朱锐')
def father(name): print('from father %s' %name) def son(): print('from the son') def grandson(): print('from the grandson') grandson() son() father('朱锐') ''' 闭包 ''' def father(name): def son(): # name='simon1' print('个人爸爸是%s' %name) def grandson(): print('个人爷爷是%s' %name) grandson() son() father('simon')
import time def timmer(func): def wrapper(): # print(func) start_time=time.time() func() #就是在运行test() stop_time=time.time() print('运行时间是%s' %(stop_time-start_time)) return wrapper @timmer #语法糖,这个是重点 def test(): time.sleep(3) print('test函数运行完毕') # res=timmer(test) #返回的是wrapper的地址 # res() #执行的是wrapper() # test=timmer(test) #返回的是wrapper的地址 # test() #执行的是wrapper() test() ''' 语法糖 ''' # @timmer #就至关于 test=timmer(test)
#未加返回值 import time def timmer(func): def wrapper(): # print(func) start_time=time.time() func() #就是在运行test() stop_time=time.time() print('运行时间是%s' %(stop_time-start_time)) return 123 return wrapper @timmer #语法糖 def test(): time.sleep(3) print('test函数运行完毕') return '这是test的返回值' res=test() #就是在运行wrapper print(res) 运行结果以下: C:\Python35\python3.exe G:/python_s3/day20/加上返回值.py test函数运行完毕 运行时间是3.000171661376953 123
#加上返回值 import time def timmer(func): def wrapper(): # print(func) start_time=time.time() res=func() #就是在运行test() ##主要修改这里1 stop_time=time.time() print('运行时间是%s' %(stop_time-start_time)) return res ##修改这里2 return wrapper @timmer #语法糖 def test(): time.sleep(3) print('test函数运行完毕') return '这是test的返回值' res=test() #就是在运行wrapper print(res) 运行结果以下: C:\Python35\python3.exe G:/python_s3/day20/加上返回值.py test函数运行完毕 运行时间是3.000171661376953 这是test的返回值
import time def timmer(func): def wrapper(name,age): #加入参数,name,age # print(func) start_time=time.time() res=func(name,age) ##加入参数,name,age stop_time=time.time() print('运行时间是%s' %(stop_time-start_time)) return res return wrapper @timmer #语法糖 def test(name,age): #加入参数,name,age time.sleep(3) print('test函数运行完毕,名字是【%s】,年龄是【%s】' % (name,age)) return '这是test的返回值' res=test('simon',18) #就是在运行wrapper print(res)
使用可变长参数代码以下:达到的效果是传参灵活
import time def timmer(func): def wrapper(*args,**kwargs): #test('simon',18) args=('simon') kwargs={'age':18} # print(func) start_time=time.time() res=func(*args,**kwargs) #就是在运行test() func(*('simon'),**{'age':18}) stop_time=time.time() print('运行时间是%s' %(stop_time-start_time)) return res return wrapper @timmer #语法糖 def test(name,age): time.sleep(3) print('test函数运行完毕,名字是【%s】,年龄是【%s】' % (name,age)) return '这是test的返回值' def test1(name,age,gender): time.sleep(1) print('test函数运行完毕,名字是【%s】,年龄是【%s】,性别是【%s】' % (name,age,gender)) res=test('simon',18) #就是在运行wrapper print(res) test1('simon',18,'male')
#无参装饰器
import time
def timmer(func): def wrapper(*args,**kwargs): start_time=time.time() res=func(*args,**kwargs) stop_time=time.time() print('run time is %s' %(stop_time-start_time)) return res return wrapper @timmer def foo(): time.sleep(3) print('from foo') foo()
#有参装饰器
def auth(driver='file'):
def auth2(func): def wrapper(*args,**kwargs): name=input("user: ") pwd=input("pwd: ") if driver == 'file': if name == 'simon' and pwd == '123': print('login successful') res=func(*args,**kwargs) return res elif driver == 'ldap': print('ldap') return wrapper return auth2 @auth(driver='file') def foo(name): print(name) foo('simon')
#验证功能装饰器
#验证功能装饰器
user_list=[
{'name':'simon','passwd':'123'}, {'name':'zhurui','passwd':'123'}, {'name':'william','passwd':'123'}, {'name':'zhurui1','passwd':'123'}, ] current_dic={'username':None,'login':False} def auth_func(func): def wrapper(*args,**kwargs): if current_dic['username'] and current_dic['login']: res=func(*args,**kwargs) return res username=input('用户名:').strip() passwd=input('密码:').strip() for user_dic in user_list: if username == user_dic['name'] and passwd == user_dic['passwd']: current_dic['username']=username current_dic['login']=True res=func(*args,**kwargs) return res else: print('用户名或者密码错误') # if username == 'simon' and passwd == '123': # user_dic['username']=username # user_dic['login']=True # res=func(*args,**kwargs) # return res # else: # print('用户名或密码错误') return wrapper @auth_func def index(): print('欢迎来到某宝首页') @auth_func def home(name): print('欢迎回家%s' %name) @auth_func def shopping_car(name): print('%s购物车里有[%s,%s,%s]' %(name,'餐具','沙发','电动车')) print('before----->',current_dic) index() print('after---->',current_dic) home('simon') # shopping_car('simon')
#带参数验证功能装饰器
#带参数验证功能装饰器
user_list=[
{'name':'simon','passwd':'123'}, {'name':'zhurui','passwd':'123'}, {'name':'william','passwd':'123'}, {'name':'zhurui1','passwd':'123'}, ] current_dic={'username':None,'login':False} def auth(auth_type='filedb'): def auth_func(func): def wrapper(*args,**kwargs): print('认证类型是',auth_type) if auth_type == 'filedb': if current_dic['username'] and current_dic['login']: res = func(*args, **kwargs) return res username=input('用户名:').strip() passwd=input('密码:').strip() for user_dic in user_list: if username == user_dic['name'] and passwd == user_dic['passwd']: current_dic['username']=username current_dic['login']=True res = func(*args, **kwargs) return res else: print('用户名或者密码错误') elif auth_type == 'ldap': print('这玩意没搞过,不知道怎么玩') res = func(*args, **kwargs) return res else: print('鬼才知道你用的什么认证方式') res = func(*args, **kwargs) return res return wrapper return auth_func @auth(auth_type='filedb') #auth_func=auth(auth_type='filedb')-->@auth_func 附加了一个auth_type --->index=auth_func(index) def index(): print('欢迎来到某宝主页') @auth(auth_type='ldap') def home(name): print('欢迎回家%s' %name) # @auth(auth_type='sssssss') def shopping_car(name): print('%s的购物车里有[%s,%s,%s]' %(name,'奶茶','妹妹','娃娃')) # print('before-->',current_dic) # index() # print('after--->',current_dic) # home('simon') shopping_car('simon')