Yige

Yige

Build

Hive Overview

Overview of Hive#

Basic Architecture of Hive#

image.png

  • User Interface: CLI, JDBC/ODBC, Web UI layer

  • Thrift Server: Supports multiple language programs to manipulate Hive

  • Driver: Driver, Compiler, Optimizer, Executor

    The core of Hive is the driver engine, which consists of four parts:

    1. Interpreter: Converts Hive SQL statements into an abstract syntax tree (AST)
    2. Compiler: Converts the abstract syntax tree into a logical execution plan
    3. Optimizer: Optimizes the logical execution plan
    4. Executor: Calls the underlying execution framework to execute the logical plan
  • Metadata Storage System: The metadata in Hive typically includes the names of tables, columns, partitions, and their attributes, as well as the properties of the table (internal and external tables), and the directory where the table data is stored.

    The Metastore is by default in the built-in Derby database, which is not suitable for multi-user operations and has a non-fixed data storage directory. The database follows Hive, making it extremely inconvenient to manage.
    Solution: Typically, we use a MySQL database that we create ourselves (local or remote), and Hive interacts with MySQL through the MetaStore service.

Tables in Hive#

Tables in Hive correspond to specified directories on HDFS. When querying data, a full table scan is performed by default, which consumes a lot of time and performance.

Partitioned Tables and Bucketed Tables#

Partitioning#

Divides the data table into multiple partitions based on one or more columns. Partitions are subdirectories of the table directory on HDFS, and data is stored in subdirectories according to the partition. If the WHERE clause of the query contains partition conditions, it directly searches from that partition instead of scanning the entire table directory. A reasonable partition design can greatly improve query speed and performance.

Creating a Partitioned Table
In Hive, you can create a partitioned table using the PARTITIONED BY clause.

CREATE EXTERNAL TABLE emp_partition(
    empno INT,
    ename STRING,
    job STRING,
    mgr INT,
    hiredate TIMESTAMP,
    sal DECIMAL(7,2),
    comm DECIMAL(7,2)
    )
    PARTITIONED BY (deptno INT)   -- Partitioned by department number
    ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
    LOCATION '/hive/emp_partition';

Loading Data into a Partitioned Table
When loading data into a partitioned table, you must specify the partition where the data is located.

# Load data for department number 20 into the table
LOAD DATA LOCAL INPATH "/usr/file/emp20.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=20)
# Load data for department number 30 into the table
LOAD DATA LOCAL INPATH "/usr/file/emp30.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=30)

Bucketing#

Not all datasets can form reasonable partitions, and having too many partitions is not always better; excessive partition conditions may lead to many partitions without data. Bucketing is a finer-grained division relative to partitioning, distinguishing the entire data content based on the hash value of a certain column attribute.

Creating a Bucketed Table
In Hive, we can specify the bucketing column using CLUSTERED BY and specify the sorting reference column within the buckets using SORTED BY.

CREATE EXTERNAL TABLE emp_bucket(
    empno INT,
    ename STRING,
    job STRING,
    mgr INT,
    hiredate TIMESTAMP,
    sal DECIMAL(7,2),
    comm DECIMAL(7,2),
    deptno INT)
    CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS  -- Hashing employee numbers into four buckets
    ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
    LOCATION '/hive/emp_bucket';

Loading Data into a Bucketed Table

--1. Set to enforce bucketing, not needed in Hive 2.x
set hive.enforce.bucketing = true;

--2. Import data
INSERT INTO TABLE emp_bucket SELECT *  FROM emp;  -- Here, the emp table is a regular employee table

Internal Tables and External Tables#

Differences

  • When creating a table: When creating an internal table, the data is moved to the path pointed to by the data warehouse; when creating an external table, only the path of the data is recorded, and no changes are made to the data location.
  • When deleting a table: When deleting a table, both the metadata and data of the internal table are deleted, while the external table only deletes the metadata without deleting the data. This makes external tables relatively safer, with more flexible data organization, facilitating the sharing of source data.
  • When creating a table, an additional external keyword is added for external tables.

Usage Choices

  • If all data processing is done within Hive, it is preferable to choose internal tables.
  • The scenario for using external tables is mainly when sharing data sources; external tables can access the initial data stored on HDFS, and then use Hive to transform the data and store it in internal tables.

Custom UDF#

Reference link: Hive UDF

UDF (User Defined Function) mainly includes:

  • UDF (user-defined function): One input and one output, given one parameter, outputs processed data.
  • UDAF (user-defined aggregate function): Multiple inputs and one output, belongs to aggregate functions, similar to count, sum, etc.
  • UDTF (user-defined table function): One input and multiple outputs, belongs to one parameter, returns a list as a result.

Optimization#

(1) Data Skew#

Causes

  • Uneven key distribution
  • Characteristics of the business data itself
  • SQL statements causing data skew

Solutions

  • Set hive.map.aggr=true and hive.groupby.skewindata=true in Hive.

  • When there is data skew, perform load balancing. When setting hive.groupby.skewindata=true, the generated query plan will have two MR Jobs. In the first MR Job, the output results of the Map will be randomly distributed to the Reduce, with each Reduce performing partial aggregation operations and outputting results. This way, the results with the same Group By Key may be distributed to different Reduces, achieving load balancing; the second MR Job distributes the pre-processed data results according to Group By Key to the Reduce (this process ensures that the same Group By Key is distributed to the same Reduce), and finally completes the final aggregation operation.

  • SQL statement adjustments:

    1. Choose the table with the most uniform distribution of join keys as the driving table. Perform column pruning and filter operations to reduce the data volume during the join of the two tables.
    2. Join small tables: Use map join to load small dimension tables (with less than 1000 records) into memory first, completing the Reduce on the Map side.
    3. Join large tables: Change the null key values into a string plus a random number, distributing the skewed data to different reduces. Since null values cannot be associated, this does not affect the final result after processing.
    4. Count distinct large numbers of the same special value: When counting distinct values, handle cases where the value is null separately. If calculating count distinct, it can be filtered directly, adding 1 to the final result. If there are other calculations that require grouping, handle the records with null values separately first, then union them with the results of other calculations.

(2) General Settings#

  • hive.optimize.cp=true: Column pruning
  • hive.optimize.prunner: Partition pruning
  • hive.limit.optimize.enable=true: Optimize LIMIT n statements
  • hive.limit.row.max.size=1000000
  • hive.limit.optimize.limit.file=10: Maximum number of files

(3) Local Mode (Small Tasks)#

Enable local mode hive> set hive.exec.mode.local.auto=true

  • The input data size of the job must be less than the parameter: hive.exec.mode.local.auto.inputbytes.max (default 128MB)

  • The number of maps in the job must be less than the parameter: hive.exec.mode.local.auto.tasks.max (default 4)

  • The number of reduces in the job must be 0 or 1.

(4) Concurrent Execution#

Enable parallel computation hive> set hive.exec.parallel=true
Related parameter hive.exec.parallel.thread.number: The number of jobs allowed to run concurrently in one SQL computation.

(5) Strict Mode#

Mainly to prevent a group of SQL queries from greatly increasing the pressure on the cluster.

Enable strict mode: hive> set hive.mapred.mode = strict

Some Restrictions:

  • For partitioned tables, a WHERE clause must be added for filtering partition fields.
  • The ORDER BY statement must include a limit output restriction.
  • Limit execution of Cartesian product queries.

(6) Speculative Execution#

mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
hive.mapred.reduce.tasks.speculative.execution=true;

(7) Grouping#

  • Two aggregate functions cannot have different DISTINCT columns; the following expression is incorrect:

    INSERT OVERWRITE TABLE pv_gender_agg SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip) FROM pv_users GROUP BY pv_users.gender;
    
  • The SELECT statement can only contain columns from GROUP BY or aggregate functions.

  • hive.multigroupby.singlemar=true: When multiple GROUP BY statements have the same grouping columns, they will be optimized into one MR task.

(8) Aggregation#

Enable map aggregation hive> set hive.map.aggr=true

Related Parameters

  • hive.groupby.mapaggr.checkinterval: The number of rows processed during map-side group by execution (default: 100000).

  • hive.map.aggr.hash.min.reduction: The minimum ratio for aggregation (if the amount of data after aggregation / 100000 is greater than this configuration of 0.5, it will not aggregate).

  • hive.map.aggr.hash.percentmemory: The maximum memory used for map-side aggregation.

  • hive.map.aggr.hash.force.flush.memory.threshold: The maximum available content of the hash table during map-side aggregation operations; exceeding this value will trigger a flush.

  • hive.groupby.skewindata: Whether to optimize data skew produced by Group By, default is false.

(9) Merging Small Files#

  • hive.merge.mapfiles=true: Merge map output.
  • hive.merge.mapredfiles=false: Merge reduce output.
  • hive.merge.size.per.task=256*1000*1000: Size of files to merge.
  • hive.mergejob.maponly=true: If CombineHiveInputFormat is supported, generate a task that only executes the merge on the Map side.
  • hive.merge.smallfiles.avgsize=16000000: When the average size of files is less than this value, an MR task will be started to execute the merge.

(10) Custom Map/Reduce Numbers#

Parameters related to Map count

  • mapred.max.split.size: The maximum size of a split, i.e., the maximum size of files processed by each map.

  • mapred.min.split.size.per.node: The minimum size of splits on a node.

  • mapred.min.split.size.per.rack: The minimum size of splits on a rack.

Parameters related to Reduce count

  • mapred.reduce.tasks: Force the specification of the number of reduce tasks.

  • hive.exec.reducers.bytes.per.reducer: The amount of data processed by each reduce task.

  • hive.exec.reducers.max: The maximum number of reduces per task [Map count >= Reduce count].

(11) Using Indexes:#

  • hive.optimize.index.filter: Automatically use indexes.
  • hive.optimize.index.groupby: Use aggregate indexes to optimize GROUP BY operations.

Supported Storage Formats#

ORC and Parquet have outstanding overall performance and are widely used; recommended for use.

  • TextFile: Stored as plain text files. This is the default file storage format in Hive. This storage method does not compress data, leading to high disk overhead and data parsing overhead.

  • SequenceFile: SequenceFile is a binary file provided by the Hadoop API, which serializes data in the form of <key,value> to the file. This binary file internally uses Hadoop's standard Writable interface for serialization and deserialization. It is compatible with MapFile in the Hadoop API. In Hive, SequenceFile inherits from Hadoop API's SequenceFile, but its key is empty, using value to store the actual value to avoid extra sorting operations during the map phase of MR.

  • RCFile: RCFile file format is an open-source Hive file storage format from Facebook, which first divides the table into several row groups and stores the data within each row group by column, with each column's data stored separately.

  • ORC Files: ORC is an extension of RCFile to some extent and is an optimization of RCFile.

  • Avro Files: Avro is a data serialization system designed to support applications that exchange large volumes of data. Its main features include: support for binary serialization, allowing for convenient and fast processing of large amounts of data; friendly to dynamic languages, with mechanisms provided by Avro that make it easy for dynamic languages to handle Avro data.

  • Parquet: Parquet is a columnar storage format aimed at analytical business, implemented based on the Dremel data model and algorithm. It achieves efficient compression by column and special encoding techniques, thereby reducing storage space while improving IO efficiency.

Common Operation Commands#

Common DDL Operations#

Reference: LanguageManual DDL

View Data List: show databases;

Use Database: USE database_name;

Create Database:

CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name   -- DATABASE|SCHEMA are equivalent
  [COMMENT database_comment] -- Database comment
  [LOCATION hdfs_path] -- Location stored on HDFS
  [WITH DBPROPERTIES (property_name=property_value, ...)]; -- Specify additional properties

View Database Information:
DESC DATABASE [EXTENDED] db_name; -- EXTENDED indicates whether to display additional properties

Delete Database:

-- The default behavior is RESTRICT; if there are tables in the database, deletion fails.
-- To delete the database and its tables, use CASCADE for cascading deletion.
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT | CASCADE];

Create Table#

CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name     -- Table name
  [(col_name data_type [COMMENT col_comment],
    ... [constraint_specification])]  -- Column name, column data type
  [COMMENT table_comment]   -- Table description
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]  -- Partition rules for partitioned tables
  [
    CLUSTERED BY (col_name, col_name, ...)
   [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS
  ]  -- Bucketing rules for bucketed tables
  [SKEWED BY (col_name, col_name, ...) ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
   [STORED AS DIRECTORIES]
  ]  -- Specify skewed columns and values
  [
   [ROW FORMAT row_format]
   [STORED AS file_format]
     | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
  ]  -- Specify row delimiters, storage file formats, or use custom storage formats
  [LOCATION hdfs_path]  -- Specify the storage location of the table
  [TBLPROPERTIES (property_name=property_value, ...)]  -- Specify table properties
  [AS select_statement];   -- Create a table from query results

Support Creating Tables from Query Results:

CREATE TABLE emp_copy AS SELECT * FROM emp WHERE deptno='20';

Modify Table#

Rename Table:

ALTER TABLE table_name RENAME TO new_table_name;

Modify Column:

ALTER TABLE table_name [PARTITION partition_spec]
CHANGE [COLUMN] col_old_name col_new_name column_type
[COMMENT col_comment] [FIRST | AFTER column_name] [CASCADE | RESTRICT];

-- Example
-- Modify field name and type
ALTER TABLE emp_temp CHANGE empno empno_new INT;

-- Modify the name of field sal and place it after empno field
ALTER TABLE emp_temp CHANGE sal sal_new decimal(7,2)  AFTER ename;

-- Add a comment to the field
ALTER TABLE emp_temp CHANGE mgr mgr_new INT COMMENT 'this is column mgr';

Add Column:

ALTER TABLE emp_temp ADD COLUMNS (address STRING COMMENT 'home address');

Clear Table/Delete Table#

Clear Table:

-- Clear all data from the table or specified partitions in the table
TRUNCATE TABLE table_name [PARTITION (partition_column = partition_col_value,  ...)];

Currently, only internal tables can execute the TRUNCATE operation; executing it on external tables will throw an exception Cannot truncate non-managed table XXXX.

Delete Table:

DROP TABLE [IF EXISTS] table_name [PURGE];
  • Internal table: Will delete both the table's metadata and the data on HDFS.
  • External table: Will only delete the table's metadata, not the data on HDFS.
  • When deleting the table referenced by a view, no warning will be given (but the view is already invalid and must be deleted or recreated by the user).

Other#

View List of Views:

SHOW VIEWS [IN/FROM database_name] [LIKE 'pattern_with_wildcards'];   -- Supported only in Hive 2.2.0 +

View List of Table Partitions:

SHOW PARTITIONS table_name;

View Create Statement of Table/View:

SHOW CREATE TABLE ([db_name.]table_name|view_name);

Common DML Operations#

Similar to relational databases, refer to: LanguageManual DML

Sorting Keywords#

  • sort by: Not a global sort; it completes sorting before data enters the reducer.

  • order by: Performs a global sort on the input, so there is only one reducer (multiple reducers cannot guarantee global order). Having only one reducer can lead to longer computation times when the input size is large.

  • distribute by: Divides the data based on specified fields and outputs to different reduces.

  • cluster by: When the fields of distribute by and sort by are the same, it is equivalent to cluster by. It can be seen as a special distribute + sort.

Methods for Appending Data in Hive#

  • Import from local: load data local inpath ‘/home/1.txt’ (overwrite)into table student;

  • Import from HDFS: load data inpath ‘/user/hive/warehouse/1.txt’ (overwrite)into table student;

  • Query import: create table student1 as select * from student; (can also query specific data)

  • Query result import: insert (overwrite)into table staff select * from track_log;

Methods for Exporting Data from Hive#

  • Use insert overwrite for export

    1. Export to local:
    insert overwrite local directory ‘/home/robot/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
    
    1. Export to HDFS
    insert overwrite directory ‘/user/hive/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
    
  • Bash shell overwrite append export

$ bin/hive -e “select * from staff; > /home/z/backup.log
  • Use sqoop to export Hive data to external sources.

Expansion#

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.