版本

v1.4-andium

引言

我们在 DuckDB 查询执行器架构解析 中梳理了执行器如何把物理计划切成 Pipeline、并在多线程环境中调度。那篇文章更多是站在查询 SQL 的的视角看 push-based 的运作方式,而真正让执行器高效的核心是每个算子都围绕 DataChunk 做向量化处理, 本篇就从执行器最基础的 Source 算子 TableScan 切入, 理解 DuckDB 的向量化执行。

TableScan

在物理计划中,TableScan 是最典型的 Source 算子: 它主要来进行扫表的操作, 来进行数据的读取操作, 没有输入但会批量产出 DataChunk 供整条 Pipeline 消费.

MetaPipeline 中,TableScan 不会创建新的子 Pipeline,它只是被加入当前 Pipeline 的 Source 列表, 根据 DuckDB 的执行器框架的算子原子, TableScan 是可以被并行操作的.

DataChunk 与 Vector

DataChunk 的结构

在 DuckDB 的实现中,DataChunk 就是每一批读取数据单元的抽象描述(src/include/duckdb/common/types/data_chunk.hpp).

class DataChunk {
public:
	/* 批次里每一列都是一个 Vector. */
	vector<Vector> data;

	/* 当前批次行数/列数. */
	inline idx_t size() const { return count; }
	inline idx_t ColumnCount() const { return data.size(); }
	inline void SetCardinality(idx_t count_p) { this->count = count_p; }

	/* 初始化批次并预分配 STANDARD_VECTOR_SIZE=2048 块. */
	DUCKDB_API void Initialize(ClientContext &context,
	                           const vector<LogicalType> &types,
	                           idx_t capacity = STANDARD_VECTOR_SIZE);
	
	/* 让当前批次引用另一批次的部分列. */
	DUCKDB_API void ReferenceColumns(DataChunk &other,
	                                 const vector<column_t> &column_ids);
	
	/* 基于 SelectionVector 对批次做切片. */
	DUCKDB_API void Slice(const SelectionVector &sel_vector, idx_t count);
	
	/* 重置批次, 复用底层 VectorCache. */
	DUCKDB_API void Reset();

	/* ... */
private:
	/* 当前批次行数与预分配容量. */
	idx_t count;
	idx_t capacity;
};

Initialize 调用时,每个列向量背后都会绑定一个 VectorCacheBuffer: 向量列对应一块可以容纳 STANDARD_VECTOR_SIZE(默认 2048)行的连续内存; LIST/ARRAY/STRUCT 等嵌套列还会递归地为子列准备自己的缓存和辅助 buffer.

vector<Vector> data 里的每个元素本质上就是 “指向一个固定长度的内存缓冲区”, TableScan 每次扫描就是把这些缓冲区写满再向上游推送.

Vector 的结构

DuckDB 中有几种不同的向量类型,每种类型都有其特定的优化目的:

enum class VectorType : uint8_t {
	FLAT_VECTOR,       // Flat vectors represent a standard uncompressed vector
	FSST_VECTOR,       // Contains string data compressed with FSST
	CONSTANT_VECTOR,   // Constant vector represents a single constant
	DICTIONARY_VECTOR, // Dictionary vector represents a selection vector on top of another vector
	SEQUENCE_VECTOR    // Sequence vector represents a sequence with a start point and an increment
};
  • FLAT_VECTOR: 标准的向量类型,数据在内存中连续存储,没有压缩

  • FSST_VECTOR: 使用了 Fast Static Symbol Table 压缩算法的向量,这个向量专门用来存储字符串数据. FSST 压缩算法也是 DuckDB 内部成员发表的压缩.

  • CONSTANT_VECTOR: 常量向量, 如果某一列值都是相等的,会直接使用常量向量, 常量向量预定义了一个大小依然是 STANDARD_VECTOR_SIZE 的向量:

static const sel_t ZERO_VECTOR[STANDARD_VECTOR_SIZE];

所有的元素都引用到位置 0, 避免重复创建向量.

  • DICTIONARY_VECTOR: 字典向量, 通过 SelectionVector 引用另一个向量的数据, 实现零拷贝.

  • SEQUENCE_VECTOR: SEQUENCE_VECTOR 向量是用 3 个元数据来表示任意长度的等差数列, DuckDB 的 range 函数 需要使用这个向量:

void Vector::Sequence(int64_t start, int64_t increment, idx_t count) {
    this->vector_type = VectorType::SEQUENCE_VECTOR;
    this->buffer = make_buffer<VectorBuffer>(sizeof(int64_t) * 3);
    auto data = reinterpret_cast<int64_t *>(buffer->GetData());
    data[0] = start;            /* 起始值. */
    data[1] = increment;        /* 增量. */
    data[2] = int64_t(count);   /* 元素个数. */
    validity.Reset();
    auxiliary.reset();
}

DuckDB 会在运行时根据数据特征和操作场景动态选择最优的向量类型.

SelectionVector 的延迟物化

DuckDB 向量化执行的核心优化之一是利用 SelectionVector 实现延迟物化,避免不必要的内存拷贝.

在 DataChunk 上 “记录哪些行有效” 是 SelectionVector 的职责. 它本质上是一个 sel_t* 索引数组,每一位代表当前 DataChunk 的某一行.

struct SelectionVector {
	SelectionVector() : sel_vector(nullptr) {}
	explicit SelectionVector(idx_t count) { Initialize(count); }

	/* 初始化. */
	void Initialize(idx_t count = STANDARD_VECTOR_SIZE);
	void Initialize(sel_t *sel);

	/* 写/读某个位置的行号. */
	inline void set_index(idx_t idx, idx_t loc) { sel_vector[idx] = UnsafeNumericCast<sel_t>(loc); }
	inline idx_t get_index(idx_t idx) const { return sel_vector ? sel_vector[idx] : idx; }

private:
	sel_t *sel_vector;
};

如果每个算子在处理后都需要复制满足条件的数据行, 这将导致内存带宽的浪费, 频繁的内存拷贝也会消耗大量的 CPU 影响整体的性能.

DuckDB 的 SelectionVector 机制通过解决这个问题:

/* 假设有一个包含 2048 行的 DataChunk. */
DataChunk chunk;
chunk.Initialize(context, {LogicalType::INTEGER, LogicalType::VARCHAR}, 2048);

/* TableScan 填充数据后, 所有行都有效:row_ids = [0, 1, 2, ..., 2047]. */

/* 第一个 Filter: WHERE status = 'VALID', 不复制数据,只创建 SelectionVector 记录满足条件的行号. */
SelectionVector sel1(800);
sel1.set_index(0, 5);
sel1.set_index(1, 7);
sel1.set_index(2, 12);

/* ... */

/* 第二个 Filter: WHERE amount > 1000, 基于 sel1 进一步过滤,生成 sel2. */
SelectionVector sel2(800);
idx_t sel2_count = 0;

/* 遍历 sel1 中的所有有效行, 记录满足第二个 Where 条件的行. */
for (idx_t i = 0; i < 800; i++) {
    auto row_idx = sel1.get_index(i);
    if (amount_data[row_idx] > 1000) {
        sel2.set_index(sel2_count++, row_idx);
    }
}

/* 假设最终 200 行同时满足两个条件: sel2 = [7, 9, 12, ...] (200 个索引,都是原始 DataChunk 的行号). */

/* Aggregate 算子读取数据时, 通过 SelectionVector 获取实际行号只访问有效行. */

TableScan 的扫描准备

在进入状态管理之前,先看 DuckDB 内置表函数 TableScanFunction::GetFunction()src/function/table/table_scan.cpp)。它返回一个 TableFunction 对象,里面挂着一系列函数指针,驱动整个表扫描生命周期:

TableFunction TableScanFunction::GetFunction() {
	TableFunction scan_function("seq_scan", {}, TableScanFunc);
	scan_function.init_local = TableScanInitLocal;
	scan_function.init_global = TableScanInitGlobal;
	scan_function.statistics = TableScanStatistics;
	scan_function.dependency = TableScanDependency;
	scan_function.cardinality = TableScanCardinality;
	scan_function.projection_pushdown = true;
	scan_function.filter_pushdown = true;
	scan_function.filter_prune = true;
	scan_function.sampling_pushdown = true;
	scan_function.late_materialization = true;
	scan_function.serialize = TableScanSerialize;
	scan_function.deserialize = TableScanDeserialize;
	scan_function.pushdown_expression = TableScanPushdownExpression;
	scan_function.get_virtual_columns = TableScanGetVirtualColumns;
	scan_function.get_row_id_columns = TableScanGetRowIdColumns;
	scan_function.set_scan_order = SetScanOrder;
	return scan_function;
}

可以把它理解成一张“函数指针表”:TableScanFunc 负责真正读取数据,TableScanInitLocal/TableScanInitGlobal 生成 Local/GlobalState, 其他回调负责统计以及各种 pushdown , 裁剪特性. 这种写法和 Linux 内核中的操作表类似,执行器只要在合适的阶段调用相应回调, 就能驱动任意表函数.

理解 TableScan 的向量化,需要清楚在每个线程上维护的状态:

  • TableScanGlobalSourceState: PhysicalTableScan 在执行器中的全局状态,用来管理 table function 的 init_global 结果、动态过滤器、in/out 参数.

  • TableScanLocalState: table function 内部为每个线程创建的私有上下文,持有 TableScanState scan_state(记录当前 RowGroup、偏移、过滤信息)、DataChunk all_columns(可选,缓存包含过滤列在内的全列数据)以及扫描过程中用到的一些本地缓冲信息.

基于 LocalState,每个线程就可以在 GetDataInternal 里重复使用 scan_stateall_columns,批量的把 RowGroup 数据搬到 DataChunk 里,而不必为每次扫描重新分配/初始化临时结构.

TableScan 的向量化读取

状态准备完毕后,真正的批量扫描发生在 TableScanFunc (src/function/table/table_scan.cpp).

先明确一下调用链:

PipelineExecutor::FetchFromSource      // 执行器在 Pipline 中 拉取数据
    -> PhysicalTableScan::GetData(...) // 调用算子的 Source 接口
        -> PhysicalTableScan::GetDataInternal(...)
            -> TableFunctionInput data(...)
            -> function.function(...) // TableScanFunc,真正扫描数据
void TableScanFunc(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
	/* 拿到当前线程的 LocalState(包含 scan_state、all_columns 等临时结构) */
	auto &l_state = data_p.local_state->Cast<TableScanLocalState>();
	l_state.scan_state.options.force_fetch_row = ClientConfig::GetConfig(context).force_fetch_row;

	do {
		/* 查询被中断,随时退出扫描循环 */
		if (context.interrupted) {
			throw InterruptException();
		}
		if (bind_data.is_create_index) {
			/* CREATE INDEX 特殊路径:只扫描已提交且未永久删除的行 */
			storage.CreateIndexScan(l_state.scan_state, output,
			                        TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED);
		} else if (CanRemoveFilterColumns()) {
			/* 启用 late materialization:先扫描到 all_columns,再引用需要的列 */
			l_state.all_columns.Reset();
			storage.Scan(tx, l_state.all_columns, l_state.scan_state);
			output.ReferenceColumns(l_state.all_columns, projection_ids);
		} else {
			/* 直接扫描到输出 DataChunk */
			storage.Scan(tx, output, l_state.scan_state);
		}
		if (output.size() > 0) {
			return; /* 本次批次已有数据,交给执行器 */
		}

		/* 当前 RowGroup 耗尽,向存储层申请下一个 morsel(可能来自 local storage) */
		auto next = storage.NextParallelScan(context, state, l_state.scan_state);
		if (!next) {
			return; /* 没有新的 RowGroup,扫描结束 */
		}
	} while (true);
}

TableScanFunc 内的 storage.Scan(...)/storage.CreateIndexScan(...) 最终都会落到存储层的 RowGroup::TemplatedScan. 这层主要做的是 “把当前 RowGroup 的一块数据搬进 DataChunk”, 和向量化框架本身关系不大:它会按 STANDARD_VECTOR_SIZE 决定本批次的行数,先检查采样/Zonemap/可见性生成 selection vector,必要时预取数据块;若无过滤则整列扫描,有过滤时先扫带过滤列生成 sel,再用 sel 抓取其余列;最后设置批次行数并推进到下一个 vector 块.

DuckDB 的向量操作

DuckDB 真正的向量化操作并没有手写 SIMD 指令, 而是依赖编译器自动向量化:

在 Release 编译模式下使用 -O3 默认包含了自动向量化 (比如 GCC 目前使用 -O3 就默认启动了-ftree-vectorize), 具体细节可以参考文章Auto-vectorization in GCC

大部分向量化操作通过struct VectorOperations提供统一接口, 底层使用UnaryExecutorBinaryExecutor 等模板类作为 Executor 来具体实现实际的向量化执行:

VectorOperations (公共接口)
    ↓ 调用
Executor 模板类 (具体实现)
    ↓ 调用
ExecuteLoop (循环, 借助编译器向量化)

比如判断相等的操作:

/* 公共接口. */
void VectorOperations::Equals(Vector &left, Vector &right, Vector &result, idx_t count) {
    /* 调用ComparisonExecutor 执行 duckdb::Equals 的 Operation 来做实际的相等比较. */
    ComparisonExecutor::Execute<duckdb::Equals>(left, right, result, count);
}

ComparisonExecutor::Execute 最后调用 BinaryExecutor::ExecuteGenericLoop() 进行 for 循环来依次判断是否相等:

template <class LEFT_TYPE, class RIGHT_TYPE, class RESULT_TYPE, class OPWRAPPER, class OP, class FUNC>
static void ExecuteGenericLoop(const LEFT_TYPE *__restrict ldata, const RIGHT_TYPE *__restrict rdata,
                               RESULT_TYPE *__restrict result_data, const SelectionVector *__restrict lsel,
                               const SelectionVector *__restrict rsel, idx_t count, ValidityMask &lvalidity,
                               ValidityMask &rvalidity, ValidityMask &result_validity, FUNC fun) {
	if (!lvalidity.AllValid() || !rvalidity.AllValid()) {
		for (idx_t i = 0; i < count; i++) {
			auto lindex = lsel->get_index(i);
			auto rindex = rsel->get_index(i);
			if (lvalidity.RowIsValid(lindex) && rvalidity.RowIsValid(rindex)) {
				auto lentry = ldata[lindex];
				auto rentry = rdata[rindex];
				result_data[i] = OPWRAPPER::template Operation<FUNC, OP, LEFT_TYPE, RIGHT_TYPE, RESULT_TYPE>(
				    fun, lentry, rentry, result_validity, i);
			} else {
				result_validity.SetInvalid(i);
			}
		}
	} else {
		for (idx_t i = 0; i < count; i++) {
			auto lentry = ldata[lsel->get_index(i)];
			auto rentry = rdata[rsel->get_index(i)];
			result_data[i] = OPWRAPPER::template Operation<FUNC, OP, LEFT_TYPE, RIGHT_TYPE, RESULT_TYPE>(
			    fun, lentry, rentry, result_validity, i);
		}
	}
}

这里的OPWRAPPER::template Operation就是 duckdb::Equals 的 Operation, 这里OPWRAPPER是一个模板参数决定的类型, Operation()是它的成员模板函数,这里需要显式用template关键字告诉编译器调用的是成员模板.

比如 Add 相加操作, 也是使用 BinaryExecutor::ExecuteGenericLoop() 进行 for 循环操作调用

struct AddOperator {
	template <class TA, class TB, class TR>
	static inline TR Operation(TA left, TB right) {
		return left + right;
	}
};

DuckDB 向量计算的模板技巧