void Executor::InitializeInternal(PhysicalOperator &plan) {
    // Fetch the global task scheduler from the current ClientContext.
    auto &scheduler = TaskScheduler::GetScheduler(context);
    {
        lock_guard<mutex> elock(executor_lock);
        physical_plan = &plan;
        this->profiler = ClientData::Get(context).profiler;
        profiler->Initialize(plan);
        this->producer = scheduler.CreateProducer();

        // Build all pipelines, starting from the root MetaPipeline.
        PipelineBuildState state;
        auto root_pipeline = make_shared_ptr<MetaPipeline>(*this, state, nullptr);
        // Build recursively constructs every pipeline from the root of the physical plan,
        // inserting pipeline breakers and dependencies per the pipeline-construction rules.
        root_pipeline->Build(*physical_plan);
        // Ready switches every pipeline in the MetaPipeline tree to the executable state
        // and puts each pipeline's operator chain into execution order.
        root_pipeline->Ready();
        /* ... */

        // set root pipelines, i.e., all pipelines that end in the final sink
        root_pipeline->GetPipelines(root_pipelines, false);
        root_pipeline_idx = 0;

        // collect all meta-pipelines from the root pipeline (recursive search)
        vector<shared_ptr<MetaPipeline>> to_schedule;
        root_pipeline->GetMetaPipelines(to_schedule, true, true);

        // number of 'PipelineCompleteEvent's is equal to the number of meta pipelines, so we have to set it here
        total_pipelines = to_schedule.size();

        // collect all pipelines from the root pipelines (recursively) for the progress bar and verify them
        root_pipeline->GetPipelines(pipelines, true);

        // finally, verify and schedule: ScheduleEvents builds the event stack
        // (initialize -> event -> prepare finish -> finish -> complete) for every
        // MetaPipeline and the pipelines it contains.
        VerifyPipelines();
        ScheduleEvents(to_schedule);
    }
}
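To make the structure that Build and Ready produce concrete, here is a minimal, self-contained sketch of the tree that GetMetaPipelines walks. The SketchPipeline/SketchMetaPipeline types are hypothetical stand-ins, not DuckDB's actual classes: a MetaPipeline groups the pipelines that share one sink, and each pipeline breaker spawns a child MetaPipeline. total_pipelines above is then simply the size of the recursively collected list, since one PipelineCompleteEvent is created per MetaPipeline.

#include <memory>
#include <vector>

// Hypothetical stand-ins for illustration only.
struct SketchPipeline {
    std::vector<std::shared_ptr<SketchPipeline>> dependencies; // pipelines that must complete first
};

struct SketchMetaPipeline {
    std::vector<std::shared_ptr<SketchPipeline>> pipelines;    // all pipelines share the same sink
    std::vector<std::shared_ptr<SketchMetaPipeline>> children; // one child per pipeline breaker

    // Roughly mirrors the recursive collection performed by GetMetaPipelines:
    // gather this MetaPipeline and, recursively, all of its children.
    void Collect(std::vector<SketchMetaPipeline *> &result) {
        result.push_back(this);
        for (auto &child : children) {
            child->Collect(result);
        }
    }
};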
//===--------------------------------------------------------------------===//
// Pipeline Construction
//===--------------------------------------------------------------------===//
void PhysicalJoin::BuildJoinPipelines(Pipeline &current, MetaPipeline &meta_pipeline, PhysicalOperator &op,
                                      bool build_rhs) {
    op.op_state.reset();
    op.sink_state.reset();

    // Add the join operator to the current pipeline.
    auto &state = meta_pipeline.GetState();
    state.AddPipelineOperator(current, op);

    vector<shared_ptr<Pipeline>> pipelines_so_far;
    meta_pipeline.GetPipelines(pipelines_so_far, false);
    auto &last_pipeline = *pipelines_so_far.back();

    vector<shared_ptr<Pipeline>> dependencies;
    optional_ptr<MetaPipeline> last_child_ptr;
    if (build_rhs) {
        // on the RHS (build side), we construct a child MetaPipeline with this operator as its sink
        auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, op, MetaPipelineType::JOIN_BUILD);
        // Recursively build the pipelines of the right subtree from its PhysicalOperator.
        child_meta_pipeline.Build(op.children[1]);
        if (op.children[1].get().CanSaturateThreads(current.GetClientContext())) {
            // if the build side can saturate all available threads,
            // we don't just make the LHS pipeline depend on the RHS, but recursively all LHS children too.
            // this prevents breadth-first plan evaluation.
            // This check is key to both performance and execution order: if the build phase can
            // occupy every available thread, all pipelines in the left subtree are made to depend
            // on the right subtree, so the build completes before the probe side is scheduled.
            child_meta_pipeline.GetPipelines(dependencies, false);
            last_child_ptr = meta_pipeline.GetLastChild();
        }
    }

    // continue building the current pipeline on the LHS (probe side)
    op.children[0].get().BuildPipelines(current, meta_pipeline);

    if (last_child_ptr) {
        // the pointer was set, set up the dependencies: every pipeline of every child
        // MetaPipeline created later in the left subtree depends on all build-side pipelines.
        meta_pipeline.AddRecursiveDependencies(dependencies, *last_child_ptr);
    }

    // Special join types.
    switch (op.type) {
    case PhysicalOperatorType::POSITIONAL_JOIN:
        // Positional joins are always outer
        meta_pipeline.CreateChildPipeline(current, op, last_pipeline);
        return;
    case PhysicalOperatorType::CROSS_PRODUCT:
        return;
    default:
        break;
    }

    // Join can become a source operator if it's RIGHT/OUTER, or if the hash join goes out-of-core.
    // Such joins emit extra output themselves (acting as a source), so an additional child
    // pipeline is created to handle that output.
    if (op.Cast<PhysicalJoin>().IsSource()) {
        meta_pipeline.CreateChildPipeline(current, op, last_pipeline);
    }
}
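The effect of the saturation check is easiest to see on a concrete dependency graph. The sketch below uses hypothetical types, not DuckDB code, and shows the two shapes: without the check only the probe pipeline waits for the build; with it, the dependency is propagated recursively so no LHS pipeline can start early.

#include <vector>

// Hypothetical stand-ins for illustration only.
struct DepSketchPipeline {
    const char *name;
    std::vector<const DepSketchPipeline *> dependencies;
};

int main() {
    DepSketchPipeline build {"scan(rhs) -> build hash table", {}};
    DepSketchPipeline probe {"scan(lhs) -> probe -> sink", {}};
    DepSketchPipeline lhs_child {"child pipeline below the lhs scan", {}};

    // Default shape: only the probe pipeline waits on the build
    // (lhs_child may be scheduled concurrently with the build -- breadth-first).
    probe.dependencies.push_back(&build);

    // Shape after AddRecursiveDependencies when the build saturates all threads:
    // every LHS pipeline waits on the build side as well.
    lhs_child.dependencies.push_back(&build);
    return 0;
}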
MetaPipeline &MetaPipeline::CreateChildMetaPipeline(Pipeline &current, PhysicalOperator &op, MetaPipelineType type) {
    children.push_back(make_shared_ptr<MetaPipeline>(executor, state, &op, type));
    auto &child_meta_pipeline = *children.back().get();
    // store the parent
    child_meta_pipeline.parent = &current;
    // child MetaPipeline must finish completely before this MetaPipeline can start:
    // the current pipeline depends on the child MetaPipeline's base pipeline.
    current.AddDependency(child_meta_pipeline.GetBasePipeline());
    // child meta pipeline is part of the recursive CTE too
    child_meta_pipeline.recursive_cte = recursive_cte;
    return child_meta_pipeline;
}
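What AddDependency buys at scheduling time is that a pipeline only becomes runnable once all of its dependencies have completed; this is what forces a child MetaPipeline (a join build, for example) to finish entirely before its parent's pipeline starts. A minimal sketch of that readiness check, again with hypothetical types:

#include <algorithm>
#include <vector>

// Hypothetical stand-in for illustration only.
struct ReadySketchPipeline {
    bool complete = false;
    std::vector<const ReadySketchPipeline *> dependencies;

    // Runnable only once every dependency has completed.
    bool IsRunnable() const {
        return std::all_of(dependencies.begin(), dependencies.end(),
                           [](const ReadySketchPipeline *dep) { return dep->complete; });
    }
};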
void TaskScheduler::ExecuteForever(atomic<bool> *marker) {
#ifndef DUCKDB_NO_THREADS
    static constexpr const int64_t INITIAL_FLUSH_WAIT = 500000; // initial wait time of 0.5s (in mus) before flushing
    auto &config = DBConfig::GetConfig(db);
    shared_ptr<Task> task;
    // Worker thread main loop.
    while (*marker) {
        /* ... */
        // Pop a task from the lock-free task queue.
        if (queue->Dequeue(task)) {
            auto process_mode = config.options.scheduler_process_partial ? TaskExecutionMode::PROCESS_PARTIAL
                                                                         : TaskExecutionMode::PROCESS_ALL;
            // Execute the task.
            auto execute_result = task->Execute(process_mode);
            // Handle the task based on the execution result.
            switch (execute_result) {
            case TaskExecutionResult::TASK_FINISHED:
            case TaskExecutionResult::TASK_ERROR:
                // Task done (or errored): release its resources.
                task.reset();
                break;
            case TaskExecutionResult::TASK_NOT_FINISHED: {
                // Task not finished yet: re-enqueue it.
                auto &token = *task->token;
                queue->Enqueue(token, std::move(task));
                break;
            }
            case TaskExecutionResult::TASK_BLOCKED:
                // Task blocked: deschedule it.
                task->Deschedule();
                task.reset();
                break;
            }
        } else if (queue->GetTasksInQueue() > 0) {
            // failed to dequeue but there are still tasks remaining - signal again to retry
            queue->semaphore.signal(1);
        }
    }
    /* ... */
}
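The control flow of the worker loop can be modeled without DuckDB's lock-free queue and semaphore. The following self-contained sketch substitutes a mutex-and-condition-variable queue (all names hypothetical) but keeps the same three outcomes: drop a finished task, re-enqueue an unfinished one, and drop a blocked one, where the real scheduler would instead deschedule it until its blocking event wakes it up.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>

enum class SketchResult { FINISHED, NOT_FINISHED, BLOCKED };

struct SketchTaskQueue {
    std::mutex lock;
    std::condition_variable cv;
    std::deque<std::function<SketchResult()>> tasks;

    bool Dequeue(std::function<SketchResult()> &task) {
        std::unique_lock<std::mutex> guard(lock);
        // Wait briefly so an idle worker can re-check its exit marker.
        cv.wait_for(guard, std::chrono::milliseconds(50), [&] { return !tasks.empty(); });
        if (tasks.empty()) {
            return false;
        }
        task = std::move(tasks.front());
        tasks.pop_front();
        return true;
    }

    void Enqueue(std::function<SketchResult()> task) {
        {
            std::lock_guard<std::mutex> guard(lock);
            tasks.push_back(std::move(task));
        }
        cv.notify_one();
    }
};

void ExecuteForeverSketch(SketchTaskQueue &queue, std::atomic<bool> &marker) {
    std::function<SketchResult()> task;
    while (marker) {
        if (!queue.Dequeue(task)) {
            continue; // nothing to do; loop and re-check the marker
        }
        switch (task()) {
        case SketchResult::FINISHED:
            task = nullptr; // task complete: release it
            break;
        case SketchResult::NOT_FINISHED:
            queue.Enqueue(std::move(task)); // partial progress: run it again later
            break;
        case SketchResult::BLOCKED:
            task = nullptr; // a real scheduler would deschedule until the task is woken
            break;
        }
    }
}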
PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
    // The current pipeline must have a sink (the data receiver).
    D_ASSERT(pipeline.sink);
    // Fetch the source chunk: if the pipeline has no intermediate operators, use final_chunk
    // directly; otherwise use the first intermediate chunk, intermediate_chunks[0].
    auto &source_chunk = pipeline.operators.empty() ? final_chunk : *intermediate_chunks[0];
    // ExecutionBudget caps how many chunks a single Execute() call may process;
    // once the budget is exhausted, this round of the task ends.
    ExecutionBudget chunk_budget(max_chunks);
    // Main execution loop.
    do {
        // Check whether the client has been interrupted.
        if (context.client.interrupted) {
            throw InterruptException();
        }
        // Pick the execution path based on the current state; the state flags
        // (exhausted_source, done_flushing, ...) are all initialized to false.
        OperatorResultType result;
        if (exhausted_source && done_flushing && !remaining_sink_chunk && !next_batch_blocked &&
            in_process_operators.empty()) {
            // All conditions are satisfied: exit the loop.
            break;
        } else if (remaining_sink_chunk) {
            // The pipeline was interrupted by the Sink. We should retry sinking the final chunk.
            result = ExecutePushInternal(final_chunk, chunk_budget);
            D_ASSERT(result != OperatorResultType::HAVE_MORE_OUTPUT);
            remaining_sink_chunk = false;
        } else if (!in_process_operators.empty() && !started_flushing) {
            // Operator(s) in the pipeline returned HAVE_MORE_OUTPUT in the last Execute call;
            // they must be called again with the same input chunk to produce the rest of the output.
            D_ASSERT(source_chunk.size() > 0);
            result = ExecutePushInternal(source_chunk, chunk_budget);
        } else if (exhausted_source && !next_batch_blocked && !done_flushing) {
            // The source was exhausted: try flushing the caching operators.
            auto flush_completed = TryFlushCachingOperators(chunk_budget);
            if (flush_completed) {
                done_flushing = true;
                break;
            } else {
                if (remaining_sink_chunk) {
                    return PipelineExecuteResult::INTERRUPTED;
                } else {
                    D_ASSERT(chunk_budget.IsDepleted());
                    return PipelineExecuteResult::NOT_FINISHED;
                }
            }
        } else if (!exhausted_source || next_batch_blocked) {
            // Fetch new data through the pipeline's source operator and process it.
            SourceResultType source_result;
            if (!next_batch_blocked) {
                // "Regular" path: fetch a chunk from the source and push it through the pipeline.
                source_chunk.Reset();
                source_result = FetchFromSource(source_chunk);
                if (source_result == SourceResultType::BLOCKED) {
                    // The source is blocked: return INTERRUPTED.
                    return PipelineExecuteResult::INTERRUPTED;
                }
                // The source is fully consumed: mark it as exhausted.
                if (source_result == SourceResultType::FINISHED) {
                    exhausted_source = true;
                }
            }
            if (required_partition_info.AnyRequired()) {
                auto next_batch_result = NextBatch(source_chunk);
                next_batch_blocked = next_batch_result == SinkNextBatchType::BLOCKED;
                if (next_batch_blocked) {
                    return PipelineExecuteResult::INTERRUPTED;
                }
            }
            if (exhausted_source && source_chunk.size() == 0) {
                // To ensure that we're not early-terminating the pipeline
                continue;
            }
            // Push the chunk through the pipeline's operators.
            result = ExecutePushInternal(source_chunk, chunk_budget);
        } else {
            throw InternalException("Unexpected state reached in pipeline executor");
        }
        if (result == OperatorResultType::BLOCKED) {
            // The Sink blocked: set remaining_sink_chunk = true and return INTERRUPTED.
            remaining_sink_chunk = true;
            return PipelineExecuteResult::INTERRUPTED;
        }
        if (result == OperatorResultType::FINISHED) {
            // The whole pipeline is done: exit the loop.
            break;
        }
    } while (chunk_budget.Next());

    // The pipeline is not yet finished if any of the following hold:
    // 1. the source has not been fully consumed;
    // 2. the flush of the caching operators has not run;
    // 3. the pipeline as a whole is not finished.
    if ((!exhausted_source || !done_flushing) && !IsFinished()) {
        return PipelineExecuteResult::NOT_FINISHED;
    }
    // The whole pipeline is finished: run finalization.
    return PushFinalize();
}
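ExecutionBudget is what makes Execute(max_chunks) cooperative: the do/while runs a bounded number of iterations before handing the thread back to the scheduler with NOT_FINISHED. A plausible minimal implementation, inferred from the usage above (the real class may differ):

#include <cstdint>

// Hypothetical sketch of the budget used as the do/while condition above.
class SketchExecutionBudget {
public:
    explicit SketchExecutionBudget(uint64_t max_chunks) : remaining(max_chunks) {
    }
    // True while there is budget left to process another chunk,
    // mirroring `while (chunk_budget.Next())` above.
    bool Next() {
        if (remaining == 0) {
            return false;
        }
        --remaining;
        return true;
    }
    bool IsDepleted() const {
        return remaining == 0;
    }

private:
    uint64_t remaining;
};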