DuckDB 的向量化执行
- 版本
- 引言
- TableScan
- DataChunk 与 Vector
- TableScan 的扫描准备
- TableScan 的向量化读取
- DuckDB 的向量操作
- DuckDB 向量计算的模板技巧
- 总结
版本
v1.4-andium
引言
我们在 DuckDB 查询执行器架构解析 中梳理了执行器如何把物理计划切成 Pipeline、并在多线程环境中调度。那篇文章更多是站在查询 SQL 的的视角看 push-based 的运作方式,而真正让执行器高效的核心是每个算子都围绕 DataChunk 做向量化处理, 本篇就从执行器最基础的 Source 算子 TableScan 切入, 理解 DuckDB 的向量化执行。
TableScan
在物理计划中,TableScan 是最典型的 Source 算子: 它主要来进行扫表的操作, 来进行数据的读取操作, 没有输入但会批量产出 DataChunk 供整条 Pipeline 消费.
在 MetaPipeline 中,TableScan 不会创建新的子 Pipeline,它只是被加入当前 Pipeline 的 Source 列表, 根据 DuckDB 的执行器框架的算子原子, TableScan 是可以被并行操作的.
DataChunk 与 Vector
DataChunk 的结构
在 DuckDB 的实现中, DataChunk 就是每一批读取数据单元的抽象描述 (src/include/duckdb/common/types/data_chunk.hpp).
class DataChunk {
public:
/* 批次里每一列都是一个 Vector. */
vector<Vector> data;
/* 当前批次行数/列数. */
inline idx_t size() const { return count; }
inline idx_t ColumnCount() const { return data.size(); }
inline void SetCardinality(idx_t count_p) { this->count = count_p; }
/* 初始化批次并预分配 STANDARD_VECTOR_SIZE=2048 块. */
DUCKDB_API void Initialize(ClientContext &context,
const vector<LogicalType> &types,
idx_t capacity = STANDARD_VECTOR_SIZE);
/* 让当前批次引用另一批次的部分列. */
DUCKDB_API void ReferenceColumns(DataChunk &other,
const vector<column_t> &column_ids);
/* 基于 SelectionVector 对批次做切片. */
DUCKDB_API void Slice(const SelectionVector &sel_vector, idx_t count);
/* 重置批次, 复用底层 VectorCache. */
DUCKDB_API void Reset();
/* ... */
private:
/* 当前批次行数与预分配容量. */
idx_t count;
idx_t capacity;
};Initialize 调用时,每个列向量背后都会绑定一个 VectorCacheBuffer: 向量列对应一块可以容纳 STANDARD_VECTOR_SIZE(默认 2048)行的连续内存; LIST/ARRAY/STRUCT 等嵌套列还会递归地为子列准备自己的缓存和辅助 buffer.
vector<Vector> data 里的每个元素本质上就是 “指向一个固定长度的内存缓冲区”, TableScan 每次扫描就是把这些缓冲区写满再向上游推送.
Vector 的结构
DuckDB 中有几种不同的向量类型,每种类型都有其特定的优化目的:
enum class VectorType : uint8_t {
FLAT_VECTOR, // Flat vectors represent a standard uncompressed vector
FSST_VECTOR, // Contains string data compressed with FSST
CONSTANT_VECTOR, // Constant vector represents a single constant
DICTIONARY_VECTOR, // Dictionary vector represents a selection vector on top of another vector
SEQUENCE_VECTOR // Sequence vector represents a sequence with a start point and an increment
};-
FLAT_VECTOR: 标准的向量类型,数据在内存中连续存储,没有压缩
-
FSST_VECTOR: 使用了 Fast Static Symbol Table 压缩算法的向量,这个向量专门用来存储字符串数据. FSST 压缩算法也是 DuckDB 内部成员发表的压缩.
-
CONSTANT_VECTOR: 常量向量, 如果某一列值都是相等的,会直接使用常量向量, 常量向量预定义了一个大小依然是 STANDARD_VECTOR_SIZE 的向量:
static const sel_t ZERO_VECTOR[STANDARD_VECTOR_SIZE];所有的元素都引用到位置 0, 避免重复创建向量.
-
DICTIONARY_VECTOR: 字典向量, 通过 SelectionVector 引用另一个向量的数据, 实现零拷贝.
-
SEQUENCE_VECTOR: SEQUENCE_VECTOR 向量是用 3 个元数据来表示任意长度的等差数列, DuckDB 的 range 函数 需要使用这个向量:
void Vector::Sequence(int64_t start, int64_t increment, idx_t count) {
this->vector_type = VectorType::SEQUENCE_VECTOR;
this->buffer = make_buffer<VectorBuffer>(sizeof(int64_t) * 3);
auto data = reinterpret_cast<int64_t *>(buffer->GetData());
data[0] = start; /* 起始值. */
data[1] = increment; /* 增量. */
data[2] = int64_t(count); /* 元素个数. */
validity.Reset();
auxiliary.reset();
}DuckDB 会在运行时根据数据特征和操作场景动态选择最优的向量类型.
SelectionVector 的延迟物化
DuckDB 向量化执行的核心优化之一是利用 SelectionVector 实现延迟物化,避免不必要的内存拷贝.
在 DataChunk 上 “记录哪些行有效” 是 SelectionVector 的职责. 它本质上是一个 sel_t* 索引数组,每一位代表当前 DataChunk 的某一行.
struct SelectionVector {
SelectionVector() : sel_vector(nullptr) {}
explicit SelectionVector(idx_t count) { Initialize(count); }
/* 初始化. */
void Initialize(idx_t count = STANDARD_VECTOR_SIZE);
void Initialize(sel_t *sel);
/* 写/读某个位置的行号. */
inline void set_index(idx_t idx, idx_t loc) { sel_vector[idx] = UnsafeNumericCast<sel_t>(loc); }
inline idx_t get_index(idx_t idx) const { return sel_vector ? sel_vector[idx] : idx; }
private:
sel_t *sel_vector;
};如果每个算子在处理后都需要复制满足条件的数据行, 这将导致内存带宽的浪费, 频繁的内存拷贝也会消耗大量的 CPU 影响整体的性能.
DuckDB 的 SelectionVector 机制通过解决这个问题:
/* 假设有一个包含 2048 行的 DataChunk. */
DataChunk chunk;
chunk.Initialize(context, {LogicalType::INTEGER, LogicalType::VARCHAR}, 2048);
/* TableScan 填充数据后, 所有行都有效:row_ids = [0, 1, 2, ..., 2047]. */
/* 第一个 Filter: WHERE status = 'VALID', 不复制数据,只创建 SelectionVector 记录满足条件的行号. */
SelectionVector sel1(800);
sel1.set_index(0, 5);
sel1.set_index(1, 7);
sel1.set_index(2, 12);
/* ... */
/* 第二个 Filter: WHERE amount > 1000, 基于 sel1 进一步过滤,生成 sel2. */
SelectionVector sel2(800);
idx_t sel2_count = 0;
/* 遍历 sel1 中的所有有效行, 记录满足第二个 Where 条件的行. */
for (idx_t i = 0; i < 800; i++) {
auto row_idx = sel1.get_index(i);
if (amount_data[row_idx] > 1000) {
sel2.set_index(sel2_count++, row_idx);
}
}
/* 假设最终 200 行同时满足两个条件: sel2 = [7, 9, 12, ...] (200 个索引,都是原始 DataChunk 的行号). */
/* Aggregate 算子读取数据时, 通过 SelectionVector 获取实际行号只访问有效行. */TableScan 的扫描准备
在进入状态管理之前,先看 DuckDB 内置表函数 TableScanFunction::GetFunction()(src/function/table/table_scan.cpp)。它返回一个 TableFunction 对象,里面挂着一系列函数指针,驱动整个表扫描生命周期:
TableFunction TableScanFunction::GetFunction() {
TableFunction scan_function("seq_scan", {}, TableScanFunc);
scan_function.init_local = TableScanInitLocal;
scan_function.init_global = TableScanInitGlobal;
scan_function.statistics = TableScanStatistics;
scan_function.dependency = TableScanDependency;
scan_function.cardinality = TableScanCardinality;
scan_function.projection_pushdown = true;
scan_function.filter_pushdown = true;
scan_function.filter_prune = true;
scan_function.sampling_pushdown = true;
scan_function.late_materialization = true;
scan_function.serialize = TableScanSerialize;
scan_function.deserialize = TableScanDeserialize;
scan_function.pushdown_expression = TableScanPushdownExpression;
scan_function.get_virtual_columns = TableScanGetVirtualColumns;
scan_function.get_row_id_columns = TableScanGetRowIdColumns;
scan_function.set_scan_order = SetScanOrder;
return scan_function;
}可以把它理解成一张“函数指针表”:TableScanFunc 负责真正读取数据,TableScanInitLocal/TableScanInitGlobal 生成 Local/GlobalState, 其他回调负责统计以及各种 pushdown , 裁剪特性. 这种写法和 Linux 内核中的操作表类似,执行器只要在合适的阶段调用相应回调, 就能驱动任意表函数.
理解 TableScan 的向量化,需要清楚在每个线程上维护的状态:
-
TableScanGlobalSourceState:PhysicalTableScan在执行器中的全局状态,用来管理 table function 的init_global结果、动态过滤器、in/out 参数. -
TableScanLocalState: table function 内部为每个线程创建的私有上下文,持有TableScanState scan_state(记录当前 RowGroup、偏移、过滤信息)、DataChunk all_columns(可选,缓存包含过滤列在内的全列数据)以及扫描过程中用到的一些本地缓冲信息.
基于 LocalState,每个线程就可以在 GetDataInternal 里重复使用 scan_state 和 all_columns,批量的把 RowGroup 数据搬到 DataChunk 里,而不必为每次扫描重新分配/初始化临时结构.
TableScan 的向量化读取
状态准备完毕后,真正的批量扫描发生在 TableScanFunc (src/function/table/table_scan.cpp).
先明确一下调用链:
PipelineExecutor::FetchFromSource // 执行器在 Pipline 中 拉取数据
-> PhysicalTableScan::GetData(...) // 调用算子的 Source 接口
-> PhysicalTableScan::GetDataInternal(...)
-> TableFunctionInput data(...)
-> function.function(...) // TableScanFunc,真正扫描数据
void TableScanFunc(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
/* 拿到当前线程的 LocalState(包含 scan_state、all_columns 等临时结构) */
auto &l_state = data_p.local_state->Cast<TableScanLocalState>();
l_state.scan_state.options.force_fetch_row = ClientConfig::GetConfig(context).force_fetch_row;
do {
/* 查询被中断,随时退出扫描循环 */
if (context.interrupted) {
throw InterruptException();
}
if (bind_data.is_create_index) {
/* CREATE INDEX 特殊路径:只扫描已提交且未永久删除的行 */
storage.CreateIndexScan(l_state.scan_state, output,
TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED);
} else if (CanRemoveFilterColumns()) {
/* 启用 late materialization:先扫描到 all_columns,再引用需要的列 */
l_state.all_columns.Reset();
storage.Scan(tx, l_state.all_columns, l_state.scan_state);
output.ReferenceColumns(l_state.all_columns, projection_ids);
} else {
/* 直接扫描到输出 DataChunk */
storage.Scan(tx, output, l_state.scan_state);
}
if (output.size() > 0) {
return; /* 本次批次已有数据,交给执行器 */
}
/* 当前 RowGroup 耗尽,向存储层申请下一个 morsel(可能来自 local storage) */
auto next = storage.NextParallelScan(context, state, l_state.scan_state);
if (!next) {
return; /* 没有新的 RowGroup,扫描结束 */
}
} while (true);
}TableScanFunc 内的 storage.Scan(...)/storage.CreateIndexScan(...) 最终都会落到存储层的 RowGroup::TemplatedScan. 这层主要做的是 “把当前 RowGroup 的一块数据搬进 DataChunk”, 和向量化框架本身关系不大:它会按 STANDARD_VECTOR_SIZE 决定本批次的行数,先检查采样/Zonemap/可见性生成 selection vector,必要时预取数据块;若无过滤则整列扫描,有过滤时先扫带过滤列生成 sel,再用 sel 抓取其余列;最后设置批次行数并推进到下一个 vector 块.
DuckDB 的向量操作
DuckDB 真正的向量化操作并没有手写 SIMD 指令, 而是依赖编译器自动向量化:
在 Release 编译模式下使用 -O3 默认包含了自动向量化 (比如 GCC 目前使用 -O3 就默认启动了 -ftree-vectorize), 具体细节可以参考文章 Auto-vectorization in GCC
大部分向量化操作通过 struct VectorOperations 提供统一接口, 底层使用 UnaryExecutor、BinaryExecutor 等模板类作为 Executor 来具体实现实际的向量化执行:
VectorOperations (公共接口)
↓ 调用
Executor 模板类 (具体实现)
↓ 调用
ExecuteLoop (循环, 借助编译器向量化)比如判断相等的操作:
/* 公共接口. */
void VectorOperations::Equals(Vector &left, Vector &right, Vector &result, idx_t count) {
/* 调用ComparisonExecutor 执行 duckdb::Equals 的 Operation 来做实际的相等比较. */
ComparisonExecutor::Execute<duckdb::Equals>(left, right, result, count);
}ComparisonExecutor::Execute 最后调用 BinaryExecutor::ExecuteGenericLoop() 进行 for 循环来依次判断是否相等:
template <class LEFT_TYPE, class RIGHT_TYPE, class RESULT_TYPE, class OPWRAPPER, class OP, class FUNC>
static void ExecuteGenericLoop(const LEFT_TYPE *__restrict ldata, const RIGHT_TYPE *__restrict rdata,
RESULT_TYPE *__restrict result_data, const SelectionVector *__restrict lsel,
const SelectionVector *__restrict rsel, idx_t count, ValidityMask &lvalidity,
ValidityMask &rvalidity, ValidityMask &result_validity, FUNC fun) {
if (!lvalidity.AllValid() || !rvalidity.AllValid()) {
for (idx_t i = 0; i < count; i++) {
auto lindex = lsel->get_index(i);
auto rindex = rsel->get_index(i);
if (lvalidity.RowIsValid(lindex) && rvalidity.RowIsValid(rindex)) {
auto lentry = ldata[lindex];
auto rentry = rdata[rindex];
result_data[i] = OPWRAPPER::template Operation<FUNC, OP, LEFT_TYPE, RIGHT_TYPE, RESULT_TYPE>(
fun, lentry, rentry, result_validity, i);
} else {
result_validity.SetInvalid(i);
}
}
} else {
for (idx_t i = 0; i < count; i++) {
auto lentry = ldata[lsel->get_index(i)];
auto rentry = rdata[rsel->get_index(i)];
result_data[i] = OPWRAPPER::template Operation<FUNC, OP, LEFT_TYPE, RIGHT_TYPE, RESULT_TYPE>(
fun, lentry, rentry, result_validity, i);
}
}
}这里的 OPWRAPPER::template Operation 就是 duckdb::Equals 的 Operation, 这里 OPWRAPPER 是一个模板参数决定的类型, Operation() 是它的成员模板函数,这里需要显式用 template 关键字告诉编译器调用的是成员模板.
比如 Add 相加操作, 也是使用 BinaryExecutor::ExecuteGenericLoop() 进行 for 循环操作调用
struct AddOperator {
template <class TA, class TB, class TR>
static inline TR Operation(TA left, TB right) {
return left + right;
}
};DuckDB 向量计算的模板技巧
DuckDB 的向量计算的核心都围绕一个核心: 零成本抽象, 把运行时决策移到编译期, 借助编译器生成最优的代码.
- 编译期常量计算
static constexpr uint16_t COMPRESSED_SEGMENT_SIZE = 256;DuckDB 大量使用 constexpr 来定义变量为编译期常量, 避免了运行时的计算开销.
- 通过 Wrapper 模板统一不同函数签名
在 DuckDB 的一元算子框架里, 只有一个统一入口 Operation():
struct UnaryOperatorWrapper {
template <class OP, class INPUT_TYPE, class RESULT_TYPE>
static inline RESULT_TYPE Operation(INPUT_TYPE input, ValidityMask &mask, idx_t idx, void *dataptr) {
return OP::template Operation<INPUT_TYPE, RESULT_TYPE>(input);
}
};
struct UnaryLambdaWrapper {
template <class FUNC, class INPUT_TYPE, class RESULT_TYPE>
static inline RESULT_TYPE Operation(INPUT_TYPE input, ValidityMask &mask, idx_t idx, void *dataptr) {
auto fun = (FUNC *)dataptr;
return (*fun)(input);
}
};
struct GenericUnaryWrapper {
template <class OP, class INPUT_TYPE, class RESULT_TYPE>
static inline RESULT_TYPE Operation(INPUT_TYPE input, ValidityMask &mask, idx_t idx, void *dataptr) {
return OP::template Operation<INPUT_TYPE, RESULT_TYPE>(input, mask, idx, dataptr);
}
};
struct UnaryLambdaWrapperWithNulls {
template <class FUNC, class INPUT_TYPE, class RESULT_TYPE>
static inline RESULT_TYPE Operation(INPUT_TYPE input, ValidityMask &mask, idx_t idx, void *dataptr) {
auto fun = (FUNC *)dataptr;
return (*fun)(input, mask, idx);
}
};template <class INPUT_TYPE, class RESULT_TYPE, class OPWRAPPER, class OP>
static inline void ExecuteLoop(const INPUT_TYPE *__restrict ldata, RESULT_TYPE *__restrict result_data, idx_t count,
const SelectionVector *__restrict sel_vector, ValidityMask &mask,
ValidityMask &result_mask, void *dataptr, bool adds_nulls) {
#ifdef DEBUG
// ldata may point to a compressed dictionary buffer which can be smaller than ldata + count
idx_t max_index = 0;
for (idx_t i = 0; i < count; i++) {
auto idx = sel_vector->get_index(i);
max_index = MaxValue(max_index, idx);
}
ASSERT_RESTRICT(ldata, ldata + max_index, result_data, result_data + count);
#endif
if (!mask.AllValid()) {
for (idx_t i = 0; i < count; i++) {
auto idx = sel_vector->get_index(i);
if (mask.RowIsValidUnsafe(idx)) {
result_data[i] =
OPWRAPPER::template Operation<OP, INPUT_TYPE, RESULT_TYPE>(ldata[idx], result_mask, i, dataptr);
} else {
result_mask.SetInvalid(i);
}
}
} else {
for (idx_t i = 0; i < count; i++) {
auto idx = sel_vector->get_index(i);
result_data[i] =
OPWRAPPER::template Operation<OP, INPUT_TYPE, RESULT_TYPE>(ldata[idx], result_mask, i, dataptr);
}
}
}DuckDB 用 OPWRAPPER 把 4 种不同形态的函数 (标准一元运算符、lambda、需要自己处理 null / 需要外部 state 的 OP、会回写 null 的 lambda) 统一成一个函数签名, 根据 OPWRAPPER 可以直接调用不同的算子,完成不同的形式的计算.
- 编译期分支消除
template <bool HAS_RSEL, bool HAS_SEL_VECTOR, class T, bool INPUT_IS_ALREADY_HASH>
void TightLoopHash(const T *__restrict ldata, hash_t *__restrict result_data, const SelectionVector *rsel, idx_t count,
const SelectionVector *__restrict sel_vector, const ValidityMask &mask) {
if (!mask.AllValid()) {
for (idx_t i = 0; i < count; i++) {
/* HAS_RSEL, HAS_SEL_VECTOR 都是模板的参数, 所以可以在编译期间确定具体的值, 编译器会进行分支消除的优化. */
auto ridx = HAS_RSEL ? rsel->get_index_unsafe(i) : i;
auto idx = HAS_SEL_VECTOR ? sel_vector->get_index_unsafe(ridx) : ridx;
result_data[ridx] = INPUT_IS_ALREADY_HASH ? CachedHashOp::Operation(ldata[idx])
: HashOp::Operation(ldata[idx], !mask.RowIsValidUnsafe(idx));
}
} else {
for (idx_t i = 0; i < count; i++) {
auto ridx = HAS_RSEL ? rsel->get_index_unsafe(i) : i;
auto idx = HAS_SEL_VECTOR ? sel_vector->get_index_unsafe(ridx) : ridx;
result_data[ridx] =
INPUT_IS_ALREADY_HASH ? CachedHashOp::Operation(ldata[idx]) : duckdb::Hash<T>(ldata[idx]);
}
}
}从 C++17 开始支持了 if constexpr, 作用也是编译器分支消除:
template <bool HAS_SEL>
void func() {
if constexpr (HAS_SEL) { // C++17 特性
// ...
}
}总结
DuckDB 的向量计算采用向量化执行, 核心思路是批量处理与编译期优化:
-
批量处理: 一次处理 2048 行数据, 利用编译器生成 SIMD 指令来减少循环开销.
-
零拷贝优化: 借助 SelectionVector 避免复制数据.
-
支持 5 种向量类型覆盖不同场景, 根据数据特征不同使用不同的向量类型.
-
编译期优化: 大量的模板元编程在编译期生成针对不同场景的代码, 消除运行时分支. 用编译期决策替代运行时决策, 通过模板在编译期为不同场景生成最优代码.