Yige

Yige

Build

数仓系列-Kylin概览

数仓系列 - Kylin 概览#

Kylin 的特点#

Kylin 的主要特点包括支持 SQL 接口、支持超大规模数据集、亚秒级响应、可伸缩性、高吞吐率、BI 工具集成等。

  1. 标准SQL接口:Kylin 是以标准的 SQL 作为对外服务的接口。
  2. 支持超大数据集:Kylin 对于大数据的支撑能力可能是目前所有技术中最为领先的。早在 2015 年 eBay 的生产环境中就能支持百亿记录的秒级查询,之后在移动的应用场景中又有了千亿记录秒级查询的案例。
  3. 亚秒级响应:Kylin 拥有优异的查询相应速度,这点得益于预计算,很多复杂的计算,比如连接、聚合,在离线的预计算过程中就已经完成,这大大降低了查询时刻所需的计算量,提高了响应速度。
  4. 可伸缩性和高吞吐率:单节点 Kylin 可实现每秒 70 个查询,还可以搭建 Kylin 的集群。
  5. BI工具集成: Kylin 可以与现有的 BI 工具集成,具体包括如下内容。
    1. ODBC:与 Tableau、Excel、PowerBI 等工具集成
    2. JDBC:与 Saiku、BIRT 等 Java 工具集成
    3. RestAPI:与 JavaScript、Web 网页集成

Kylin 的基本架构#

image.png

REST Server
REST Server 是一套面向应用程序开发的入口点,旨在实现针对 Kylin 平台的应用开发工作。 此类应用程序可以提供查询、获取结果、触发 cube 构建任务、获取元数据以及获取用户权限等等。另外可以通过 Restful 接口实现 SQL 查询。

查询引擎(Query Engine)
当 cube 准备就绪后,查询引擎就能够获取并解析用户查询。它随后会与系统中的其它组件进行交互,从而向用户返回对应的结果。

Routing
负责将解析的 SQL 生成的执行计划转换成 cube 缓存的查询,cube 是通过预计算缓存在 HBase 中,这部分查询可以在秒级设置毫秒级完成

元数据管理工具(Metadata)
Kylin 是一款元数据驱动型应用程序。元数据管理工具是一大关键性组件,用于对保存在 Kylin 当中的所有元数据进行管理,其中包括最为重要的 cube 元数据。其它全部组件的正常运作都需以元数据管理工具为基础。 Kylin 的元数据存储在 HBase 中。

任务引擎(Cube Build Engine
这套引擎的设计目的在于处理所有离线任务,其中包括 shell 脚本、Java API 以及 Map Reduce 任务等等。任务引擎对 Kylin 当中的全部任务加以管理与协调,从而确保每一项任务都能得到切实执行并解决其间出现的故障。

Kylin 工作原理#

Apache Kylin 的工作原理本质上是MOLAP(Multidimension On-Line Analysis Processing)Cube,也就是多维立方体分析,详细概念参考: 数仓系列 - 基本概念整理

核心算法#

对数据模型做 Cube 预计算,并利用计算的结果加速查询:

  • 指定数据模型,定义维度和度量;

  • 预计算 Cube,计算所有 Cuboid 并保存为物化视图;
    预计算过程是 Kylin 从 Hive 中读取原始数据,按照我们选定的维度进行计算,并将结果集保存到 HBase 中,默认的计算引擎为 MapReduce,可以选择 Spark 作为计算引擎。一次 build 的结果,我们称为一个 Segment。构建过程中会涉及多个 Cuboid 的创建,具体创建过程由 kylin.cube.algorithm 参数决定,参数值可选 auto,layer 和 inmem, 默认值为 auto,即 Kylin 会通过采集数据动态地选择一个算法 (layer or inmem),如果用户很了解 Kylin 和自身的数据、集群,可以直接设置喜欢的算法。

  • 执行查询,读取 Cuboid,运行,产生查询结果

layer (逐层构建) 算法#

image.png

一个 N 维的 Cube,是由 1 个 N 维子立方体、N 个 (N-1) 维子立方体、N*(N-1)/2 个 (N-2) 维子立方体、......、N 个 1 维子立方体和 1 个 0 维子立方体构成,总共有 2^N 个子立方体组成,在逐层算法中,按维度数逐层减少来计算,每个层级的计算(除了第一层,它是从原始数据聚合而来),是基于它上一层级的结果来计算的。比如,[Group by A, B] 的结果,可以基于 [Group by A, B, C] 的结果,通过去掉 C 后聚合得来的;这样可以减少重复计算;当 0 维度 Cuboid 计算出来的时候,整个 Cube 的计算也就完成了。

注意:
每一轮的计算都是一个 MapReduce 任务,且串行执行;一个 N 维的 Cube,至少需要 N 次 MapReduce Job

算法优点:

  1. 此算法充分利用了 MapReduce 的能力,处理了中间复杂的排序和洗牌工作,故而算法代码清晰简单,易于维护;
  2. 受益于 Hadoop 的日趋成熟,此算法对集群要求低,运行稳定;在内部维护 Kylin 的过程中,很少遇到在这几步出错的情况;即便是在 Hadoop 集群比较繁忙的时候,任务也能完成。

算法缺点:

  1. 当 Cube 有比较多维度的时候,所需要的 MapReduce 任务也相应增加;由于 Hadoop 的任务调度需要耗费额外资源,特别是集群较庞大的时候,反复递交任务造成的额外开销会相当可观;
  2. 由于 Mapper 不做预聚合,此算法会对 Hadoop MapReduce 输出较多数据;虽然已经使用了 Combiner 来减少从 Mapper 端到 Reducer 端的数据传输,所有数据依然需要通过 Hadoop MapReduce 来排序和组合才能被聚合,无形之中增加了集群的压力;
  3. 对 HDFS 的读写操作较多:由于每一层计算的输出会用做下一层计算的输入,这些 Key-Value 需要写到 HDFS 上;当所有计算都完成后,Kylin 还需要额外的一轮任务将这些文件转成 HBase 的 HFile 格式,以导入到 HBase 中去;

总体而言,该算法的效率较低,尤其是当 Cube 维度数较大的时候。

inmem (快速构建) 算法#

也被称作 “逐段”(By Segment) 或 “逐块”(By Split) 算法,从1.5.x开始引入该算法,利用 Mapper 端计算先完成大部分聚合,再将聚合后的结果交给 Reducer,从而降低对网络瓶颈的压力。该算法的主要思想是:

  • 对 Mapper 所分配的数据块,将它计算成一个完整的小 Cube 段(包含所有 Cuboid);
  • 每个 Mapper 将计算完的 Cube 段输出给 Reducer 做合并,生成大 Cube,也就是最终结果;

如图所示解释了此流程。
image.png

与 layer (逐层构建) 算法相比,快速算法主要有两点不同:

  • Mapper 会利用内存做预聚合,算出所有组合;Mapper 输出的每个 Key 都是不同的,这样会减少输出到 Hadoop MapReduce 的数据量,Combiner 也不再需要;
  • 一轮 MapReduce 便会完成所有层次的计算,减少 Hadoop 任务的调配

Kylin Cube 的构建过程#

如图,是一个构建 Cube 的 Job 过程:
image.png

几个重要流程分析#

从 Hive 表生成 Base Cuboid#

在实际的 cube 构建过程中,会首先根据 cube 的 Hive 事实表和维表生成一张大宽表,然后计算大宽表列的基数,建立维度字典,估算 cuboid 的大小,建立 cube 对应的 HBase 表,再计算 base cuboid。

计算 base cuboid 就是一个 MapReduce 作业,其输入是上面提到的 Hive 大宽表,输出是的 key 是各种维度组合,value 是 Hive 大宽表中指标的值

核心源码: org.apache.kylin.engine.mr.steps.BaseCuboidMapperBase:

// map 阶段生成key-value的代码
protected void outputKV(String[] flatRow, Context context) throws IOException, InterruptedException {
    byte[] rowKey = baseCuboidBuilder.buildKey(flatRow);
    outputKey.set(rowKey, 0, rowKey.length);

    ByteBuffer valueBuf = baseCuboidBuilder.buildValue(flatRow);
    outputValue.set(valueBuf.array(), 0, valueBuf.position());
    context.write(outputKey, outputValue);
}

从 HBase Cuboid 逐层计算 Cuboid#

核心源码: org.apache.kylin.engine.mr.steps.CuboidReducer

@Override
    public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        aggs.reset();

        for (Text value : values) {
            if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
                logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);
            }
            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
            aggs.aggregate(input, needAggrMeasures);
        }
        aggs.collectStates(result);

        ByteBuffer valueBuf = codec.encode(result);

        outputValue.set(valueBuf.array(), 0, valueBuf.position());
        context.write(key, outputValue);
    }

Cuboid 转化为 HBase 的 HFile#

参考: Hive 数据 bulkload 导入 HBase

Kylin 的部署以及基本使用#

参考官方链接: Apache Kylin

补充:
在实践过程中,我们采用 Nginx + Kylin 集群方案 (参考《Kylin 的集群部署模式部署》),但由于资源紧缺, Kylin 集群和 Hadoop 集群在统一的服务器上,经常会出现某个时刻 i 资源紧张导致 Kylin 进程崩溃,甚至拖垮 HBase 的问题,因此建议部署单独的 Kylin 集群,同时构建好监控机制

Kylin 的优化#

Kylin 核心在于 Cube,如何设计一个好的 Cube 对 Kylin 的性能来说便是至关重要的,影响 Cube 膨胀率和构建时间的重要因素主要有以下几个方面:

  • Cube 中的维度数量较多,且没有进行很好的 Cuboid 剪枝优化,导致 Cuboid 数量极多;

  • Cube 中存在较高基数的维度,导致包含这类维度的每一个 Cuboid 占用的空间都很大,这些 Cuboid 累积造成整体 Cube 体积变大;

  • 存在比较占用空间的度量,例如 Count Distinct,因此需要在 Cuboid 的每一行中都为其保存一个较大的寄存器,最坏的情况将会导致 Cuboid 中每一行都有数十 KB,从而造成整个 Cube 的体积变大;

优化思路#

维度优化#

维度优化手段

  • 聚合组
  • 衍生维度
  • 强制维度
  • 层次维度
  • 联合维度
  • Extended Column

详细参考: Apache Kylin 优化指南

并发粒度优化#

当 Segment 中某一个 Cuboid 的大小超出一定的阈值时,系统会将该 Cuboid 的数据分片到多个分区中,以实现 Cuboid 数据读取的并行化,从而优化 Cube 的查询速度。

构建引擎根据 Segment 估计的大小,以及参数“kylin.hbase.region.cut”的设置决定 Segment 在存储引擎中总共需要几个分区来存储,如果存储引擎是 HBase,那么分区的数量就对应于 HBase 中的 Region 数量。kylin.hbase.region.cut的默认值是 5.0,单位是 GB,也就是说对于一个大小估计是 50GB 的 Segment,构建引擎会给它分配 10 个分区。用户还可以通过设置kylin.hbase.region.count.min(默认为 1)和kylin.hbase.region.count.max(默认为 500)两个配置来决定每个 Segment 最少或最多被划分成多少个分区。

由于每个 Cube 的并发粒度控制不尽相同,因此建议构建 cube 的时候为每个 Cube 量身定制控制并发粒度的参数。

参考链接#

推荐 编程小梦 | 康凯森的技术博客, 博主是 Apache Kylin 等多个开源项目的 commiter, 博客有很多关于 OLAP 离线大数据方向的优质文章

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。