版本
v1.4-andium
引言
我们在 DuckDB 查询执行器架构解析 中梳理了执行器如何把物理计划切成 Pipeline、并在多线程环境中调度。那篇文章更多是站在查询 SQL 的的视角看 push-based 的运作方式,而真正让执行器高效的核心是每个算子都围绕 DataChunk 做向量化处理, 本篇就从执行器最基础的 Source 算子 TableScan 切入, 理解 DuckDB 的向量化执行。
TableScan
在物理计划中,TableScan 是最典型的 Source 算子: 它主要来进行扫表的操作, 来进行数据的读取操作, 没有输入但会批量产出 DataChunk 供整条 Pipeline 消费.
在 MetaPipeline 中,TableScan 不会创建新的子 Pipeline,它只是被加入当前 Pipeline 的 Source 列表, 根据 DuckDB 的执行器框架的算子原子, TableScan 是可以被并行操作的.
DataChunk 与 Vector
DataChunk 的结构
在 DuckDB 的实现中,DataChunk 就是每一批读取数据单元的抽象描述(src/include/duckdb/common/types/data_chunk.hpp).
class DataChunk {
public :
/* 批次里每一列都是一个 Vector. */
vector < Vector > data ;
/* 当前批次行数/列数. */
inline idx_t size () const { return count ; }
inline idx_t ColumnCount () const { return data . size (); }
inline void SetCardinality ( idx_t count_p ) { this -> count = count_p ; }
/* 初始化批次并预分配 STANDARD_VECTOR_SIZE=2048 块. */
DUCKDB_API void Initialize ( ClientContext & context ,
const vector < LogicalType > & types ,
idx_t capacity = STANDARD_VECTOR_SIZE );
/* 让当前批次引用另一批次的部分列. */
DUCKDB_API void ReferenceColumns ( DataChunk & other ,
const vector < column_t > & column_ids );
/* 基于 SelectionVector 对批次做切片. */
DUCKDB_API void Slice ( const SelectionVector & sel_vector , idx_t count );
/* 重置批次, 复用底层 VectorCache. */
DUCKDB_API void Reset ();
/* ... */
private :
/* 当前批次行数与预分配容量. */
idx_t count ;
idx_t capacity ;
};
Initialize 调用时,每个列向量背后都会绑定一个 VectorCacheBuffer: 向量列对应一块可以容纳 STANDARD_VECTOR_SIZE(默认 2048)行的连续内存; LIST/ARRAY/STRUCT 等嵌套列还会递归地为子列准备自己的缓存和辅助 buffer.
vector<Vector> data 里的每个元素本质上就是 “指向一个固定长度的内存缓冲区”, TableScan 每次扫描就是把这些缓冲区写满再向上游推送.
Vector 的结构
DuckDB 中有几种不同的向量类型,每种类型都有其特定的优化目的:
enum class VectorType : uint8_t {
FLAT_VECTOR , // Flat vectors represent a standard uncompressed vector
FSST_VECTOR , // Contains string data compressed with FSST
CONSTANT_VECTOR , // Constant vector represents a single constant
DICTIONARY_VECTOR , // Dictionary vector represents a selection vector on top of another vector
SEQUENCE_VECTOR // Sequence vector represents a sequence with a start point and an increment
};
FLAT_VECTOR: 标准的向量类型,数据在内存中连续存储,没有压缩
FSST_VECTOR: 使用了 Fast Static Symbol Table 压缩算法的向量,这个向量专门用来存储字符串数据. FSST 压缩算法也是 DuckDB 内部成员发表的压缩.
CONSTANT_VECTOR: 常量向量, 如果某一列值都是相等的,会直接使用常量向量, 常量向量预定义了一个大小依然是 STANDARD_VECTOR_SIZE 的向量:
static const sel_t ZERO_VECTOR [ STANDARD_VECTOR_SIZE ];
所有的元素都引用到位置 0, 避免重复创建向量.
DICTIONARY_VECTOR: 字典向量, 通过 SelectionVector 引用另一个向量的数据, 实现零拷贝.
SEQUENCE_VECTOR: SEQUENCE_VECTOR 向量是用 3 个元数据来表示任意长度的等差数列, DuckDB 的 需要使用这个向量:
void Vector :: Sequence ( int64_t start , int64_t increment , idx_t count ) {
this -> vector_type = VectorType :: SEQUENCE_VECTOR ;
this -> buffer = make_buffer < VectorBuffer > ( sizeof ( int64_t ) * 3 );
auto data = reinterpret_cast < int64_t *> ( buffer -> GetData ());
data [ 0 ] = start ; /* 起始值. */
data [ 1 ] = increment ; /* 增量. */
data [ 2 ] = int64_t ( count ); /* 元素个数. */
validity . Reset ();
auxiliary . reset ();
}
DuckDB 会在运行时根据数据特征和操作场景动态选择最优的向量类型.
SelectionVector 的延迟物化
DuckDB 向量化执行的核心优化之一是利用 SelectionVector 实现延迟物化,避免不必要的内存拷贝.
在 DataChunk 上 “记录哪些行有效” 是 SelectionVector 的职责. 它本质上是一个 sel_t* 索引数组,每一位代表当前 DataChunk 的某一行.
struct SelectionVector {
SelectionVector () : sel_vector ( nullptr ) {}
explicit SelectionVector ( idx_t count ) { Initialize ( count ); }
/* 初始化. */
void Initialize ( idx_t count = STANDARD_VECTOR_SIZE );
void Initialize ( sel_t * sel );
/* 写/读某个位置的行号. */
inline void set_index ( idx_t idx , idx_t loc ) { sel_vector [ idx ] = UnsafeNumericCast < sel_t > ( loc ); }
inline idx_t get_index ( idx_t idx ) const { return sel_vector ? sel_vector [ idx ] : idx ; }
private :
sel_t * sel_vector ;
};
如果每个算子在处理后都需要复制满足条件的数据行, 这将导致内存带宽的浪费, 频繁的内存拷贝也会消耗大量的 CPU 影响整体的性能.
DuckDB 的 SelectionVector 机制通过解决这个问题:
/* 假设有一个包含 2048 行的 DataChunk. */
DataChunk chunk ;
chunk . Initialize ( context , { LogicalType :: INTEGER , LogicalType :: VARCHAR }, 2048 );
/* TableScan 填充数据后, 所有行都有效:row_ids = [0, 1, 2, ..., 2047]. */
/* 第一个 Filter: WHERE status = 'VALID', 不复制数据,只创建 SelectionVector 记录满足条件的行号. */
SelectionVector sel1 ( 800 );
sel1 . set_index ( 0 , 5 );
sel1 . set_index ( 1 , 7 );
sel1 . set_index ( 2 , 12 );
/* ... */
/* 第二个 Filter: WHERE amount > 1000, 基于 sel1 进一步过滤,生成 sel2. */
SelectionVector sel2 ( 800 );
idx_t sel2_count = 0 ;
/* 遍历 sel1 中的所有有效行, 记录满足第二个 Where 条件的行. */
for ( idx_t i = 0 ; i < 800 ; i ++ ) {
auto row_idx = sel1 . get_index ( i );
if ( amount_data [ row_idx ] > 1000 ) {
sel2 . set_index ( sel2_count ++ , row_idx );
}
}
/* 假设最终 200 行同时满足两个条件: sel2 = [7, 9, 12, ...] (200 个索引,都是原始 DataChunk 的行号). */
/* Aggregate 算子读取数据时, 通过 SelectionVector 获取实际行号只访问有效行. */
TableScan 的扫描准备
在进入状态管理之前,先看 DuckDB 内置表函数 TableScanFunction::GetFunction()(src/function/table/table_scan.cpp)。它返回一个 TableFunction 对象,里面挂着一系列函数指针,驱动整个表扫描生命周期:
TableFunction TableScanFunction :: GetFunction () {
TableFunction scan_function ( "seq_scan" , {}, TableScanFunc );
scan_function . init_local = TableScanInitLocal ;
scan_function . init_global = TableScanInitGlobal ;
scan_function . statistics = TableScanStatistics ;
scan_function . dependency = TableScanDependency ;
scan_function . cardinality = TableScanCardinality ;
scan_function . projection_pushdown = true ;
scan_function . filter_pushdown = true ;
scan_function . filter_prune = true ;
scan_function . sampling_pushdown = true ;
scan_function . late_materialization = true ;
scan_function . serialize = TableScanSerialize ;
scan_function . deserialize = TableScanDeserialize ;
scan_function . pushdown_expression = TableScanPushdownExpression ;
scan_function . get_virtual_columns = TableScanGetVirtualColumns ;
scan_function . get_row_id_columns = TableScanGetRowIdColumns ;
scan_function . set_scan_order = SetScanOrder ;
return scan_function ;
}
可以把它理解成一张“函数指针表”:TableScanFunc 负责真正读取数据,TableScanInitLocal/TableScanInitGlobal 生成 Local/GlobalState, 其他回调负责统计以及各种 pushdown , 裁剪特性. 这种写法和 Linux 内核中的操作表类似,执行器只要在合适的阶段调用相应回调, 就能驱动任意表函数.
理解 TableScan 的向量化,需要清楚在每个线程上维护的状态:
TableScanGlobalSourceState: PhysicalTableScan 在执行器中的全局状态,用来管理 table function 的 init_global 结果、动态过滤器、in/out 参数.
TableScanLocalState: table function 内部为每个线程创建的私有上下文,持有 TableScanState scan_state(记录当前 RowGroup、偏移、过滤信息)、DataChunk all_columns(可选,缓存包含过滤列在内的全列数据)以及扫描过程中用到的一些本地缓冲信息.
基于 LocalState,每个线程就可以在 GetDataInternal 里重复使用 scan_state 和 all_columns,批量的把 RowGroup 数据搬到 DataChunk 里,而不必为每次扫描重新分配/初始化临时结构.
TableScan 的向量化读取
状态准备完毕后,真正的批量扫描发生在 TableScanFunc (src/function/table/table_scan.cpp).
先明确一下调用链:
PipelineExecutor::FetchFromSource // 执行器在 Pipline 中 拉取数据
-> PhysicalTableScan::GetData(...) // 调用算子的 Source 接口
-> PhysicalTableScan::GetDataInternal(...)
-> TableFunctionInput data(...)
-> function.function(...) // TableScanFunc,真正扫描数据
void TableScanFunc ( ClientContext & context , TableFunctionInput & data_p , DataChunk & output ) {
/* 拿到当前线程的 LocalState(包含 scan_state、all_columns 等临时结构) */
auto & l_state = data_p . local_state -> Cast < TableScanLocalState > ();
l_state . scan_state . options . force_fetch_row = ClientConfig :: GetConfig ( context ). force_fetch_row ;
do {
/* 查询被中断,随时退出扫描循环 */
if ( context . interrupted ) {
throw InterruptException ();
}
if ( bind_data . is_create_index ) {
/* CREATE INDEX 特殊路径:只扫描已提交且未永久删除的行 */
storage . CreateIndexScan ( l_state . scan_state , output ,
TableScanType :: TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED );
} else if ( CanRemoveFilterColumns ()) {
/* 启用 late materialization:先扫描到 all_columns,再引用需要的列 */
l_state . all_columns . Reset ();
storage . Scan ( tx , l_state . all_columns , l_state . scan_state );
output . ReferenceColumns ( l_state . all_columns , projection_ids );
} else {
/* 直接扫描到输出 DataChunk */
storage . Scan ( tx , output , l_state . scan_state );
}
if ( output . size () > 0 ) {
return ; /* 本次批次已有数据,交给执行器 */
}
/* 当前 RowGroup 耗尽,向存储层申请下一个 morsel(可能来自 local storage) */
auto next = storage . NextParallelScan ( context , state , l_state . scan_state );
if ( ! next ) {
return ; /* 没有新的 RowGroup,扫描结束 */
}
} while ( true );
}
TableScanFunc 内的 storage.Scan(...)/storage.CreateIndexScan(...) 最终都会落到存储层的 RowGroup::TemplatedScan. 这层主要做的是 “把当前 RowGroup 的一块数据搬进 DataChunk”, 和向量化框架本身关系不大:它会按 STANDARD_VECTOR_SIZE 决定本批次的行数,先检查采样/Zonemap/可见性生成 selection vector,必要时预取数据块;若无过滤则整列扫描,有过滤时先扫带过滤列生成 sel,再用 sel 抓取其余列;最后设置批次行数并推进到下一个 vector 块.
DuckDB 的向量操作
DuckDB 真正的向量化操作并没有手写 SIMD 指令, 而是依赖编译器自动向量化:
在 Release 编译模式下使用 -O3 默认包含了自动向量化 (比如 GCC 目前使用 -O3 就默认启动了-ftree-vectorize), 具体细节可以参考文章
大部分向量化操作通过struct VectorOperations提供统一接口, 底层使用UnaryExecutor、BinaryExecutor 等模板类作为 Executor 来具体实现实际的向量化执行:
VectorOperations ( 公共接口)
↓ 调用
Executor 模板类 ( 具体实现)
↓ 调用
ExecuteLoop ( 循环, 借助编译器向量化)
比如判断相等的操作:
/* 公共接口. */
void VectorOperations :: Equals ( Vector & left , Vector & right , Vector & result , idx_t count ) {
/* 调用ComparisonExecutor 执行 duckdb::Equals 的 Operation 来做实际的相等比较. */
ComparisonExecutor :: Execute < duckdb :: Equals > ( left , right , result , count );
}
ComparisonExecutor::Execute 最后调用 BinaryExecutor::ExecuteGenericLoop() 进行 for 循环来依次判断是否相等:
template < class LEFT_TYPE , class RIGHT_TYPE , class RESULT_TYPE , class OPWRAPPER , class OP , class FUNC >
static void ExecuteGenericLoop ( const LEFT_TYPE * __restrict ldata , const RIGHT_TYPE * __restrict rdata ,
RESULT_TYPE * __restrict result_data , const SelectionVector * __restrict lsel ,
const SelectionVector * __restrict rsel , idx_t count , ValidityMask & lvalidity ,
ValidityMask & rvalidity , ValidityMask & result_validity , FUNC fun ) {
if ( ! lvalidity . AllValid () || ! rvalidity . AllValid ()) {
for ( idx_t i = 0 ; i < count ; i ++ ) {
auto lindex = lsel -> get_index ( i );
auto rindex = rsel -> get_index ( i );
if ( lvalidity . RowIsValid ( lindex ) && rvalidity . RowIsValid ( rindex )) {
auto lentry = ldata [ lindex ];
auto rentry = rdata [ rindex ];
result_data [ i ] = OPWRAPPER :: template Operation < FUNC , OP , LEFT_TYPE , RIGHT_TYPE , RESULT_TYPE > (
fun , lentry , rentry , result_validity , i );
} else {
result_validity . SetInvalid ( i );
}
}
} else {
for ( idx_t i = 0 ; i < count ; i ++ ) {
auto lentry = ldata [ lsel -> get_index ( i )];
auto rentry = rdata [ rsel -> get_index ( i )];
result_data [ i ] = OPWRAPPER :: template Operation < FUNC , OP , LEFT_TYPE , RIGHT_TYPE , RESULT_TYPE > (
fun , lentry , rentry , result_validity , i );
}
}
}
这里的OPWRAPPER::template Operation就是 duckdb::Equals 的 Operation, 这里OPWRAPPER是一个模板参数决定的类型, Operation()是它的成员模板函数,这里需要显式用template关键字告诉编译器调用的是成员模板.
比如 Add 相加操作, 也是使用 BinaryExecutor::ExecuteGenericLoop() 进行 for 循环操作调用
struct AddOperator {
template < class TA , class TB , class TR >
static inline TR Operation ( TA left , TB right ) {
return left + right ;
}
};
DuckDB 向量计算的模板技巧