Hive の概要#
Hive 基本アーキテクチャ#
-
ユーザーインターフェース
: CLI、JDBC/ODBC、Web UI 層 -
Thriftサーバー
: 複数のプログラミング言語で Hive を操作するためのサポート -
ドライバー
: ドライバー、コンパイラー、オプティマイザー、エグゼキューターHive のコアはドライバーエンジンであり、ドライバーエンジンは 4 つの部分で構成されています:
- インタープリター: HiveSQL 文を抽象構文木 (AST) に変換
- コンパイラー:抽象構文木を論理実行計画に変換
- オプティマイザー:論理実行計画を最適化
- エグゼキューター:基盤となる実行フレームワークを呼び出して論理計画を実行
-
メタデータストレージシステム
: Hive のメタデータには通常、テーブル名、テーブルの列とパーティションおよびその属性、テーブルの属性(内部テーブルと外部テーブル)、テーブルのデータが格納されているディレクトリが含まれます。Metastore はデフォルトで内蔵の
Derby
データベースに存在し、欠点は複数ユーザーの操作には適しておらず、データストレージディレクトリが固定されていないことです。データベースは Hive に従い、管理が非常に不便です。
解決策: 通常は自分で作成した MySQL データベース(ローカルまたはリモート)を使用し、Hive と MySQL は MetaStore サービスを介して相互作用します。
Hive のテーブル#
Hive のテーブルは HDFS 上の指定されたディレクトリに対応し、データをクエリする際にはデフォルトで全テーブルをスキャンします。これにより時間とパフォーマンスの消耗が非常に大きくなります。
パーティションテーブルとバケットテーブル#
パーティション#
データテーブルの特定の列または複数の列に基づいて複数の区に分割され、パーティションは HDFS 上のテーブルディレクトリのサブディレクトリです。データはパーティションに従ってサブディレクトリに格納されます。クエリの where 句にパーティション条件が含まれている場合、そのパーティションから直接検索され、全テーブルディレクトリをスキャンすることはありません。合理的なパーティション設計はクエリ速度とパフォーマンスを大幅に向上させることができます。
パーティションテーブルの作成
Hive ではPARTITIONED BY
句を使用してパーティションテーブルを作成できます。
CREATE EXTERNAL TABLE emp_partition(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 部門番号に基づいてパーティションを作成
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_partition';
パーティションテーブルへのデータのロード
パーティションテーブルにデータをロードする際は、データが存在するパーティションを指定する必要があります。
# 部門番号20のデータをテーブルにロード
LOAD DATA LOCAL INPATH "/usr/file/emp20.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=20)
# 部門番号30のデータをテーブルにロード
LOAD DATA LOCAL INPATH "/usr/file/emp30.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=30)
バケット#
すべてのデータセットが合理的なパーティションを形成できるわけではなく、パーティションの数が多ければ良いというわけでもありません。過剰なパーティション条件は、多くのパーティションにデータがないことを引き起こす可能性があります。バケットはパーティションに対してより細かい粒度の分割を行い、全データ内容を特定の列の属性値のハッシュ値に基づいて区別します。
バケットテーブルの作成
Hive では、CLUSTERED BY
を使用してバケット列を指定し、SORTED BY
を使用してバケット内のデータのソート参照列を指定できます。
CREATE EXTERNAL TABLE emp_bucket(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS -- 従業員番号に基づいて4つのバケットにハッシュ化
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_bucket';
バケットテーブルへのデータのロード
-- 1. 強制バケット設定、Hive 2.xではこのステップは不要
set hive.enforce.bucketing = true;
-- 2. データをインポート
INSERT INTO TABLE emp_bucket SELECT * FROM emp; -- ここでのempテーブルは通常の従業員テーブルです
内部テーブルと外部テーブル#
違い
- テーブル作成時:内部テーブルを作成すると、データはデータウェアハウスが指し示すパスに移動されます。外部テーブルを作成する場合は、データの存在するパスを記録するだけで、データの位置は変更されません。
- テーブル削除時:テーブルを削除すると、内部テーブルのメタデータとデータは一緒に削除されますが、外部テーブルはメタデータのみが削除され、データは削除されません。このため、外部テーブルは相対的に安全であり、データの組織も柔軟で、ソースデータの共有が容易です。
- テーブル作成時に外部テーブルには
external
キーワードを追加します。
使用選択
- データのすべての処理が Hive 内で行われる場合、内部テーブルを選択する傾向があります。
- 外部テーブルの使用シーンは主にデータソースの共有の状況であり、外部テーブルを使用して HDFS 上に保存された初期データにアクセスし、その後 Hive でデータを変換して内部テーブルに保存することができます。
カスタム UDF#
参考リンク: Hive UDF
UDF(ユーザー定義関数)は、主に以下を含みます:
UDF(ユーザー定義関数)
: 一進一出、与えられたパラメータに基づいて処理されたデータを出力します。UDAF(ユーザー定義集約関数)
: 多進一出、集約関数に属し、count、sum などの関数に似ています。UDTF(ユーザー定義テーブル関数)
: 一進多出、一つのパラメータに属し、結果としてリストを返します。
最適化#
(1) データの偏り#
原因
- キーの分布が不均一
- ビジネスデータ自体の特性
- SQL 文によるデータの偏り
解決方法
-
hive で
hive.map.aggr=true
とhive.groupby.skewindata=true
を設定します。 -
データの偏りがある場合は負荷分散を行い、
hive.groupby.skewindata=true
を設定すると、生成されるクエリ計画には 2 つの MR ジョブが含まれます。最初の MR ジョブでは、Map の出力結果集合が Reduce にランダムに分配され、各 Reduce が部分的な集約操作を行い、結果を出力します。このように処理された結果は、同じ Group By Key が異なる Reduce に分配される可能性があるため、負荷分散の目的を達成します。2 つ目の MR ジョブは、事前処理されたデータ結果に基づいて Group By Key に従って Reduce に分配され(このプロセスでは同じ Group By Key が同じ Reduce に分配されることが保証されます)、最終的な集約操作を完了します。 -
SQL 文の調整:
分布が最も均一なテーブルをドライバーテーブルとして選択する
。列の裁断とフィルター操作を行い、2 つのテーブルを結合する際にデータ量を相対的に減少させる効果を得ます。小さなテーブルと大きなテーブルの結合
:小さなディメンションテーブル(1000 行未満のレコード数)をメモリに先に読み込み、Map 側で Reduce を完了させるために map join を使用します。大きなテーブルと大きなテーブルの結合
:空のキーを文字列とランダムな数値に変換し、偏ったデータを異なる Reduce に分配します。null 値は結びつかないため、処理後に最終結果に影響を与えません。count distinctの大量の同じ特殊値
:count distinct 時に、値が空のケースを別途処理します。count distinct を計算する場合は処理せず、直接フィルタリングし、後の結果に 1 を加えます。他の計算が必要な場合は group by を行い、値が空のレコードを別途処理してから他の計算結果と union します。
(2) 一般設定#
hive.optimize.cp=true
:列裁断hive.optimize.prunner
:パーティション裁断hive.limit.optimize.enable=true
:LIMIT n 文の最適化hive.limit.row.max.size=1000000:
hive.limit.optimize.limit.file=10:最大ファイル数
(3) ローカルモード(小タスク)#
ローカルモードを有効にする hive> set hive.exec.mode.local.auto=true
-
ジョブの入力データサイズはパラメータ
hive.exec.mode.local.auto.inputbytes.max(デフォルト128MB)
より小さくなければなりません。 -
ジョブの map 数はパラメータ
hive.exec.mode.local.auto.tasks.max(デフォルト4)
より小さくなければなりません。 -
ジョブの reduce 数は 0 または 1 でなければなりません。
(4) 同時実行#
並列計算を有効にする hive> set hive.exec.parallel=true
関連パラメータ hive.exec.parallel.thread.number
:一度の SQL 計算で許可されるジョブの数
(5) 厳密モード#
主に一群の SQL クエリがクラスターの負荷を大幅に増加させるのを防ぐためです。
厳密モードを有効にする: hive> set hive.mapred.mode = strict
いくつかの制限:
- パーティションテーブルの場合、パーティションフィールドに対する条件フィルタを追加する必要があります。
- orderby 文には出力制限を含む limit が必要です。
- デカルト積のクエリの実行を制限します。
(6) 推測実行#
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
hive.mapred.reduce.tasks.speculative.execution=true;
(7) グループ化#
-
2 つの集約関数は異なる DISTINCT 列を持つことはできません。以下の式は誤りです:
INSERT OVERWRITE TABLE pv_gender_agg SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip) FROM pv_users GROUP BY pv_users.gender;
-
SELECT 文には GROUP BY の列または集約関数のみが含まれるべきです。
-
hive.multigroupby.singlemar=true
:複数の GROUP BY 文が同じグループ列を持つ場合、それは 1 つの MR タスクに最適化されます。
(8) 集約#
map 集約を有効にする hive> set hive.map.aggr=true
関連パラメータ
-
hive.groupby.mapaggr.checkinterval
:map 側で group by を実行する際に処理する行数(デフォルト:100000) -
hive.map.aggr.hash.min.reduction
:集約の最小比率(事前に 100000 行のデータを集約し、集約後のデータ量 / 100000 の値がこの設定 0.5 を超えない場合、集約は行われません) -
hive.map.aggr.hash.percentmemory
:map 側集約に使用されるメモリの最大値 -
hive.map.aggr.hash.force.flush.memory.threshold
:map 側で集約操作を行う際のハッシュテーブルの最大可用内容。この値を超えるとフラッシュがトリガーされます。 -
hive.groupby.skewindata
:GroupBy によって生成されたデータの偏りを最適化するかどうか、デフォルトは false です。
(9) 小ファイルの結合#
hive.merg.mapfiles=true
:map 出力を結合hive.merge.mapredfiles=false
:reduce 出力を結合hive.merge.size.per.task=256*1000*1000
:結合ファイルのサイズhive.mergejob.maponly=true
:CombineHiveInputFormat をサポートする場合、Map のみのタスクを生成して結合を実行します。hive.merge.smallfiles.avgsize=16000000
:ファイルの平均サイズがこの値未満の場合、MR タスクが結合を実行します。
(10) カスタム map/reduce 数#
Map 数に関連するパラメータ
-
mapred.max.split.size
:1 つの split の最大値、つまり各 map が処理するファイルの最大値 -
mapred.min.split.size.per.node
:1 つのノード上の split の最小値 -
mapred.min.split.size.per.rack
:1 つのラック上の split の最小値
Reduce 数に関連するパラメータ
-
mapred.reduce.tasks
:reduce タスクの数を強制的に指定します。 -
hive.exec.reducers.bytes.per.reducer
各 reduce タスクが処理するデータ量 -
hive.exec.reducers.max
各タスクの最大 reduce 数 [Map 数>= Reduce 数 ]
(11) インデックスの使用:#
hive.optimize.index.filter
:インデックスを自動的に使用hive.optimize.index.groupby
:集約インデックスを使用して GROUP BY 操作を最適化
サポートされているストレージフォーマット#
ORC と Parquet の総合的な性能は突出しており、広く使用されているため、使用を推奨します。
-
TextFile プレーンテキストファイルとして保存
。 これは Hive のデフォルトのファイルストレージフォーマットです。このストレージ方式ではデータは圧縮されず、ディスクのオーバーヘッドが大きく、データ解析のオーバーヘッドも大きいです。 -
SequenceFile
: SequenceFile は Hadoop API が提供するバイナリファイルで、データを <key,value> の形式でファイルにシリアライズします。このバイナリファイルは内部で Hadoop の標準 Writable インターフェースを使用してシリアライズとデシリアライズを実現しています。Hadoop API の MapFile と互換性があります。Hive の SequenceFile は Hadoop API の SequenceFile を継承していますが、キーは空で、実際の値を格納するために value を使用します。これは MR が map 段階で追加のソート操作を行うのを避けるためです。 -
RCFile
: RCFile ファイルフォーマットは FaceBook がオープンソースした Hive のファイルストレージフォーマットで、まずテーブルをいくつかの行グループに分割し、各行グループ内のデータを列ごとに保存します。各列のデータは別々に保存されます。 -
ORC Files
: ORC は RCFile をある程度拡張したもので、RCFile の最適化です。 -
Avro Files
: Avro はデータシリアライズシステムで、大量データ交換アプリケーションをサポートするために設計されています。主な特徴は、バイナリシリアライズ方式をサポートし、大量データを迅速に処理できることです。動的言語に優しい Avro は、動的言語が Avro データを簡単に処理できるメカニズムを提供します。 -
Parquet
: Parquet は Dremel に基づくデータモデルとアルゴリズムを実装した、分析型ビジネス向けの列指向ストレージフォーマットです。列ごとに効率的に圧縮し、特別なエンコーディング技術を使用することで、ストレージスペースを削減しながら IO 効率を向上させます。
よく使う操作コマンド#
よく使う DDL 操作#
データリストの表示: show databases;
データベースの使用:USE database_name;
新しいデータベースの作成:
CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name --DATABASE|SCHEMAは同等です
[COMMENT database_comment] --データベースのコメント
[LOCATION hdfs_path] --HDFS上の保存場所
[WITH DBPROPERTIES (property_name=property_value, ...)]; --追加の属性を指定
データベース情報の表示:
DESC DATABASE [EXTENDED] db_name; --EXTENDEDは追加属性を表示するかどうかを示します
データベースの削除:
-- デフォルトの動作はRESTRICTであり、データベース内にテーブルが存在する場合は削除に失敗します。
-- データベースとその中のテーブルを削除するには、CASCADEを使用して削除できます。
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT | CASCADE];
テーブルの作成#
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name --テーブル名
[(col_name data_type [COMMENT col_comment],
... [constraint_specification])] --列名 列データ型
[COMMENT table_comment] --テーブルの説明
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] --パーティションテーブルのパーティションルール
[
CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS
] --バケットテーブルのバケットルール
[SKEWED BY (col_name, col_name, ...) ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
[STORED AS DIRECTORIES]
] --傾斜列と値を指定
[
[ROW FORMAT row_format]
[STORED AS file_format]
| STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
] -- 行区切り文字、ストレージファイルフォーマット、またはカスタムストレージフォーマットを指定
[LOCATION hdfs_path] -- テーブルの保存場所を指定
[TBLPROPERTIES (property_name=property_value, ...)] --テーブルの属性を指定
[AS select_statement]; --クエリ結果からテーブルを作成
クエリ文の結果からテーブルを作成することをサポート:
CREATE TABLE emp_copy AS SELECT * FROM emp WHERE deptno='20';
テーブルの変更#
テーブルの名前を変更:
ALTER TABLE table_name RENAME TO new_table_name;
列の変更:
ALTER TABLE table_name [PARTITION partition_spec]
CHANGE [COLUMN] col_old_name col_new_name column_type
[COMMENT col_comment] [FIRST | AFTER column_name] [CASCADE | RESTRICT];
--例
-- フィールド名と型を変更
ALTER TABLE emp_temp CHANGE empno empno_new INT;
-- フィールドsalの名前を変更し、empnoフィールドの後に配置
ALTER TABLE emp_temp CHANGE sal sal_new decimal(7,2) AFTER ename;
-- フィールドにコメントを追加
ALTER TABLE emp_temp CHANGE mgr mgr_new INT COMMENT 'これはmgr列です';
列の追加:
ALTER TABLE emp_temp ADD COLUMNS (address STRING COMMENT '自宅の住所');
テーブルのクリア / 削除#
テーブルのクリア:
-- テーブル全体または指定されたパーティション内のデータをクリア
TRUNCATE TABLE table_name [PARTITION (partition_column = partition_col_value, ...)];
現在、TRUNCATE
操作は内部テーブルのみで実行可能であり、外部テーブルで実行すると例外Cannot truncate non-managed table XXXX
が発生します。
テーブルの削除:
DROP TABLE [IF EXISTS] table_name [PURGE];
- 内部テーブル:テーブルのメタデータだけでなく、HDFS 上のデータも削除されます。
- 外部テーブル:テーブルのメタデータのみが削除され、HDFS 上のデータは削除されません。
- ビューが参照しているテーブルを削除する際、警告は表示されません(ただし、ビューは無効になっており、ユーザーが削除または再作成する必要があります)。
その他#
ビューリストの表示:
SHOW VIEWS [IN/FROM database_name] [LIKE 'pattern_with_wildcards']; --Hive 2.2.0以降のみサポート
テーブルのパーティションリストの表示:
SHOW PARTITIONS table_name;
テーブル / ビューの作成文の表示:
SHOW CREATE TABLE ([db_name.]table_name|view_name);
よく使う DML 操作#
リレーショナルデータベースに似ており、具体的には参考: LanguageManual DML
ソートキーワード#
-
sort by
:全体のソートではなく、データが reducer に入る前にソートが完了します。 -
order by
:入力に対して全体のソートを行うため、1 つの reducer のみ(複数の reducer では全体の順序を保証できません)。1 つの reducer のみであるため、入力規模が大きい場合、計算時間が長くなる可能性があります。 -
distribute by
:指定されたフィールドに基づいてデータを異なる reduce に分配します。 -
cluster by
:distribute by と sort by のフィールドが同じ場合、cluster by と等しいと見なされます。特別な distribute + sort と考えることができます。
Hive でのデータの追加インポート方法#
-
ローカルからインポート:
load data local inpath ‘/home/1.txt’ (overwrite)into table student;
-
HDFS からインポート:
load data inpath ‘/user/hive/warehouse/1.txt’ (overwrite)into table student;
-
クエリインポート:
create table student1 as select * from student;(特定のデータをクエリすることもできます)
-
クエリ結果のインポート:
insert (overwrite)into table staff select * from track_log;
Hive でのデータのエクスポート方法#
-
insert overwrite エクスポート方法
- ローカルにエクスポート:
insert overwrite local directory ‘/home/robot/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
- HDFS にエクスポート
insert overwrite directory ‘/user/hive/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
-
Bash シェルでの上書き追加エクスポート
$ bin/hive -e “select * from staff;” > /home/z/backup.log
- sqoop を使用して Hive データを外部にエクスポート