挖坑: 来分析分析多维分析系统 Doris

目前可能并不那么好用,但无碍它是一个设计精妙的 OLAP 系统。

基本背景

名词 解释
FE Frontend 角色,提供 SQL 解析与调度
BE Backend 角色,提供计算与存储
Broker 通过 Broker 隐藏访问远程文件系统(主要是 HDFS)的具体实现细节
Tablet 一张表的分片,分片数量由分区与分桶数的乘积决定
Rowset 一次新的导入可理解为记录新版本的数据,Rowset 指代某范围版本的数据
Segment 物理数据段文件。一个 Rowset 的 Segments,其之间有区间重叠与否两种情况
Column 一份数据段由表的所有列组成,并且是平铺的结构(见后文 Segment V2 设计)
Page 64K 组成页,
Block  
   
Base
Compaction
 
Cumulative
Compaction
 

Rowset 相当于 LSM 树的磁盘子树,由 CollectIterator 对象负责归并读取并按需执行聚合函数。

有一个比较讨巧的设计,在现时的底层数据格式 Segment V2 下,数据段的写磁盘是不支持追加的,即内存必须全量记录数据(默认是 OLAP_MAX_COLUMN_SEGMENT_FILE_SIZE=256M),然后刷写出一份段文件,才释放这部分内存。这会使得大数据量下,段文件非常多,即便是 Compaction 操作后也是如此实现,这也因为 Doris 的 Compaction 与标准 LSM 树(如 HBase)的 Compaction 含义不一样:

  • 已知从内存表中刷写到磁盘的段文件是比较小的,且可能区间重叠的,因此在标准的 LSM 树上是需要使用归并排序来读取数据,然后通过标准的 Compaction 合成一个大文件,以优化查询时的随机 IO。
  • 而 Doris 采用的方法是通过称为 Cumulative Compaction 合成出一系列数据段文件,但又因为,合成后的这些段不会存在区间重叠,所以就根本不需要通过归并排序去读,所以理想的查询时候也不会存在随机 IO。

好处就是通过排序键定位并往后扫描大量数据的时候,理论速度是非常快的。而小坏处有,

  • 当查询不是排序键,而是基于位图索引的键时,就需要随机 IO 把所有数据段文件的位图索引的数据加载到内存,一旦数据量大,且基数相对较高(百度云的文档显示,合适的基数固定值上限 10 万),位图索引的查询几乎不可用;
  • 在刚导入大量数据的时候,查询性能由于未完成 Compaction 而产生陡峭的性能下降,但这可以通过按照分桶规则排序预处理(理论可行,但目前版本不支持),或者减少每次导入的大小,增加批次。(而相对的, HBase 由于使用 Bulkload 导入对性能几乎无影响)

高基维上做索引,务必使用物化视图(可参考官方文档最佳实践 3),或者手动构建二级索引。

基础部件

LSM Tree

老朋友了,就不说了,主要魔改的地方是 Compaction,后面讨论。

Roaring BitMap

向量化

Population Count 填充计数运算 —— 求位串中 1 的出现次数

数组交集运算

数组并集运算

数组差集运算

Z-Order Curve

https://github.com/apache/incubator-doris/pull/7149

使用手册(踩过的坑)

导入

Broker Load

首先,尽量在建表时,把字段数据长度限制到准确范围。目前阶段 Broker Load 的内存管理并非依据实际数据长度进行分配,而是表 Schema 中反映的单行字节长度进行内存分配,并直接影响控制 MemTable 内存表刷盘时机判断(默认策略是内存表达 write_buffer_size=200MB 时刷盘)的准确性。

其次,需要优化调整相关参数,分别有两种,

第一种是控制内存表刷盘:

  1. 配置在 be.conf 的,刷写文件速率限制 push_write_mbytes_per_sec,-1 表示不限速
  2. 配置在 be.conf 的,当一个 Tablet 的内存表的大小达 write_buffer_size,开始异步刷盘
  3. 配置在导入 SQL 的,当所有内存表的大小达 exec_mem_limit 的 1/3 强制同步刷盘(TabletsChannel#reduce_mem_usage 方法)

第二种是转换并行度:

  1. 配置在 fe.conf 的,集群最大转换线程数 max_broker_concurrency
  2. 配置在导入 SQL 的,限制每个 BE 导入转换线程数的 load_parallelism
  3. 计算导入任务的分片数,但目前暂不支持非纯文本的切割,因此分片数约等于文件数

因此,Broker Load 转换并行度,也就是实例数,可以近似描述为,

$$ \rm min(BE \ 实例数 * loadParallelism, \ maxBrokerConcurrency, \ fileNum)$$

导入 SQL 可参考,

LOAD LABEL psd.lbs_wide_di (
    DATA INFILE ("hdfs://nameservice1/tmp/lbs_wide_di_20211226/*")
    INTO TABLE lbs_wide_di
    PARTITION (p_20211226)
    FORMAT AS "parquet"
)
WITH BROKER "hdfs_broker"
PROPERTIES (
    "exec_mem_limit" = "32212254720", -- 内存仅供参考
    "load_parallelism" = "32",        -- 控制单 BE 的 BrokerScanNode 数
    "timeout" = "259200"              -- 超时参数的上限,仅供参考
)

代码实现备注,

  1. 流程的起点可参见 PlanFragmentExecutor#open_internal,其实现的逻辑就是,不断地从作为数据源的 BrokerScanNode#get_next 中获取 RowBatch 对象,然后传递 RowBatch 到 OlapTableSink#send 方法,方法中会拆解 RowBatch,然后按照分区分桶规则重新组合,并镜像数据流到 Tablets 的所有副本(若存在多个物化视图,则会再次镜像数据流,发送到物化视图对应的 Tablets 的所有副本里),最终通过 PInternalService#tablet_writer_add_batch 的 RPC 调用把数据流汇合到所有的副本 BEs 中。
  2. 当其中一个副本 BE 接收到流入的数据后,通过 LoadChannel#add_batch 解析出要汇入的 Tablet,传递到对应 Tablet 的 DeltaWriter#write,把 RowBatch 转换为 Tuple,并插入到 Tablet 对应的 MemTable 内存表中,从此往后的过程,与 LSM 树结构的其他组件一样,跳表插入然后刷写磁盘。

Spark Load

Stream Load

查询

查询参数

parallel_fragment_exec_instance_num

storage_page_cache_limit
index_page_cache_percentage

doris_scanner_thread_pool_thread_num

max_compaction_threads

物化视图

物化视图可实现多种前缀索引,但可能需要注意以下几点:

1. 建表报错

ERROR 1064 (HY000): errCode = 2, detailMessage = The partition and distributed columns uid must be key column in mv

解决方法是在创建物化视图时,添加 order/group by index_col, 分桶键,比如:

CREATE MATERIALIZED VIEW lbs_wide_wi_for_wifi_idx_mat 
AS
SELECT 
    wifi,
    uid,      -- 分桶键
    app_key   -- 分桶键
FROM lbs_wide_wi
GROUP BY wifi, uid, app_key; -- 用 GROUP BY 去重

2. 用作二级索引查询

当前版本暂时不支持内部进行二级索引转换,即先从物化视图,查出前缀索引键,再从前缀索引键,查出所有列数据,所以临时解决方法可以通过 INNER JOIN 实现:

SELECT a.*
FROM lbs_wide_wi a
JOIN (
    SELECT DISTINCT uid, app_key, wifi -- 筛出前缀索引键,通过去重命中物化视图
    FROM lbs_wide_wi                   -- 右表足够小的话将会广播
) b ON a.uid = b.uid 
    AND a.app_key = b.app_key
    AND a.wifi = b.wifi
WHERE b.wifi = "AC:B3:B5:73:A1:28";    -- 条件将下推到右表

3. 暂时不能对已有数据的大表直接创建物化视图

当前版本对已有数据的大表创建物化视图,会导致 OOM,具体原因尚未查找。

存储层设计

Segment V2 设计

  • Ordinal Index

  • Short Key Index

  • ZoneMap Index

  • Bloom Filter

  • Bitmap Index

Compaction 设计

Base Compaction,

Cumulative Compaction,

数据副本管理设计

计算层设计

查询计划

Analyze SQL

构建 Plan Fragment

向量化工程

enable_vectorized_engine

性能分析与调试

DEBUG 日志

把 BE 或 FE 注释的配置设置为启用即可。

sys_log_verbose_modules = *
log_buffer_level = -1

SQL Profile

set global enable_profile = true;

CPU & Heap Profile

Compaction Metrics

Core Dump & PStack

TPC-DS 性能测试

Dialect netezza,需要修改日期加减与 || 语法(也可设置 Doirs SQL Mode)。去掉不支持的 OR 子查询。

全表扫描性能较差。(对比 Spark SQL)

参考资料

  1. https://space.bilibili.com/362350065
  2. https://my.oschina.net/u/4574386/blog/4332071
  3. https://my.oschina.net/u/4574386/blog/4425351
  4. https://my.oschina.net/u/4574386/blog/4531386
  5. https://mp.weixin.qq.com/s?__biz=Mzg5MDEyODc1OA==&mid=2247484083&idx=1&sn=268f0db0614ff0288f108518ffecfa52&chksm=cfe012aaf8979bbc478439bacc885e9ef9b6a49a69c637144c6ec7dc35501aed1e7d26932153&scene=21#wechat_redirect
  6. https://arxiv.org/pdf/1603.06549.pdf
  7. https://arxiv.org/pdf/1709.07821v2.pdf
  8. http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
  9. https://github.com/doris-vectorized/doris-vectorized
  10. https://en.wikipedia.org/wiki/Z-order_curve
  11. https://pythonawesome.com/numpy-implementation-of-hilbert-curves-in-arbitrary-dimensions/
  12. https://zhuanlan.zhihu.com/p/354334895
  13. https://github.com/WojciechMula/sse-popcount
  14. https://changkun.de/modern-cpp/zh-cn/00-preface/index.html

max_send_batch_parallelism_per_job

CONF_mInt64(write_buffer_size, “209715200”);

_tuple_buf = _table_mem_pool->allocate(_schema_size);

Create Table -> _create_tablet_worker_thread_callback -> StorageEngine#create_tablet -> TableManager#create_tablet

TypeEncodingTraits 类型编码与数据页的创建

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注