Yige

Yige

Build

Hive概覽

Hive 概覽#

Hive 基本架構#

image.png

  • 用戶介面: CLI、JDBC/ODBC、Web UI 層

  • Trift Server: 支持多種語言程序來操縱 Hive

  • Driver: Driver 驅動器、Compiler 編譯器、Optimizer 優化器、Executor 執行器

    Hive 的核心是驅動引擎, 驅動引擎由四部分組成:

    1. 解釋器:將 HiveSQL 語句轉換成抽象語法樹 (AST)
    2. 編譯器:將抽象語法樹轉換成邏輯執行計劃
    3. 優化器:對邏輯執行計劃進行優化
    4. 執行器:調用底層的執行框架執行邏輯計劃
  • 元數據存儲系統 : Hive 中的元數據通常包括表的名字,表的列和分區及其屬性,表的屬性(內部表和 外部表),表的數據所在目錄.

    Metastore 默認存在自帶的 Derby 數據庫中,缺點就是不適合多用戶操作,並且數據存 儲目錄不固定。數據庫跟著 Hive 走,極度不方便管理
    ** 解決方案:** 通常存我們自己創建的 MySQL 庫(本地 或 遠程),Hive 和 MySQL 之間通過 MetaStore 服務交互

Hive 中的表#

Hive 中的表對應為 HDFS 上的指定目錄,在查詢數據時,默認會對全表進行掃描,這樣時間和性能的消耗都非常大

分區表和分桶表#

分區#

按照數據表的某列或某些列分為多個區,分區為 HDFS 上表目錄的子目錄,數據按照分區存儲在子目錄中。如果查詢的 where 字句的中包含分區條件,則直接從該分區去查找,而不是掃描整個表目錄,合理的分區設計可以極大提高查詢速度和性能

創建分區表
在 Hive 中可以使用 PARTITIONED BY 子句創建分區表

CREATE EXTERNAL TABLE emp_partition(
    empno INT,
    ename STRING,
    job STRING,
    mgr INT,
    hiredate TIMESTAMP,
    sal DECIMAL(7,2),
    comm DECIMAL(7,2)
    )
    PARTITIONED BY (deptno INT)   -- 按照部門編號進行分區
    ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
    LOCATION '/hive/emp_partition';

加載數據到分區表
加載數據到分區表時必須要指定數據所處的分區

# 加載部門編號為20的數據到表中
LOAD DATA LOCAL INPATH "/usr/file/emp20.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=20)
# 加載部門編號為30的數據到表中
LOAD DATA LOCAL INPATH "/usr/file/emp30.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=30)

分桶#

並非所有的數據集都可以形成合理的分區,分區的數量也不是越多越好,過多的分區條件可能會導致很多分區上沒有數據。分桶是相對分區進行更細粒度的劃分,將整個數據內容按照某列屬性值得 hash 值進行區分

創建分桶表
在 Hive 中,我們可以通過 CLUSTERED BY 指定分桶列,並通過 SORTED BY 指定桶中數據的排序參考列

CREATE EXTERNAL TABLE emp_bucket(
    empno INT,
    ename STRING,
    job STRING,
    mgr INT,
    hiredate TIMESTAMP,
    sal DECIMAL(7,2),
    comm DECIMAL(7,2),
    deptno INT)
    CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS  --按照員工編號散列到四個 bucket 中
    ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
    LOCATION '/hive/emp_bucket';

加載數據到分桶表

--1. 設置強制分桶,Hive 2.x 不需要這一步
set hive.enforce.bucketing = true;

--2. 導入數據
INSERT INTO TABLE emp_bucket SELECT *  FROM emp;  --這裡的 emp 表就是一張普通的雇員表

內部表和外部表#

區別

  • 創建表時:創建內部表時,會將數據移動到數據倉庫指向的路徑;若創建外部表,僅記錄數據所在的路徑,不對數據的位置做任何改變。
  • 刪除表時:在刪除表的時候,內部表的元數據和數據會被一起刪除, 而外部表只刪除元數據,不刪除數據。這樣外部表相對來說更加安全些,數據組織也更加靈活,方便共享源數據
  • 建表的時候外部表額外加上一個 external關鍵字

使用選擇

  • 如果數據的所有處理都在 Hive 中進行,那麼傾向於選擇內部表
  • 使用外部表的場景主要是共享數據源的情況,可以使用外部表訪問存儲在 HDFS 上的初始數據,然後通過 Hive 轉換數據並存到內部表中

自定義 UDF#

參考鏈接: Hive UDF

UDF(User Defined Function),即用戶自定義函數,主要包含:

  • UDF(user-defined function): 一進一出,給定一個參數,輸出一個處理後的數據 ​
  • UDAF(user-defined aggregate function): 多進一出,屬於聚合函數,類似於 count、sum 等函數 ​
  • UDTF(user-defined table function): 一進多出,屬於一個參數,返回一個列表作為結果

優化#

(1) 數據傾斜#

原因

  • key 分佈不均勻
  • 業務數據本身的特性
  • SQL 語句造成數據傾斜

解決方法

  • hive 設置hive.map.aggr=truehive.groupby.skewindata=true

  • 有數據傾斜的時候進行負載均衡,當設定hive.groupby.skewindata=true, 生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分佈到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 在根據預處理的數據結果按照 Group By Key 分佈到 Reduce 中 (這個過程可以保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操作。

  • SQL 語句調整:

    1. 選用join key 分佈最均勻的表作為驅動表。做好列裁剪和 filter 操作,以達到兩表 join 的時候,數據量相對變小的效果。
    2. 大小表Join: 使用 map join 讓小的維度表(1000 條以下的記錄條數)先進內存,在 Map 端完成 Reduce。
    3. 大表Join大表:把空值的 Key 變成一個字符串加上一個隨機數,把傾斜的數據分到不同的 reduce 上,由於 null 值關聯不上,處理後並不影響最終的結果。
    4. count distinct大量相同特殊值:count distinct 時,將值為空的情況單獨處理,如果是計算 count distinct,可以不用處理,直接過濾,在做後結果中加 1。如果還有其他計算,需要進行 group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行 union.

(2) 通用設置#

  • hive.optimize.cp=true:列裁剪
  • hive.optimize.prunner:分區裁剪
  • hive.limit.optimize.enable=true:優化 LIMIT n 語句
  • hive.limit.row.max.size=1000000:
  • hive.limit.optimize.limit.file=10:最大文件數

(3) 本地模式 (小任務)#

開啟本地模式 hive> set hive.exec.mode.local.auto=true

  • job 的輸入數據大小必須小於參數:hive.exec.mode.local.auto.inputbytes.max(默認128MB)

  • job 的 map 數必須小於參數:hive.exec.mode.local.auto.tasks.max(默認4)

  • job 的 reduce 數必須為 0 或者 1

(4) 並發執行#

開啟並行計算 hive> set hive.exec.parallel=true
相關參數 hive.exec.parallel.thread.number:一次 sql 計算中允許並執行的 job 數量

(5) Strict Mode (嚴格模式)#

主要是防止一群 sql 查詢將集群壓力大大增加

開啟嚴格模式: hive> set hive.mapred.mode = strict

一些限制:

  • 對於分區表,必須添加 where 對於分區字段的 條件過濾
  • orderby 語句必須包含 limit 輸出限制
  • 限制執行笛卡爾積 查詢

(6) 推測執行#

mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
hive.mapred.reduce.tasks.speculative.execution=true;

(7) 分組#

  • 兩個聚集函數不能有不同的 DISTINCT 列,以下表達式是錯誤的:

    INSERT OVERWRITE TABLE pv_gender_agg SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip) FROM pv_users GROUP BY pv_users.gender;
    
  • SELECT 語句中只能有 GROUP BY 的列或者聚集函數

  • hive.multigroupby.singlemar=true:當多個 GROUP BY 語句有相同的分組列,則會優化為一個 MR 任務

(8) 聚合#

開啟 map 聚合 hive> set hive.map.aggr=true

相關參數

  • hive.groupby.mapaggr.checkinterval: map 端 group by 執行聚合時處理的多少行數據(默認:100000)

  • hive.map.aggr.hash.min.reduction: 進行聚合的最小比例(預先對 100000 條數據做聚合,若聚合之後的數據量 /100000 的值大於該配置 0.5,則不會聚合)

  • hive.map.aggr.hash.percentmemory:map 端聚合使用的內存的最大值

  • hive.map.aggr.hash.force.flush.memory.threshold: map 端做聚合操作是 hash 表的最大可用內容,大於該值則會觸發 flush

  • hive.groupby.skewindata :是否對 GroupBy 產生的數據傾斜做優化,默認為 false

(9) 合併小文件#

  • hive.merg.mapfiles=true:合併 map 輸出
  • hive.merge.mapredfiles=false:合併 reduce 輸出
  • hive.merge.size.per.task=256*1000*1000:合併文件的大小
  • hive.mergejob.maponly=true:如果支持 CombineHiveInputFormat 則生成只有 Map 的任務執行 merge
  • hive.merge.smallfiles.avgsize=16000000:文件的平均大小小於該值時,會啟動一個 MR 任務執行 merge。

(10) 自定義 map/reduce 數目#

Map 數量相關的參數

  • mapred.max.split.size:一個 split 的最大值,即每個 map 處理文件的最大值

  • mapred.min.split.size.per.node:一個節點上 split 的最小值

  • mapred.min.split.size.per.rack:一個機架上 split 的最小值

Reduce 數量相關的參數

  • mapred.reduce.tasks: 強制指定 reduce 任務的數量

  • hive.exec.reducers.bytes.per.reducer 每個 reduce 任務處理的數據量

  • hive.exec.reducers.max 每個任務最大的 reduce 數 [Map 數量>= Reduce 數量 ]

(11) 使用索引:#

  • hive.optimize.index.filter:自動使用索引
  • hive.optimize.index.groupby:使用聚合索引優化 GROUP BY 操作

支持的存儲格式#

ORC 和 Parquet 的綜合性能突出,使用較為廣泛,推薦使用

  • TextFile 存儲為純文本文件。 這是 Hive 默認的文件存儲格式。這種存儲方式數據不做壓縮,磁碟開銷大,數據解析開銷大。

  • SequenceFile: SequenceFile 是 Hadoop API 提供的一種二進制文件,它將數據以 <key,value> 的形式序列化到文件中。這種二進制文件內部使用 Hadoop 的標準的 Writable 接口實現序列化和反序列化。它與 Hadoop API 中的 MapFile 是互相兼容的。Hive 中的 SequenceFile 繼承自 Hadoop API 的 SequenceFile,不過它的 key 為空,使用 value 存放實際的值,這樣是為了避免 MR 在運行 map 階段進行額外的排序操作。

  • RCFile: RCFile 文件格式是 FaceBook 開源的一種 Hive 的文件存儲格式,首先將表分為幾個行組,對每個行組內的數據按列存儲,每一列的數據都是分開存儲。

  • ORC Files: ORC 是在一定程度上擴展了 RCFile,是對 RCFile 的優化。

  • Avro Files: Avro 是一個數據序列化系統,設計用於支持大批量數據交換的應用。它的主要特點有:支持二進制序列化方式,可以便捷,快速地處理大量數據;動態語言友好,Avro 提供的機制使動態語言可以方便地處理 Avro 數據。

  • Parquet: Parquet 是基於 Dremel 的數據模型和算法實現的,面向分析型業務的列式存儲格式。它通過按列進行高效壓縮和特殊的編碼技術,從而在降低存儲空間的同時提高了 IO 效率。

常用操作命令#

常用 DDL 操作#

參考: LanguageManual DDL

查看數據列表: show databases;

使用數據庫:USE database_name;

新建數據庫:

CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name   --DATABASE|SCHEMA 是等價的
  [COMMENT database_comment] --數據庫註釋
  [LOCATION hdfs_path] --存儲在 HDFS 上的位置
  [WITH DBPROPERTIES (property_name=property_value, ...)]; --指定額外屬性

查看數據庫信息:
DESC DATABASE [EXTENDED] db_name; --EXTENDED 表示是否顯示額外屬性

刪除數據庫:

-- 默認行為是 RESTRICT,如果數據庫中存在表則刪除失敗。
-- 要想刪除庫及其中的表,可以使用 CASCADE 级联刪除
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT | CASCADE];

創建表#

CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name     --表名
  [(col_name data_type [COMMENT col_comment],
    ... [constraint_specification])]  --列名 列數據類型
  [COMMENT table_comment]   --表描述
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]  --分區表分區規則
  [
    CLUSTERED BY (col_name, col_name, ...)
   [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS
  ]  --分桶表分桶規則
  [SKEWED BY (col_name, col_name, ...) ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
   [STORED AS DIRECTORIES]
  ]  --指定傾斜列和值
  [
   [ROW FORMAT row_format]
   [STORED AS file_format]
     | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
  ]  -- 指定行分隔符、存儲文件格式或採用自定義存儲格式
  [LOCATION hdfs_path]  -- 指定表的存儲位置
  [TBLPROPERTIES (property_name=property_value, ...)]  --指定表的屬性
  [AS select_statement];   --從查詢結果創建表

支持從查詢語句的結果創建表:

CREATE TABLE emp_copy AS SELECT * FROM emp WHERE deptno='20';

修改表#

重命名表:

ALTER TABLE table_name RENAME TO new_table_name;

修改列:

ALTER TABLE table_name [PARTITION partition_spec]
CHANGE [COLUMN] col_old_name col_new_name column_type
[COMMENT col_comment] [FIRST | AFTER column_name] [CASCADE | RESTRICT];

--示例
-- 修改字段名和類型
ALTER TABLE emp_temp CHANGE empno empno_new INT;

-- 修改字段 sal 的名稱 並將其放置到 empno 字段後
ALTER TABLE emp_temp CHANGE sal sal_new decimal(7,2)  AFTER ename;

-- 為字段增加註釋
ALTER TABLE emp_temp CHANGE mgr mgr_new INT COMMENT 'this is column mgr';

新增列:

ALTER TABLE emp_temp ADD COLUMNS (address STRING COMMENT 'home address');

清空表 / 刪除表#

清空表:

-- 清空整個表或表指定分區中的數據
TRUNCATE TABLE table_name [PARTITION (partition_column = partition_col_value,  ...)];

目前只有內部表才能執行 TRUNCATE 操作,外部表執行時會拋出異常 Cannot truncate non-managed table XXXX

刪除表:

DROP TABLE [IF EXISTS] table_name [PURGE];
  • 內部表:不僅會刪除表的元數據,同時會刪除 HDFS 上的數據;
  • 外部表:只會刪除表的元數據,不會刪除 HDFS 上的數據;
  • 刪除視圖引用的表時,不會給出警告(但視圖已經無效了,必須由用戶刪除或重新創建)

其他#

查看視圖列表:

SHOW VIEWS [IN/FROM database_name] [LIKE 'pattern_with_wildcards'];   --僅支持 Hive 2.2.0 +

查看表的分區列表:

SHOW PARTITIONS table_name;

查看表 / 視圖的創建語句:

SHOW CREATE TABLE ([db_name.]table_name|view_name);

常用 DML 操作#

和關係型數據庫類似,具體參考: LanguageManual DML

排序關鍵字#

  • sort by :不是全局排序,其在數據進入 reducer 前完成排序

  • order by :會對輸入做全局排序,因此只有一個 reducer (多個 reducer 無法保證全局有序). 只有一個 reducer, 會導致當輸入規模較大時,需要較長的計算時間。

  • distribute by :按照指定的字段對數據進行劃分輸出到不同的 reduce 中

  • cluster by : 當 distribute by 和 sort by 的字段相同時,等同於 cluster by. 可以看做特殊的 distribute + sort

Hive 中追加導入數據方式#

  • 從本地導入: load data local inpath ‘/home/1.txt’ (overwrite)into table student;

  • 從 HDFS 導入: load data inpath ‘/user/hive/warehouse/1.txt’ (overwrite)into table student;

  • 查詢導入: create table student1 as select * from student;(也可以具體查詢某項數據)

  • 查詢結果導入:insert (overwrite)into table staff select * from track_log;

Hive 導出數據方式#

  • 用 insert overwrite 導出方式
    1. 導出到本地:

    insert overwrite local directory ‘/home/robot/1/2’ rom  format delimited fields terminated by ‘\t’ select * from staff;
    

    2. 導出到 HDFS

    insert overwrite directory ‘/user/hive/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
    
  • Bash shell 覆蓋追加導出

$ bin/hive -e “select * from staff; > /home/z/backup.log
  • sqoop 把 hive 數據導出到外部

拓展#

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。