Hive 概览#
Hive 基本架构#
-
用户接口
: CLI、JDBC/ODBC、Web UI 层 -
Trift Server
: 支持多种语言程序来操纵 Hive -
Driver
: Driver 驱动器、Compiler 编译器、Optimizer 优化器、Executor 执行器Hive 的核心是驱动引擎, 驱动引擎由四部分组成:
- 解释器:将 HiveSQL 语句转换成抽象语法树 (AST)
- 编译器:将抽象语法树转换成逻辑执行计划
- 优化器:对逻辑执行计划进行优化
- 执行器:调用底层的执行框架执行逻辑计划
-
元数据存储系统
: Hive 中的元数据通常包括表的名字,表的列和分区及其属性,表的属性(内部表和 外部表),表的数据所在目录.Metastore 默认存在自带的
Derby
数据库中,缺点就是不适合多用户操作,并且数据存 储目录不固定。数据库跟着 Hive 走,极度不方便管理
** 解决方案:** 通常存我们自己创建的 MySQL 库(本地 或 远程),Hive 和 MySQL 之间通过 MetaStore 服务交互
Hive 中的表#
Hive 中的表对应为 HDFS 上的指定目录,在查询数据时候,默认会对全表进行扫描,这样时间和性能的消耗都非常大
分区表和分桶表#
分区#
按照数据表的某列或某些列分为多个区,分区为 HDFS 上表目录的子目录,数据按照分区存储在子目录中。如果查询的 where 字句的中包含分区条件,则直接从该分区去查找,而不是扫描整个表目录,合理的分区设计可以极大提高查询速度和性能
创建分区表
在 Hive 中可以使用 PARTITIONED BY
子句创建分区表
CREATE EXTERNAL TABLE emp_partition(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_partition';
加载数据到分区表
加载数据到分区表时候必须要指定数据所处的分区
# 加载部门编号为20的数据到表中
LOAD DATA LOCAL INPATH "/usr/file/emp20.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=20)
# 加载部门编号为30的数据到表中
LOAD DATA LOCAL INPATH "/usr/file/emp30.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=30)
分桶#
并非所有的数据集都可以形成合理的分区,分区的数量也不是越多越好,过多的分区条件可能会导致很多分区上没有数据。分桶是相对分区进行更细粒度的划分,将整个数据内容按照某列属性值得 hash 值进行区分
创建分桶表
在 Hive 中,我们可以通过 CLUSTERED BY
指定分桶列,并通过 SORTED BY 指定桶中数据的排序参考列
CREATE EXTERNAL TABLE emp_bucket(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS --按照员工编号散列到四个 bucket 中
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_bucket';
加载数据到分桶表
--1. 设置强制分桶,Hive 2.x 不需要这一步
set hive.enforce.bucketing = true;
--2. 导入数据
INSERT INTO TABLE emp_bucket SELECT * FROM emp; --这里的 emp 表就是一张普通的雇员表
内部表和外部表#
区别
- 创建表时:创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。
- 删除表时:在删除表的时候,内部表的元数据和数据会被一起删除, 而外部表只删除元数据,不删除数据。这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据
- 建表的时候外部表额外加上一个
external
关键字
使用选择
- 如果数据的所有处理都在 Hive 中进行,那么倾向于选择内部表
- 使用外部表的场景主要是共享数据源的情况,可以使用外部表访问存储在 HDFS 上的初始数据,然后通过 Hive 转换数据并存到内部表中
自定义 UDF#
参考链接: Hive UDF
UDF(User Defined Function),即用户自定义函数,主要包含:
UDF(user-defined function)
: 一进一出,给定一个参数,输出一个处理后的数据 UDAF(user-defined aggregate function)
: 多进一出,属于聚合函数,类似于 count、sum 等函数 UDTF(user-defined table function)
: 一进多出,属于一个参数,返回一个列表作为结果
优化#
(1) 数据倾斜#
原因
- key 分布不均匀
- 业务数据本身的特性
- SQL 语句造成数据倾斜
解决方法
-
hive 设置
hive.map.aggr=true
和hive.groupby.skewindata=true
-
有数据倾斜的时候进行负载均衡,当设定
hive.groupby.skewindata=true
, 生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 在根据预处理的数据结果按照 Group By Key 分布到 Reduce 中 (这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。 -
SQL 语句调整:
选用join key 分布最均匀的表作为驱动表
。做好列裁剪和 filter 操作,以达到两表 join 的时候,数据量相对变小的效果。大小表Join
: 使用 map join 让小的维度表(1000 条以下的记录条数)先进内存,在 Map 端完成 Reduce。大表Join大表
:把空值的 Key 变成一个字符串加上一个随机数,把倾斜的数据分到不同的 reduce 上,由于 null 值关联不上,处理后并不影响最终的结果。count distinct大量相同特殊值
:count distinct 时,将值为空的情况单独处理,如果是计算 count distinct,可以不用处理,直接过滤,在做后结果中加 1。如果还有其他计算,需要进行 group by,可以先将值为空的记录单独处理,再和其他计算结果进行 union.
(2) 通用设置#
hive.optimize.cp=true
:列裁剪hive.optimize.prunner
:分区裁剪hive.limit.optimize.enable=true
:优化 LIMIT n 语句hive.limit.row.max.size=1000000:
hive.limit.optimize.limit.file=10:最大文件数
(3) 本地模式 (小任务)#
开启本地模式 hive> set hive.exec.mode.local.auto=true
-
job 的输入数据大小必须小于参数:
hive.exec.mode.local.auto.inputbytes.max(默认128MB)
-
job 的 map 数必须小于参数:
hive.exec.mode.local.auto.tasks.max(默认4)
-
job 的 reduce 数必须为 0 或者 1
(4) 并发执行#
开启并行计算 hive> set hive.exec.parallel=true
相关参数 hive.exec.parallel.thread.number
:一次 sql 计算中允许并执行的 job 数量
(5) Strict Mode (严格模式)#
主要是防止一群 sql 查询将集群压力大大增加
开启严格模式: hive> set hive.mapred.mode = strict
一些限制:
- 对于分区表,必须添加 where 对于分区字段的 条件过滤
- orderby 语句必须包含 limit 输出限制
- 限制执行笛卡尔积 查询
(6) 推测执行#
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
hive.mapred.reduce.tasks.speculative.execution=true;
(7) 分组#
-
两个聚集函数不能有不同的 DISTINCT 列,以下表达式是错误的:
INSERT OVERWRITE TABLE pv_gender_agg SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip) FROM pv_users GROUP BY pv_users.gender;
-
SELECT 语句中只能有 GROUP BY 的列或者聚集函数
-
hive.multigroupby.singlemar=true
:当多个 GROUP BY 语句有相同的分组列,则会优化为一个 MR 任务
(8) 聚合#
开启 map 聚合 hive> set hive.map.aggr=true
相关参数
-
hive.groupby.mapaggr.checkinterval
: map 端 group by 执行聚合时处理的多少行数据(默认:100000) -
hive.map.aggr.hash.min.reduction
: 进行聚合的最小比例(预先对 100000 条数据做聚合,若聚合之后的数据量 /100000 的值大于该配置 0.5,则不会聚合) -
hive.map.aggr.hash.percentmemory
:map 端聚合使用的内存的最大值 -
hive.map.aggr.hash.force.flush.memory.threshold
: map 端做聚合操作是 hash 表的最大可用内容,大于该值则会触发 flush -
hive.groupby.skewindata
:是否对 GroupBy 产生的数据倾斜做优化,默认为 false
(9) 合并小文件#
hive.merg.mapfiles=true
:合并 map 输出hive.merge.mapredfiles=false
:合并 reduce 输出hive.merge.size.per.task=256*1000*1000
:合并文件的大小hive.mergejob.maponly=true
:如果支持 CombineHiveInputFormat 则生成只有 Map 的任务执行 mergehive.merge.smallfiles.avgsize=16000000
:文件的平均大小小于该值时,会启动一个 MR 任务执行 merge。
(10) 自定义 map/reduce 数目#
Map 数量相关的参数
-
mapred.max.split.size
:一个 split 的最大值,即每个 map 处理文件的最大值 -
mapred.min.split.size.per.node
:一个节点上 split 的最小值 -
mapred.min.split.size.per.rack
:一个机架上 split 的最小值
Reduce 数量相关的参数
-
mapred.reduce.tasks
: 强制指定 reduce 任务的数量 -
hive.exec.reducers.bytes.per.reducer
每个 reduce 任务处理的数据量 -
hive.exec.reducers.max
每个任务最大的 reduce 数 [Map 数量>= Reduce 数量 ]
(11) 使用索引:#
hive.optimize.index.filter
:自动使用索引hive.optimize.index.groupby
:使用聚合索引优化 GROUP BY 操作
支持的存储格式#
ORC 和 Parquet 的综合性能突出,使用较为广泛,推荐使用
-
TextFile 存储为纯文本文件
。 这是 Hive 默认的文件存储格式。这种存储方式数据不做压缩,磁盘开销大,数据解析开销大。 -
SequenceFile
: SequenceFile 是 Hadoop API 提供的一种二进制文件,它将数据以 <key,value> 的形式序列化到文件中。这种二进制文件内部使用 Hadoop 的标准的 Writable 接口实现序列化和反序列化。它与 Hadoop API 中的 MapFile 是互相兼容的。Hive 中的 SequenceFile 继承自 Hadoop API 的 SequenceFile,不过它的 key 为空,使用 value 存放实际的值,这样是为了避免 MR 在运行 map 阶段进行额外的排序操作。 -
RCFile
: RCFile 文件格式是 FaceBook 开源的一种 Hive 的文件存储格式,首先将表分为几个行组,对每个行组内的数据按列存储,每一列的数据都是分开存储。 -
ORC Files
: ORC 是在一定程度上扩展了 RCFile,是对 RCFile 的优化。 -
Avro Files
: Avro 是一个数据序列化系统,设计用于支持大批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro 提供的机制使动态语言可以方便地处理 Avro 数据。 -
Parquet
: Parquet 是基于 Dremel 的数据模型和算法实现的,面向分析型业务的列式存储格式。它通过按列进行高效压缩和特殊的编码技术,从而在降低存储空间的同时提高了 IO 效率。
常用操作命令#
常用 DDL 操作#
查看数据列表: show databases;
使用数据库:USE database_name;
新建数据库:
CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name --DATABASE|SCHEMA 是等价的
[COMMENT database_comment] --数据库注释
[LOCATION hdfs_path] --存储在 HDFS 上的位置
[WITH DBPROPERTIES (property_name=property_value, ...)]; --指定额外属性
查看数据库信息:
DESC DATABASE [EXTENDED] db_name; --EXTENDED 表示是否显示额外属性
删除数据库:
-- 默认行为是 RESTRICT,如果数据库中存在表则删除失败。
-- 要想删除库及其中的表,可以使用 CASCADE 级联删除
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT | CASCADE];
创建表#
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name --表名
[(col_name data_type [COMMENT col_comment],
... [constraint_specification])] --列名 列数据类型
[COMMENT table_comment] --表描述
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] --分区表分区规则
[
CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS
] --分桶表分桶规则
[SKEWED BY (col_name, col_name, ...) ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
[STORED AS DIRECTORIES]
] --指定倾斜列和值
[
[ROW FORMAT row_format]
[STORED AS file_format]
| STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
] -- 指定行分隔符、存储文件格式或采用自定义存储格式
[LOCATION hdfs_path] -- 指定表的存储位置
[TBLPROPERTIES (property_name=property_value, ...)] --指定表的属性
[AS select_statement]; --从查询结果创建表
支持从查询语句的结果创建表:
CREATE TABLE emp_copy AS SELECT * FROM emp WHERE deptno='20';
修改表#
重命名表:
ALTER TABLE table_name RENAME TO new_table_name;
修改列:
ALTER TABLE table_name [PARTITION partition_spec]
CHANGE [COLUMN] col_old_name col_new_name column_type
[COMMENT col_comment] [FIRST | AFTER column_name] [CASCADE | RESTRICT];
--示例
-- 修改字段名和类型
ALTER TABLE emp_temp CHANGE empno empno_new INT;
-- 修改字段 sal 的名称 并将其放置到 empno 字段后
ALTER TABLE emp_temp CHANGE sal sal_new decimal(7,2) AFTER ename;
-- 为字段增加注释
ALTER TABLE emp_temp CHANGE mgr mgr_new INT COMMENT 'this is column mgr';
新增列:
ALTER TABLE emp_temp ADD COLUMNS (address STRING COMMENT 'home address');
清空表 / 删除表#
清空表:
-- 清空整个表或表指定分区中的数据
TRUNCATE TABLE table_name [PARTITION (partition_column = partition_col_value, ...)];
目前只有内部表才能执行 TRUNCATE
操作,外部表执行时会抛出异常 Cannot truncate non-managed table XXXX
删除表:
DROP TABLE [IF EXISTS] table_name [PURGE];
- 内部表:不仅会删除表的元数据,同时会删除 HDFS 上的数据;
- 外部表:只会删除表的元数据,不会删除 HDFS 上的数据;
- 删除视图引用的表时,不会给出警告(但视图已经无效了,必须由用户删除或重新创建)
其他#
查看视图列表:
SHOW VIEWS [IN/FROM database_name] [LIKE 'pattern_with_wildcards']; --仅支持 Hive 2.2.0 +
查看表的分区列表:
SHOW PARTITIONS table_name;
查看表 / 视图的创建语句:
SHOW CREATE TABLE ([db_name.]table_name|view_name);
常用 DML 操作#
和关系型数据库类似,具体参考: LanguageManual DML
排序关键字#
-
sort by
:不是全局排序,其在数据进入 reducer 前完成排序 -
order by
:会对输入做全局排序,因此只有一个 reducer (多个 reducer 无法保证全局有序). 只有一个 reducer, 会导致当输入规模较大时,需要较长的计算时间。 -
distribute by
:按照指定的字段对数据进行划分输出到不同的 reduce 中 -
cluster by
: 当 distribute by 和 sort by 的字段相同时,等同于 cluster by. 可以看做特殊的 distribute + sort
Hive 中追加导入数据方式#
-
从本地导入:
load data local inpath ‘/home/1.txt’ (overwrite)into table student;
-
从 HDFS 导入:
load data inpath ‘/user/hive/warehouse/1.txt’ (overwrite)into table student;
-
查询导入:
create table student1 as select * from student;(也可以具体查询某项数据)
-
查询结果导入:
insert (overwrite)into table staff select * from track_log;
Hive 导出数据方式#
-
用 insert overwrite 导出方式
1. 导出到本地:insert overwrite local directory ‘/home/robot/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
2. 导出到 HDFS
insert overwrite directory ‘/user/hive/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
-
Bash shell 覆盖追加导出
$ bin/hive -e “select * from staff;” > /home/z/backup.log
- sqoop 把 hive 数据导出到外部