Yige

Yige

Build

Hiveの概要

Hive の概要#

Hive 基本アーキテクチャ#

image.png

  • ユーザーインターフェース: CLI、JDBC/ODBC、Web UI 層

  • Thriftサーバー: 複数のプログラミング言語で Hive を操作するためのサポート

  • ドライバー: ドライバー、コンパイラー、オプティマイザー、エグゼキューター

    Hive のコアはドライバーエンジンであり、ドライバーエンジンは 4 つの部分で構成されています:

    1. インタープリター: HiveSQL 文を抽象構文木 (AST) に変換
    2. コンパイラー:抽象構文木を論理実行計画に変換
    3. オプティマイザー:論理実行計画を最適化
    4. エグゼキューター:基盤となる実行フレームワークを呼び出して論理計画を実行
  • メタデータストレージシステム: Hive のメタデータには通常、テーブル名、テーブルの列とパーティションおよびその属性、テーブルの属性(内部テーブルと外部テーブル)、テーブルのデータが格納されているディレクトリが含まれます。

    Metastore はデフォルトで内蔵のDerbyデータベースに存在し、欠点は複数ユーザーの操作には適しておらず、データストレージディレクトリが固定されていないことです。データベースは Hive に従い、管理が非常に不便です。
    解決策: 通常は自分で作成した MySQL データベース(ローカルまたはリモート)を使用し、Hive と MySQL は MetaStore サービスを介して相互作用します。

Hive のテーブル#

Hive のテーブルは HDFS 上の指定されたディレクトリに対応し、データをクエリする際にはデフォルトで全テーブルをスキャンします。これにより時間とパフォーマンスの消耗が非常に大きくなります。

パーティションテーブルとバケットテーブル#

パーティション#

データテーブルの特定の列または複数の列に基づいて複数の区に分割され、パーティションは HDFS 上のテーブルディレクトリのサブディレクトリです。データはパーティションに従ってサブディレクトリに格納されます。クエリの where 句にパーティション条件が含まれている場合、そのパーティションから直接検索され、全テーブルディレクトリをスキャンすることはありません。合理的なパーティション設計はクエリ速度とパフォーマンスを大幅に向上させることができます。

パーティションテーブルの作成
Hive ではPARTITIONED BY句を使用してパーティションテーブルを作成できます。

CREATE EXTERNAL TABLE emp_partition(
    empno INT,
    ename STRING,
    job STRING,
    mgr INT,
    hiredate TIMESTAMP,
    sal DECIMAL(7,2),
    comm DECIMAL(7,2)
    )
    PARTITIONED BY (deptno INT)   -- 部門番号に基づいてパーティションを作成
    ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
    LOCATION '/hive/emp_partition';

パーティションテーブルへのデータのロード
パーティションテーブルにデータをロードする際は、データが存在するパーティションを指定する必要があります。

# 部門番号20のデータをテーブルにロード
LOAD DATA LOCAL INPATH "/usr/file/emp20.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=20)
# 部門番号30のデータをテーブルにロード
LOAD DATA LOCAL INPATH "/usr/file/emp30.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=30)

バケット#

すべてのデータセットが合理的なパーティションを形成できるわけではなく、パーティションの数が多ければ良いというわけでもありません。過剰なパーティション条件は、多くのパーティションにデータがないことを引き起こす可能性があります。バケットはパーティションに対してより細かい粒度の分割を行い、全データ内容を特定の列の属性値のハッシュ値に基づいて区別します

バケットテーブルの作成
Hive では、CLUSTERED BYを使用してバケット列を指定し、SORTED BYを使用してバケット内のデータのソート参照列を指定できます。

CREATE EXTERNAL TABLE emp_bucket(
    empno INT,
    ename STRING,
    job STRING,
    mgr INT,
    hiredate TIMESTAMP,
    sal DECIMAL(7,2),
    comm DECIMAL(7,2),
    deptno INT)
    CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS  -- 従業員番号に基づいて4つのバケットにハッシュ化
    ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
    LOCATION '/hive/emp_bucket';

バケットテーブルへのデータのロード

-- 1. 強制バケット設定、Hive 2.xではこのステップは不要
set hive.enforce.bucketing = true;

-- 2. データをインポート
INSERT INTO TABLE emp_bucket SELECT *  FROM emp;  -- ここでのempテーブルは通常の従業員テーブルです

内部テーブルと外部テーブル#

違い

  • テーブル作成時:内部テーブルを作成すると、データはデータウェアハウスが指し示すパスに移動されます。外部テーブルを作成する場合は、データの存在するパスを記録するだけで、データの位置は変更されません。
  • テーブル削除時:テーブルを削除すると、内部テーブルのメタデータとデータは一緒に削除されますが、外部テーブルはメタデータのみが削除され、データは削除されません。このため、外部テーブルは相対的に安全であり、データの組織も柔軟で、ソースデータの共有が容易です。
  • テーブル作成時に外部テーブルにはexternalキーワードを追加します。

使用選択

  • データのすべての処理が Hive 内で行われる場合、内部テーブルを選択する傾向があります。
  • 外部テーブルの使用シーンは主にデータソースの共有の状況であり、外部テーブルを使用して HDFS 上に保存された初期データにアクセスし、その後 Hive でデータを変換して内部テーブルに保存することができます。

カスタム UDF#

参考リンク: Hive UDF

UDF(ユーザー定義関数)は、主に以下を含みます:

  • UDF(ユーザー定義関数): 一進一出、与えられたパラメータに基づいて処理されたデータを出力します。
  • UDAF(ユーザー定義集約関数): 多進一出、集約関数に属し、count、sum などの関数に似ています。
  • UDTF(ユーザー定義テーブル関数): 一進多出、一つのパラメータに属し、結果としてリストを返します。

最適化#

(1) データの偏り#

原因

  • キーの分布が不均一
  • ビジネスデータ自体の特性
  • SQL 文によるデータの偏り

解決方法

  • hive でhive.map.aggr=truehive.groupby.skewindata=trueを設定します。

  • データの偏りがある場合は負荷分散を行い、hive.groupby.skewindata=trueを設定すると、生成されるクエリ計画には 2 つの MR ジョブが含まれます。最初の MR ジョブでは、Map の出力結果集合が Reduce にランダムに分配され、各 Reduce が部分的な集約操作を行い、結果を出力します。このように処理された結果は、同じ Group By Key が異なる Reduce に分配される可能性があるため、負荷分散の目的を達成します。2 つ目の MR ジョブは、事前処理されたデータ結果に基づいて Group By Key に従って Reduce に分配され(このプロセスでは同じ Group By Key が同じ Reduce に分配されることが保証されます)、最終的な集約操作を完了します。

  • SQL 文の調整:

    1. 分布が最も均一なテーブルをドライバーテーブルとして選択する。列の裁断とフィルター操作を行い、2 つのテーブルを結合する際にデータ量を相対的に減少させる効果を得ます。
    2. 小さなテーブルと大きなテーブルの結合:小さなディメンションテーブル(1000 行未満のレコード数)をメモリに先に読み込み、Map 側で Reduce を完了させるために map join を使用します。
    3. 大きなテーブルと大きなテーブルの結合:空のキーを文字列とランダムな数値に変換し、偏ったデータを異なる Reduce に分配します。null 値は結びつかないため、処理後に最終結果に影響を与えません。
    4. count distinctの大量の同じ特殊値:count distinct 時に、値が空のケースを別途処理します。count distinct を計算する場合は処理せず、直接フィルタリングし、後の結果に 1 を加えます。他の計算が必要な場合は group by を行い、値が空のレコードを別途処理してから他の計算結果と union します。

(2) 一般設定#

  • hive.optimize.cp=true:列裁断
  • hive.optimize.prunner:パーティション裁断
  • hive.limit.optimize.enable=true:LIMIT n 文の最適化
  • hive.limit.row.max.size=1000000:
  • hive.limit.optimize.limit.file=10:最大ファイル数

(3) ローカルモード(小タスク)#

ローカルモードを有効にする hive> set hive.exec.mode.local.auto=true

  • ジョブの入力データサイズはパラメータhive.exec.mode.local.auto.inputbytes.max(デフォルト128MB)より小さくなければなりません。

  • ジョブの map 数はパラメータhive.exec.mode.local.auto.tasks.max(デフォルト4)より小さくなければなりません。

  • ジョブの reduce 数は 0 または 1 でなければなりません。

(4) 同時実行#

並列計算を有効にする hive> set hive.exec.parallel=true
関連パラメータ hive.exec.parallel.thread.number:一度の SQL 計算で許可されるジョブの数

(5) 厳密モード#

主に一群の SQL クエリがクラスターの負荷を大幅に増加させるのを防ぐためです。

厳密モードを有効にする: hive> set hive.mapred.mode = strict

いくつかの制限:

  • パーティションテーブルの場合、パーティションフィールドに対する条件フィルタを追加する必要があります。
  • orderby 文には出力制限を含む limit が必要です。
  • デカルト積のクエリの実行を制限します。

(6) 推測実行#

mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
hive.mapred.reduce.tasks.speculative.execution=true;

(7) グループ化#

  • 2 つの集約関数は異なる DISTINCT 列を持つことはできません。以下の式は誤りです:

    INSERT OVERWRITE TABLE pv_gender_agg SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip) FROM pv_users GROUP BY pv_users.gender;
    
  • SELECT 文には GROUP BY の列または集約関数のみが含まれるべきです。

  • hive.multigroupby.singlemar=true:複数の GROUP BY 文が同じグループ列を持つ場合、それは 1 つの MR タスクに最適化されます。

(8) 集約#

map 集約を有効にする hive> set hive.map.aggr=true

関連パラメータ

  • hive.groupby.mapaggr.checkinterval:map 側で group by を実行する際に処理する行数(デフォルト:100000)

  • hive.map.aggr.hash.min.reduction:集約の最小比率(事前に 100000 行のデータを集約し、集約後のデータ量 / 100000 の値がこの設定 0.5 を超えない場合、集約は行われません)

  • hive.map.aggr.hash.percentmemory:map 側集約に使用されるメモリの最大値

  • hive.map.aggr.hash.force.flush.memory.threshold:map 側で集約操作を行う際のハッシュテーブルの最大可用内容。この値を超えるとフラッシュがトリガーされます。

  • hive.groupby.skewindata :GroupBy によって生成されたデータの偏りを最適化するかどうか、デフォルトは false です。

(9) 小ファイルの結合#

  • hive.merg.mapfiles=true:map 出力を結合
  • hive.merge.mapredfiles=false:reduce 出力を結合
  • hive.merge.size.per.task=256*1000*1000:結合ファイルのサイズ
  • hive.mergejob.maponly=true:CombineHiveInputFormat をサポートする場合、Map のみのタスクを生成して結合を実行します。
  • hive.merge.smallfiles.avgsize=16000000:ファイルの平均サイズがこの値未満の場合、MR タスクが結合を実行します。

(10) カスタム map/reduce 数#

Map 数に関連するパラメータ

  • mapred.max.split.size:1 つの split の最大値、つまり各 map が処理するファイルの最大値

  • mapred.min.split.size.per.node:1 つのノード上の split の最小値

  • mapred.min.split.size.per.rack:1 つのラック上の split の最小値

Reduce 数に関連するパラメータ

  • mapred.reduce.tasks:reduce タスクの数を強制的に指定します。

  • hive.exec.reducers.bytes.per.reducer 各 reduce タスクが処理するデータ量

  • hive.exec.reducers.max 各タスクの最大 reduce 数 [Map 数>= Reduce 数 ]

(11) インデックスの使用:#

  • hive.optimize.index.filter:インデックスを自動的に使用
  • hive.optimize.index.groupby:集約インデックスを使用して GROUP BY 操作を最適化

サポートされているストレージフォーマット#

ORC と Parquet の総合的な性能は突出しており、広く使用されているため、使用を推奨します。

  • TextFile プレーンテキストファイルとして保存。 これは Hive のデフォルトのファイルストレージフォーマットです。このストレージ方式ではデータは圧縮されず、ディスクのオーバーヘッドが大きく、データ解析のオーバーヘッドも大きいです。

  • SequenceFile: SequenceFile は Hadoop API が提供するバイナリファイルで、データを <key,value> の形式でファイルにシリアライズします。このバイナリファイルは内部で Hadoop の標準 Writable インターフェースを使用してシリアライズとデシリアライズを実現しています。Hadoop API の MapFile と互換性があります。Hive の SequenceFile は Hadoop API の SequenceFile を継承していますが、キーは空で、実際の値を格納するために value を使用します。これは MR が map 段階で追加のソート操作を行うのを避けるためです。

  • RCFile: RCFile ファイルフォーマットは FaceBook がオープンソースした Hive のファイルストレージフォーマットで、まずテーブルをいくつかの行グループに分割し、各行グループ内のデータを列ごとに保存します。各列のデータは別々に保存されます。

  • ORC Files: ORC は RCFile をある程度拡張したもので、RCFile の最適化です。

  • Avro Files: Avro はデータシリアライズシステムで、大量データ交換アプリケーションをサポートするために設計されています。主な特徴は、バイナリシリアライズ方式をサポートし、大量データを迅速に処理できることです。動的言語に優しい Avro は、動的言語が Avro データを簡単に処理できるメカニズムを提供します。

  • Parquet: Parquet は Dremel に基づくデータモデルとアルゴリズムを実装した、分析型ビジネス向けの列指向ストレージフォーマットです。列ごとに効率的に圧縮し、特別なエンコーディング技術を使用することで、ストレージスペースを削減しながら IO 効率を向上させます。

よく使う操作コマンド#

よく使う DDL 操作#

参考: LanguageManual DDL

データリストの表示: show databases;

データベースの使用:USE database_name;

新しいデータベースの作成:

CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name   --DATABASE|SCHEMAは同等です
  [COMMENT database_comment] --データベースのコメント
  [LOCATION hdfs_path] --HDFS上の保存場所
  [WITH DBPROPERTIES (property_name=property_value, ...)]; --追加の属性を指定

データベース情報の表示:
DESC DATABASE [EXTENDED] db_name; --EXTENDEDは追加属性を表示するかどうかを示します

データベースの削除:

-- デフォルトの動作はRESTRICTであり、データベース内にテーブルが存在する場合は削除に失敗します。
-- データベースとその中のテーブルを削除するには、CASCADEを使用して削除できます。
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT | CASCADE];

テーブルの作成#

CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name     --テーブル名
  [(col_name data_type [COMMENT col_comment],
    ... [constraint_specification])]  --列名 列データ型
  [COMMENT table_comment]   --テーブルの説明
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]  --パーティションテーブルのパーティションルール
  [
    CLUSTERED BY (col_name, col_name, ...)
   [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS
  ]  --バケットテーブルのバケットルール
  [SKEWED BY (col_name, col_name, ...) ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
   [STORED AS DIRECTORIES]
  ]  --傾斜列と値を指定
  [
   [ROW FORMAT row_format]
   [STORED AS file_format]
     | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
  ]  -- 行区切り文字、ストレージファイルフォーマット、またはカスタムストレージフォーマットを指定
  [LOCATION hdfs_path]  -- テーブルの保存場所を指定
  [TBLPROPERTIES (property_name=property_value, ...)]  --テーブルの属性を指定
  [AS select_statement];   --クエリ結果からテーブルを作成

クエリ文の結果からテーブルを作成することをサポート:

CREATE TABLE emp_copy AS SELECT * FROM emp WHERE deptno='20';

テーブルの変更#

テーブルの名前を変更:

ALTER TABLE table_name RENAME TO new_table_name;

列の変更:

ALTER TABLE table_name [PARTITION partition_spec]
CHANGE [COLUMN] col_old_name col_new_name column_type
[COMMENT col_comment] [FIRST | AFTER column_name] [CASCADE | RESTRICT];

--例
-- フィールド名と型を変更
ALTER TABLE emp_temp CHANGE empno empno_new INT;

-- フィールドsalの名前を変更し、empnoフィールドの後に配置
ALTER TABLE emp_temp CHANGE sal sal_new decimal(7,2)  AFTER ename;

-- フィールドにコメントを追加
ALTER TABLE emp_temp CHANGE mgr mgr_new INT COMMENT 'これはmgr列です';

列の追加:

ALTER TABLE emp_temp ADD COLUMNS (address STRING COMMENT '自宅の住所');

テーブルのクリア / 削除#

テーブルのクリア:

-- テーブル全体または指定されたパーティション内のデータをクリア
TRUNCATE TABLE table_name [PARTITION (partition_column = partition_col_value,  ...)];

現在、TRUNCATE操作は内部テーブルのみで実行可能であり、外部テーブルで実行すると例外Cannot truncate non-managed table XXXXが発生します。

テーブルの削除:

DROP TABLE [IF EXISTS] table_name [PURGE];
  • 内部テーブル:テーブルのメタデータだけでなく、HDFS 上のデータも削除されます。
  • 外部テーブル:テーブルのメタデータのみが削除され、HDFS 上のデータは削除されません。
  • ビューが参照しているテーブルを削除する際、警告は表示されません(ただし、ビューは無効になっており、ユーザーが削除または再作成する必要があります)。

その他#

ビューリストの表示:

SHOW VIEWS [IN/FROM database_name] [LIKE 'pattern_with_wildcards'];   --Hive 2.2.0以降のみサポート

テーブルのパーティションリストの表示:

SHOW PARTITIONS table_name;

テーブル / ビューの作成文の表示:

SHOW CREATE TABLE ([db_name.]table_name|view_name);

よく使う DML 操作#

リレーショナルデータベースに似ており、具体的には参考: LanguageManual DML

ソートキーワード#

  • sort by :全体のソートではなく、データが reducer に入る前にソートが完了します。

  • order by :入力に対して全体のソートを行うため、1 つの reducer のみ(複数の reducer では全体の順序を保証できません)。1 つの reducer のみであるため、入力規模が大きい場合、計算時間が長くなる可能性があります。

  • distribute by :指定されたフィールドに基づいてデータを異なる reduce に分配します。

  • cluster by :distribute by と sort by のフィールドが同じ場合、cluster by と等しいと見なされます。特別な distribute + sort と考えることができます。

Hive でのデータの追加インポート方法#

  • ローカルからインポート: load data local inpath ‘/home/1.txt’ (overwrite)into table student;

  • HDFS からインポート: load data inpath ‘/user/hive/warehouse/1.txt’ (overwrite)into table student;

  • クエリインポート: create table student1 as select * from student;(特定のデータをクエリすることもできます)

  • クエリ結果のインポート:insert (overwrite)into table staff select * from track_log;

Hive でのデータのエクスポート方法#

  • insert overwrite エクスポート方法

    1. ローカルにエクスポート:
    insert overwrite local directory ‘/home/robot/1/2’ rom  format delimited fields terminated by ‘\t’ select * from staff;
    
    1. HDFS にエクスポート
    insert overwrite directory ‘/user/hive/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
    
  • Bash シェルでの上書き追加エクスポート

$ bin/hive -e “select * from staff; > /home/z/backup.log
  • sqoop を使用して Hive データを外部にエクスポート

拡張#

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。