Yige

Yige

Build

Data Warehouse Series - Overview of Kylin

Data Warehouse Series - Overview of Kylin#

Features of Kylin#

The main features of Kylin include support for SQL interfaces, support for extremely large datasets, sub-second response times, scalability, high throughput, and integration with BI tools.

  1. Standard SQL Interface: Kylin serves as an interface using standard SQL.
  2. Support for Extremely Large Datasets: Kylin's capability to support big data is among the most advanced in all technologies. As early as 2015, it could support queries on billions of records in eBay's production environment, and later there were cases of querying hundreds of billions of records in mobile application scenarios.
  3. Sub-Second Response: Kylin has excellent query response speed, thanks to pre-computation. Many complex calculations, such as joins and aggregations, are completed during the offline pre-computation process, significantly reducing the computational load required at query time and improving response speed.
  4. Scalability and High Throughput: A single-node Kylin can achieve 70 queries per second and can also set up a Kylin cluster.
  5. BI Tool Integration: Kylin can integrate with existing BI tools, specifically including the following:
    1. ODBC: Integration with tools like Tableau, Excel, PowerBI
    2. JDBC: Integration with Java tools like Saiku, BIRT
    3. RestAPI: Integration with JavaScript and web pages

Basic Architecture of Kylin#

image.png

REST Server
The REST Server is a set of entry points for application development aimed at facilitating application development for the Kylin platform. Such applications can provide querying, retrieving results, triggering cube build tasks, obtaining metadata, and acquiring user permissions, among other functionalities. SQL queries can also be executed through the Restful interface.

Query Engine
Once the cube is ready, the query engine can retrieve and parse user queries. It then interacts with other components in the system to return the corresponding results to the user.

Routing
Responsible for converting the execution plan generated from the parsed SQL into queries cached in the cube. The cube is cached in HBase through pre-computation, and this part of the query can be completed in milliseconds at the second level.

Metadata Management Tool
Kylin is a metadata-driven application. The metadata management tool is a key component used to manage all metadata stored in Kylin, including the most important cube metadata. The normal operation of all other components relies on the metadata management tool. Kylin's metadata is stored in HBase.

Task Engine (Cube Build Engine)
This engine is designed to handle all offline tasks, including shell scripts, Java APIs, and MapReduce tasks. The task engine manages and coordinates all tasks in Kylin to ensure that each task is executed effectively and to resolve any issues that arise.

Working Principle of Kylin#

The working principle of Apache Kylin is essentially MOLAP (Multidimensional On-Line Analysis Processing) Cube, which refers to multidimensional cube analysis. For detailed concepts, refer to: Data Warehouse Series - Basic Concept Organization

Core Algorithm#

Pre-compute the cube for the data model and use the results to accelerate queries:

  • Specify the data model, define dimensions and measures;

  • Pre-compute the cube, calculate all cuboids, and save them as materialized views;
    The pre-computation process involves Kylin reading raw data from Hive, calculating based on the selected dimensions, and saving the result set to HBase. The default computation engine is MapReduce, but Spark can be chosen as the computation engine. The result of one build is referred to as a Segment. The building process involves the creation of multiple cuboids, and the specific creation process is determined by the kylin.cube.algorithm parameter. The parameter value can be auto, layer, or inmem, with the default value being auto, meaning Kylin will dynamically select an algorithm (layer or inmem) based on the collected data. If the user is very familiar with Kylin and their own data and cluster, they can directly set their preferred algorithm.

  • Execute the query, read the cuboid, run it, and produce query results.

Layer (Layered Construction) Algorithm#

image.png

An N-dimensional cube consists of 1 N-dimensional sub-cube, N (N-1)-dimensional sub-cubes, N*(N-1)/2 (N-2)-dimensional sub-cubes, ..., N 1-dimensional sub-cubes, and 1 0-dimensional sub-cube, totaling 2^N sub-cubes. In the layered algorithm, calculations are performed layer by layer, reducing by the number of dimensions. Each level's calculation (except for the first layer, which is aggregated from raw data) is based on the results of the previous level. For example, the result of [Group by A, B] can be derived from the result of [Group by A, B, C] by aggregating after removing C; this reduces redundant calculations. When the 0-dimensional cuboid is calculated, the entire cube's calculation is complete.

Note:
Each round of calculations is a MapReduce task and is executed serially; an N-dimensional cube requires at least N MapReduce jobs.

Advantages of the Algorithm:

  1. This algorithm fully utilizes the capabilities of MapReduce, handling complex intermediate sorting and shuffling, making the algorithm code clear, simple, and easy to maintain.
  2. Benefiting from the increasing maturity of Hadoop, this algorithm has low cluster requirements and stable operation; during the internal maintenance of Kylin, errors in these steps are rarely encountered; even when the Hadoop cluster is busy, tasks can still be completed.

Disadvantages of the Algorithm:

  1. When the cube has many dimensions, the required MapReduce tasks increase correspondingly; since Hadoop's task scheduling consumes additional resources, especially when the cluster is large, the overhead from repeatedly submitting tasks can be considerable.
  2. Since the Mapper does not perform pre-aggregation, this algorithm outputs a large amount of data from Hadoop MapReduce; although a Combiner is used to reduce data transfer from the Mapper to the Reducer, all data still needs to be sorted and combined through Hadoop MapReduce to be aggregated, which increases the pressure on the cluster.
  3. Frequent read and write operations on HDFS: Since the output of each layer's computation is used as input for the next layer, these Key-Value pairs need to be written to HDFS; after all calculations are complete, Kylin requires an additional round of tasks to convert these files into HBase's HFile format for import into HBase.

Overall, the efficiency of this algorithm is relatively low, especially when the number of cube dimensions is large.

Inmem (Fast Construction) Algorithm#

Also known as the "By Segment" or "By Split" algorithm, this algorithm was introduced starting from 1.5.x, utilizing the Mapper side to perform most of the aggregation first, then passing the aggregated results to the Reducer to reduce network bottlenecks. The main idea of this algorithm is:

  • For the data blocks assigned to the Mapper, compute them into a complete small cube segment (containing all cuboids);
  • Each Mapper outputs the computed cube segment to the Reducer for merging, generating the large cube, which is the final result.

The following diagram explains this process.
image.png

Compared to the Layer (Layered Construction) Algorithm, the Fast Algorithm has two main differences:

  • Mappers utilize memory for pre-aggregation, calculating all combinations; each Key output by the Mapper is different, reducing the amount of data output to Hadoop MapReduce, and the Combiner is no longer needed.
  • A single MapReduce job completes all levels of calculations, reducing the scheduling of Hadoop tasks.

Kylin Cube Construction Process#

The following diagram illustrates the job process of building a cube:
image.png

Analysis of Several Important Processes#

Generating Base Cuboid from Hive Table#

In the actual cube construction process, a large wide table is first generated based on the cube's Hive fact table and dimension tables, then the cardinality of the large wide table's columns is calculated, a dimension dictionary is established, the size of the cuboid is estimated, the corresponding HBase table for the cube is created, and finally, the base cuboid is calculated.

Calculating the base cuboid is a MapReduce job, with the input being the aforementioned Hive large wide table, and the output's key being various dimension combinations, while the value is the metric values from the Hive large wide table.

Core source code: org.apache.kylin.engine.mr.steps.BaseCuboidMapperBase:

// Code to generate key-value in the map phase
protected void outputKV(String[] flatRow, Context context) throws IOException, InterruptedException {
    byte[] rowKey = baseCuboidBuilder.buildKey(flatRow);
    outputKey.set(rowKey, 0, rowKey.length);

    ByteBuffer valueBuf = baseCuboidBuilder.buildValue(flatRow);
    outputValue.set(valueBuf.array(), 0, valueBuf.position());
    context.write(outputKey, outputValue);
}

Layered Calculation of Cuboid from HBase Cuboid#

Core source code: org.apache.kylin.engine.mr.steps.CuboidReducer

@Override
    public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        aggs.reset();

        for (Text value : values) {
            if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
                logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);
            }
            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
            aggs.aggregate(input, needAggrMeasures);
        }
        aggs.collectStates(result);

        ByteBuffer valueBuf = codec.encode(result);

        outputValue.set(valueBuf.array(), 0, valueBuf.position());
        context.write(key, outputValue);
    }

Converting Cuboid to HBase's HFile#

Reference: Hive Data Bulk Load into HBase

Deployment and Basic Usage of Kylin#

Refer to the official link: Apache Kylin

Supplement:
In practice, we adopted the Nginx + Kylin cluster solution (refer to “Kylin Cluster Deployment Mode”), but due to resource constraints, the Kylin cluster and Hadoop cluster were on the same server, often leading to resource shortages at certain times, causing Kylin processes to crash and even dragging down HBase. Therefore, it is recommended to deploy a separate Kylin cluster and establish a monitoring mechanism.

Optimization of Kylin#

The core of Kylin lies in the cube; designing a good cube is crucial for Kylin's performance. The main factors affecting cube expansion rate and construction time include:

  • A large number of dimensions in the cube without proper cuboid pruning optimization, leading to an excessive number of cuboids;

  • The presence of high cardinality dimensions in the cube, causing each cuboid containing such dimensions to occupy a large amount of space, which accumulates to increase the overall size of the cube;

  • The existence of metrics that occupy significant space, such as Count Distinct, necessitating the storage of a large register for each row in the cuboid, potentially leading to each row in the cuboid being several tens of KB, thus increasing the overall size of the cube.

Optimization Ideas#

Dimension Optimization#

Dimension optimization methods:

  • Aggregation groups
  • Derived dimensions
  • Forced dimensions
  • Hierarchical dimensions
  • Joint dimensions
  • Extended columns

For detailed reference: Apache Kylin Optimization Guide

Concurrency Granularity Optimization#

When the size of a cuboid in a segment exceeds a certain threshold, the system will shard the data of that cuboid into multiple partitions to achieve parallel reading of cuboid data, thereby optimizing the query speed of the cube.

The construction engine determines how many partitions are needed to store the segment in the storage engine based on the estimated size of the segment and the parameter "kylin.hbase.region.cut". If the storage engine is HBase, the number of partitions corresponds to the number of Regions in HBase. The default value of kylin.hbase.region.cut is 5.0 GB, meaning for a segment estimated to be 50GB, the construction engine will allocate 10 partitions. Users can also set kylin.hbase.region.count.min (default is 1) and kylin.hbase.region.count.max (default is 500) to determine the minimum or maximum number of partitions each segment can be divided into.

Since the concurrency granularity control varies for each cube, it is recommended to customize the parameters controlling concurrency granularity for each cube during construction.

Recommended Programming Dream | Kang Kaisen's Technical Blog, the blogger is a committer for Apache Kylin and several other open-source projects, and the blog contains many high-quality articles on OLAP offline big data.

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.