描绘一下 StarRocks 查询执行的核心流程,重点突出其全面向量化和Pipeline并行执行的特点。
核心概念:
-
Batch(批次/向量): 数据处理的基本单位,包含多行(例如1024行),但按列组织(Column[])。 -
Operator(算子): 执行特定操作的节点(如 Scan, Filter, Aggregation, Join)。每个算子都设计为处理输入Batch并输出Batch。 -
Pipeline(流水线): 一组相互连接的Operator,数据像流水一样在其中流动。一个查询可能被拆分成多个Pipeline。 -
Driver(驱动器): 负责驱动一个Pipeline执行的实体(通常绑定到一个线程)。它从源头拉取数据,推动数据流经Pipeline中的算子。 -
SIMD: 在关键计算环节(如算术运算、比较)使用 SIMD 指令处理整个Batch中的一列数据。
伪代码概览 (高度简化,忽略错误处理、资源管理、复杂优化等细节):
# 1. 查询解析与优化 (由 Coordinator 节点执行)
def execute_query(sql_query):
# 解析 SQL,生成抽象语法树 (AST)
ast = parse_sql(sql_query)
# 基于成本的优化器 (CBO) 工作
# - 使用统计信息 (表大小、列基数、数据分布等)
# - 考虑各种执行计划 (Join 顺序、Join 类型、聚合策略、分区/分桶裁剪等)
# - 选择预估成本最低的计划
physical_plan = cost_based_optimizer.optimize(ast)
# 将物理执行计划拆分成一个或多个可以并行执行的 Pipeline
pipelines = fragmenter.split_into_pipelines(physical_plan)
# 2. 分布式任务调度 (由 Coordinator 调度到 Backend 节点)
for pipeline in pipelines:
# 确定需要在哪些 Backend 节点上执行此 Pipeline (考虑数据本地性、负载均衡)
target_backends = scheduler.assign_backends(pipeline)
# 将 Pipeline 任务 (包含执行计划片段和参数) 发送到选定的 Backend 节点
for backend in target_backends:
backend.submit_pipeline_task(pipeline)
# 3. Pipeline 在 Backend 节点上的执行 (每个 Pipeline 由一个或多个 Driver 执行)
class PipelineDriver(Thread):
def __init__(self, pipeline):
self.pipeline = pipeline # 包含此 Pipeline 的算子列表 (Operator[])
self.source_op = pipeline.source # Pipeline 的源头算子 (如 Scan)
self.sink_op = pipeline.sink # Pipeline 的末端算子 (可能输出到网络或内存)
def run(self):
# **核心:驱动数据流过 Pipeline**
while True:
# (a) 从源头算子获取一个 Batch 数据
# - 源头算子 (如 Scan) 负责从存储层读取列式数据,按 Batch 组织
# - 可能应用存储层过滤 (前缀索引、Bloom Filter、分区裁剪等)
input_batch = self.source_op.get_next()
# 如果没有更多数据了,退出循环
if input_batch is None:
break
# (b) **关键:向量化处理** - 将 Batch 推送给 Pipeline 中的下一个算子
# 数据流: source -> op1 -> op2 -> ... -> sink
current_batch = input_batch
for operator in self.pipeline.operators[1:]: # 跳过源头算子
# **每个算子对输入的 Batch 进行向量化处理**
current_batch = operator.process_batch(current_batch)
# (c) 将最终处理后的 Batch 交给 Sink 算子
# - Sink 可能将结果发送给 Coordinator 或其他 Backend
# - 或写入内存用于后续处理 (如排序、Merge)
self.sink_op.consume_batch(current_batch)
# 4. 单个算子 (Operator) 的向量化处理示例 (以 Filter 算子为例)
class FilterOperator(Operator):
def __init__(self, predicate): # predicate 是过滤条件表达式
self.predicate = predicate
def process_batch(self, input_batch):
# **关键:向量化评估过滤条件**
# 对输入 Batch 的每一行应用 predicate 条件,生成一个布尔值的 "选择向量" (Selection Vector)
# 这里会大量使用 SIMD 指令加速比较和逻辑运算!
selection_vector = evaluate_predicate_vectorized(input_batch, self.predicate)
# **关键:向量化过滤**
# 根据 selection_vector,从 input_batch 中筛选出符合条件的行,生成新的 Batch
# 这个操作也是按列批量进行的,效率很高
output_batch = apply_selection_vector(input_batch, selection_vector)
return output_batch
# 另一个算子示例:聚合 (Hash Aggregation)
class HashAggregationOperator(Operator):
def __init__(self, group_by_exprs, agg_funcs):
self.group_by_exprs = group_by_exprs
self.agg_funcs = agg_funcs
self.hash_table = {} # 简化表示,实际是高效列式结构
def process_batch(self, input_batch):
# **关键:向量化计算 Group By 键**
group_keys = compute_group_keys_vectorized(input_batch, self.group_by_exprs)
# **关键:向量化聚合更新**
# 遍历 Batch 中的每一行 (但内部是批量操作列)
for i in range(input_batch.row_count()):
key = group_keys[i]
if key not in self.hash_table:
# 初始化新组的聚合状态 (sum=0, count=0, etc.)
self.hash_table[key] = initialize_agg_states(self.agg_funcs)
# **关键:使用向量化/SIMD 友好的方式更新聚合状态**
# 例如,更新 SUM: state.sum += input_batch['sales'][i]
# 更新 COUNT: state.count += 1
# 注意:虽然循环在行上,但内部状态更新函数可以设计为高效处理列数据块
update_agg_states(self.hash_table[key], input_batch, i, self.agg_funcs)
# 注意:聚合算子通常在处理完所有输入 Batch 后,才输出最终结果 (在 close 方法中)
# 这里简化,process_batch 不返回 Batch。最终结果由 sink 或其他机制获取。
return None # 或返回一个表示中间状态的标记
def close(self):
# 遍历 hash_table,将每个组的聚合状态计算最终结果 (如 sum, avg),形成输出 Batch
output_batch = convert_hash_table_to_batch(self.hash_table, self.agg_funcs)
return output_batch # 最终结果 Batch
# 5. 存储层 Scan 算子 (简化)
class ScanOperator(Operator):
def __init__(self, table, columns, predicates):
self.table = table
self.columns = columns
self.predicates = predicates # 下推的过滤条件
self.scanner = create_columnar_scanner(table, columns, predicates)
def get_next(self):
# **关键:从存储层读取下一个列式 Batch**
# - 利用列存格式,只读取需要的列 (`self.columns`)
# - 应用存储层谓词下推过滤 (利用前缀索引、ZoneMap、Bitmap索引等快速跳过数据块)
# - 将读取到的数据组织成 Batch 格式 (列数组)
return self.scanner.read_next_batch()
关键点解释 (为什么快):
- Batch Everywhere (全面批处理): 整个执行流程中,数据始终以
Batch为单位流动。每个Operator的输入输出都是Batch。这是向量化的基础。 - Vectorized Operators (向量化算子): 每个算子 (
process_batch方法) 的内部实现都针对Batch设计:- 列式操作: 直接操作列数组 (
Column[]),避免处理单行开销。 - 紧循环 (Tight Loop): 在列数据上执行紧凑的循环,最大化 CPU 缓存利用率。
- SIMD 加速: 在算术运算、比较、哈希计算等关键步骤,显式使用 SIMD 指令 (
evaluate_predicate_vectorized,update_agg_states内部)。
- 列式操作: 直接操作列数组 (
- Pipeline 执行模型:
- 消除阻塞:
Driver不断从源头拉取Batch并立即推送给下游算子。算子处理完一个Batch就立刻输出给下一个算子,不需要等待整个输入完成。大大减少了线程阻塞和等待时间。 - 流水线并行: 一个
Pipeline内的不同算子可以同时处理不同的Batch(类似 CPU 流水线)。多个Pipeline可以在不同线程/CPU 核心上并行执行。
- 消除阻塞:
- 存储层优化:
- 列式扫描 (
ScanOperator): 只读查询需要的列,利用列存格式的压缩和编码优势。 - 谓词下推: 将过滤条件 (
predicates) 尽可能下推到存储层,在扫描时利用索引 (前缀索引、Bloom Filter、位图索引、ZoneMap) 提前过滤掉大量无关数据块,减少 I/O 和 CPU 处理量。 - 按 Batch 读取: 存储层直接按
Batch粒度返回数据,与执行引擎无缝衔接。
- 列式扫描 (
- 高效的资源利用:
- 减少函数调用开销 (处理一行 vs 处理一批)。
- 提高 CPU 指令缓存 (Instruction Cache) 和数据缓存 (Data Cache) 命中率。
- 充分利用现代 CPU 特性 (SIMD, 多核并行)。
总结:
StarRocks 查询执行的伪代码核心展示了其如何通过 “全面向量化” (所有算子处理 Batch) 和 “Pipeline 并行执行” (数据流驱动,最小化等待) 两大核心技术,结合智能优化器和高效存储层,实现了极致的查询性能。它将数据处理任务分解成高效的、针对现代硬件优化的批处理操作流,最大限度地榨干了 CPU 和 I/O 的性能。
《starrocks查询伪代码》 是转载文章,点击查看原文。

