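Overview
This is the second part of the series on the UE5 TaskGraph system. It briefly examines how the TaskGraph system organizes its data and how tasks are created and dispatched.
Part one gave a short introduction to the main basic types in TaskGraph; see "UE5-TaskGraph系统-上:基本类型" (UE5 TaskGraph System, Part 1: Basic Types).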
Task queues
The Task Graph has two main kinds of task queue:
- Each FNamedTaskThread owns its own task queues, Queues (an array of FThreadTaskQueue). RHIThread, GameThread and ActualRenderingThread are all FNamedTaskThread instances.
- FTaskThreadAnyThread workers have no queue of their own; they share the IncomingAnyThreadTasks queues (an array of FStallingTaskQueue).
Queues holds two FThreadTaskQueue instances (ENamedThreads::NumQueues = 2), the MainQueue and the LocalQueue, and each FThreadTaskQueue owns one FStallingTaskQueue.
IncomingAnyThreadTasks is an array of three FStallingTaskQueue instances (MAX_THREAD_PRIORITIES = 3), one for each of the three FTaskThreadAnyThread priorities.
Every FStallingTaskQueue in turn contains two lock-free lists, which hold Normal and High priority tasks respectively.
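The nesting described above can be pictured with a small standalone model. This is only a sketch: every Toy* type is a hypothetical stand-in rather than engine code, std::deque replaces the lock-free lists, and the rule that High priority tasks are popped before Normal ones is an assumption made for illustration. Only the counts (2 queues per named thread, 3 shared queues, 2 task priorities) come from the text.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-ins for the engine types; only the counts come from the text.
constexpr std::size_t NumQueues = 2;           // MainQueue, LocalQueue
constexpr std::size_t MaxThreadPriorities = 3; // one shared queue per any-thread priority
constexpr std::size_t NumTaskPriorities = 2;   // Normal, High

struct ToyTask { int Id; };

// Models FStallingTaskQueue: one list per task priority (0 = Normal, 1 = High in this sketch).
struct ToyStallingQueue
{
    std::array<std::deque<ToyTask>, NumTaskPriorities> Lists;

    void Push(const ToyTask& Task, bool bHighPriority)
    {
        Lists[bHighPriority ? 1 : 0].push_back(Task);
    }

    // Assumption: High priority tasks are served first. Returns false when both lists are empty.
    bool Pop(ToyTask& OutTask)
    {
        for (std::size_t Index = NumTaskPriorities; Index-- > 0;)
        {
            if (!Lists[Index].empty())
            {
                OutTask = Lists[Index].front();
                Lists[Index].pop_front();
                return true;
            }
        }
        return false;
    }
};

// Models FThreadTaskQueue, which owns one stalling queue.
struct ToyThreadTaskQueue { ToyStallingQueue StallQueue; };

// A named thread owns NumQueues private queues.
struct ToyNamedThread { std::array<ToyThreadTaskQueue, NumQueues> Queues; };

// All any-thread workers share these queues, one per thread priority.
struct ToyTaskGraph { std::array<ToyStallingQueue, MaxThreadPriorities> IncomingAnyThreadTasks; };
```

In this model a named thread would push to Queues[0] or Queues[1], while any-thread workers of a given priority would all pop from the same IncomingAnyThreadTasks entry.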
Task threads
Creation
For each FTaskThreadAnyThread, the FRunnableThread is created when FTaskGraphImplementation is constructed:
FTaskGraphImplementation::FTaskGraphImplementation(int32)
{
    (...)
    for (int32 ThreadIndex = LastExternalThread + 1; ThreadIndex < NumThreads; ThreadIndex++)
    {
        (...)
        if (FForkProcessHelper::IsForkedMultithreadInstance() && GAllowTaskGraphForkMultithreading)
        {
            WorkerThreads[ThreadIndex].RunnableThread = FForkProcessHelper::CreateForkableThread(&Thread(ThreadIndex), *Name, StackSize, ThreadPri, Affinity);
        }
        else
        {
            WorkerThreads[ThreadIndex].RunnableThread = FRunnableThread::Create(&Thread(ThreadIndex), *Name, StackSize, ThreadPri, Affinity);
        }
        WorkerThreads[ThreadIndex].bAttached = true;
    }
    UE::Trace::ThreadGroupEnd();
}
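FNamedTaskThread is different: no FRunnableThread is created for it in the FTaskGraphImplementation constructor. The thread that actually runs its tasks is created by the owning module, which then calls FTaskGraphInterface::Get().AttachToThread (for example with ENamedThreads::GameThread) to associate itself with the matching worker.
For example, when StartRenderingThread is called, the Render module obtains the FRHIThread singleton and calls its Start function to create the FRunnableThread. When the RHI thread starts running in FRHIThread::Run, it calls FTaskGraphInterface::AttachToThread to bind itself to the RHIThread entry in WorkerThreads.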
// Engine\Source\Runtime\RenderCore\Private\RenderingThread.cpp
class FRHIThread : public FRunnable
{
public:
    FRunnableThread* Thread;

    FRHIThread()
        : Thread(nullptr)
    {
        check(IsInGameThread());
    }

    virtual bool Init(void) override
    {
        PRAGMA_DISABLE_DEPRECATION_WARNINGS
        GRHIThreadId = FPlatformTLS::GetCurrentThreadId();
        PRAGMA_ENABLE_DEPRECATION_WARNINGS
        return true;
    }

    virtual uint32 Run() override
    {
        LLM_SCOPE(ELLMTag::RHIMisc);
#if CSV_PROFILER
        FCsvProfiler::Get()->SetRHIThreadId(FPlatformTLS::GetCurrentThreadId());
#endif
        FMemory::SetupTLSCachesOnCurrentThread();
        {
            FTaskTagScope Scope(ETaskTag::ERhiThread);
            FPlatformProcess::SetupRHIThread();
            FTaskGraphInterface::Get().AttachToThread(ENamedThreads::RHIThread);
            FTaskGraphInterface::Get().ProcessThreadUntilRequestReturn(ENamedThreads::RHIThread);
        }
        FMemory::ClearAndDisableTLSCachesOnCurrentThread();
        return 0;
    }

    static FRHIThread& Get()
    {
        static FRHIThread Singleton;
        return Singleton;
    }

    void Start()
    {
        UE::Trace::ThreadGroupBegin(TEXT("Render"));
        Thread = FRunnableThread::Create(this, TEXT("RHIThread"), 512 * 1024, FPlatformAffinity::GetRHIThreadPriority(),
            FPlatformAffinity::GetRHIThreadMask(), FPlatformAffinity::GetRHIThreadFlags()
        );
        check(Thread);
        UE::Trace::ThreadGroupEnd();
    }
};
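Execution
FNamedTaskThread and FTaskThreadAnyThread fetch tasks differently when executing them.
FNamedTaskThread::ProcessTasksNamedThread first uses the QueueIndex passed in to choose MainQueue or LocalQueue from the thread's own queues, then takes tasks from that queue by priority and executes them.
FTaskThreadAnyThread::ProcessTasks instead calls FTaskGraphImplementation::FindWork, which computes the corresponding priority and index from the thread index and then takes a task from IncomingAnyThreadTasks.
Layout of WorkerThreads
WorkerThreads first stores RHIThread, GameThread and ActualRenderingThread. The FTaskThreadAnyThread workers follow, split evenly into three priority sets of NumTaskThreadsPerSet threads each, all with an OS thread priority below Normal. Together these are all the thread objects the TaskGraph system uses, laid out as follows: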
WorkerThreads
|<--------------------------------------------------------- NumThreads ---------------------------------------------------------->|
|     0     |     1      |           2           |   NumTaskThreadsPerSet   |   NumTaskThreadsPerSet   |   NumTaskThreadsPerSet   |
| RHIThread | GameThread | ActualRenderingThread |            NP            |            HP            |            BP            |
|           |            |                       |     TPri_BelowNormal     | TPri_SlightlyBelowNormal |       TPri_Lowest        |
NumTaskThreadSets = 3
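Given this layout, the priority set of an any-thread worker follows from its WorkerThreads index by simple integer arithmetic. The sketch below illustrates that mapping; LocateWorker and ToyWorkerSlot are hypothetical names, and the value 3 for the number of named threads is taken from the layout above rather than from engine headers.

```cpp
#include <cassert>

// Taken from the layout in the text: RHIThread, GameThread, ActualRenderingThread.
constexpr int NumNamedThreads = 3;

// Hypothetical result type for this sketch.
struct ToyWorkerSlot
{
    int PrioritySet; // 0 = NP, 1 = HP, 2 = BP
    int IndexInSet;  // position inside that set
};

// Maps the WorkerThreads index of an any-thread worker to its priority set and position.
inline ToyWorkerSlot LocateWorker(int ThreadIndex, int NumTaskThreadsPerSet)
{
    assert(ThreadIndex >= NumNamedThreads && NumTaskThreadsPerSet > 0);
    const int Offset = ThreadIndex - NumNamedThreads;
    return ToyWorkerSlot{ Offset / NumTaskThreadsPerSet, Offset % NumTaskThreadsPerSet };
}
```

With four threads per set, for instance, index 3 is the first NP worker and index 7 is the first HP worker.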
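Task dispatch
UE has many kinds of task. When a TGraphTask is built, the concrete task type is supplied as the template argument typename TTask.
To make this concrete, take the step in FDeferredShadingSceneRenderer::Render that generates MeshDrawCommands from dynamic MeshBatches, and follow how the task that builds those MeshDrawCommands is created and dispatched into a queue.
Review:
As described in UE Mesh Drawing Pipeline, the call chain that creates and enqueues the MeshDrawCommand generation task is:
FVisibilityTaskData::FinishGatherDynamicMeshElements --> FVisibilityTaskData::SetupMeshPasses --> ComputeDynamicMeshRelevance
At this point PassMask.Set has been done. FVisibilityTaskData::SetupMeshPasses continues, iterating over every View and calling FSceneRenderer::SetupMeshPass.
FSceneRenderer::SetupMeshPass iterates over each MeshPass, creates the matching FMeshPassProcessor, obtains the FParallelMeshDrawCommandPass object for that pass, and calls FParallelMeshDrawCommandPass::DispatchPassSetup to process the pass in parallel and create its FMeshDrawCommands.
The task that actually collects MeshDrawCommands is created and dispatched inside FParallelMeshDrawCommandPass::DispatchPassSetup.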
// Engine\Source\Runtime\Renderer\Private\MeshDrawCommands.cpp
void FParallelMeshDrawCommandPass::DispatchPassSetup(
    FScene* Scene,
    const FViewInfo& View,
    FInstanceCullingContext&& InstanceCullingContext,
    EMeshPass::Type PassType,
    FExclusiveDepthStencil::Type BasePassDepthStencilAccess,
    FMeshPassProcessor* MeshPassProcessor,
    const TArray<FMeshBatchAndRelevance, SceneRenderingAllocator>& DynamicMeshElements,
    const TArray<FMeshPassMask, SceneRenderingAllocator>* DynamicMeshElementsPassRelevance,
    int32 NumDynamicMeshElements,
    TArray<const FStaticMeshBatch*, SceneRenderingAllocator>& InOutDynamicMeshCommandBuildRequests,
    TArray<EMeshDrawCommandCullingPayloadFlags, SceneRenderingAllocator> InOutDynamicMeshCommandBuildFlags,
    int32 NumDynamicMeshCommandBuildRequestElements,
    FMeshCommandOneFrameArray& InOutMeshDrawCommands,
    FMeshPassProcessor* MobileBasePassCSMMeshPassProcessor,
    FMeshCommandOneFrameArray* InOutMobileBasePassCSMMeshDrawCommands
)
{
    (...)
    if (MaxNumDraws > 0)
    {
        (...)
        if (bExecuteInParallel)
        {
            if (IsOnDemandShaderCreationEnabled())
            {
                TaskEventRef = TGraphTask<FMeshDrawCommandPassSetupTask>::CreateTask().ConstructAndDispatchWhenReady(TaskContext);
            }
            else
            {
                FGraphEventArray DependentGraphEvents;
                DependentGraphEvents.Add(TGraphTask<FMeshDrawCommandPassSetupTask>::CreateTask().ConstructAndDispatchWhenReady(TaskContext));
                TaskEventRef = TGraphTask<FMeshDrawCommandInitResourcesTask>::CreateTask(&DependentGraphEvents).ConstructAndDispatchWhenReady(TaskContext);
            }
        }
        else
        {
            (...)
        }
        // This work needs to be deferred until at least BuildRenderingCommands (to ensure the DynamicPrimitiveCollector is uploaded), so we use the async mechanism either way
        auto FinalizeInstanceCullingSetup = [this, Scene](FInstanceCullingContext& InstanceCullingContext)
        {
            WaitForMeshPassSetupTask();
#if DO_CHECK
            for (const FVisibleMeshDrawCommand& VisibleMeshDrawCommand : TaskContext.MeshDrawCommands)
            {
                if (VisibleMeshDrawCommand.PrimitiveIdInfo.bIsDynamicPrimitive)
                {
                    uint32 PrimitiveIndex = VisibleMeshDrawCommand.PrimitiveIdInfo.DrawPrimitiveId & ~GPrimIDDynamicFlag;
                    TaskContext.View->DynamicPrimitiveCollector.CheckPrimitiveProcessed(PrimitiveIndex, Scene->GPUScene);
                }
            }
#endif
            InstanceCullingContext.SetDynamicPrimitiveInstanceOffsets(TaskContext.View->DynamicPrimitiveCollector.GetInstanceSceneDataOffset(), TaskContext.View->DynamicPrimitiveCollector.NumInstances());
        };
        TaskContext.InstanceCullingContext.BeginAsyncSetup(FinalizeInstanceCullingSetup);
    }
}
The key line is:
TaskEventRef = TGraphTask<FMeshDrawCommandPassSetupTask>::CreateTask()
.ConstructAndDispatchWhenReady(TaskContext);
Let's go through it step by step.
Task creation
TGraphTask<FMeshDrawCommandPassSetupTask>::CreateTask()
template<typename TTask>
class TGraphTask final : public TConcurrentLinearObject<TGraphTask<TTask>, FTaskGraphBlockAllocationTag>, public FBaseGraphTask
{
public:
    (...)
    static FConstructor CreateTask(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        FGraphEventRef GraphEvent = TTask::GetSubsequentsMode() == ESubsequentsMode::FireAndForget ? NULL : FGraphEvent::CreateGraphEvent();
        int32 NumPrereq = Prerequisites ? Prerequisites->Num() : 0;
        return FConstructor(new TGraphTask(MoveTemp(GraphEvent), NumPrereq), Prerequisites, CurrentThreadIfKnown);
    }
    (...)
};
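First, TGraphTask is instantiated with FMeshDrawCommandPassSetupTask, the task that generates MeshDrawCommands in parallel, and TGraphTask<FMeshDrawCommandPassSetupTask>::CreateTask is called. CreateTask uses the task's SubsequentsMode to decide whether to create a GraphEvent. With ESubsequentsMode::FireAndForget, no later task depends on this one, so there are no subsequents to handle and no GraphEvent is needed. With ESubsequentsMode::TrackSubsequents, a GraphEvent is required so that subsequent tasks can be processed once this task finishes.
The result is an instance of FConstructor, TGraphTask's factory class.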
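The decision CreateTask makes can be shown in isolation. The following is a minimal sketch, not the engine's implementation: the Toy* types and MakeCompletionEvent are hypothetical, and std::shared_ptr stands in for FGraphEventRef.

```cpp
#include <cassert>
#include <memory>

// Hypothetical mirror of ESubsequentsMode for this sketch.
enum class EToySubsequentsMode { TrackSubsequents, FireAndForget };

// Stand-in for FGraphEvent.
struct ToyGraphEvent {};

// Two hypothetical task types that differ only in their subsequents mode.
struct ToyTrackedTask
{
    static EToySubsequentsMode GetSubsequentsMode() { return EToySubsequentsMode::TrackSubsequents; }
};

struct ToyFireAndForgetTask
{
    static EToySubsequentsMode GetSubsequentsMode() { return EToySubsequentsMode::FireAndForget; }
};

// Creates a completion event only for tasks whose subsequents must be tracked.
template <typename TTask>
std::shared_ptr<ToyGraphEvent> MakeCompletionEvent()
{
    if (TTask::GetSubsequentsMode() == EToySubsequentsMode::FireAndForget)
    {
        return nullptr; // nobody can depend on this task, so no event is allocated
    }
    return std::make_shared<ToyGraphEvent>();
}
```

A fire-and-forget task therefore hands back a null event, which is why callers cannot wait on it or use it as a prerequisite.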
TGraphTask<FMeshDrawCommandPassSetupTask>::CreateTask().ConstructAndDispatchWhenReady(TaskContext)
template<typename TTask>
class TGraphTask final : public TConcurrentLinearObject<TGraphTask<TTask>, FTaskGraphBlockAllocationTag>, public FBaseGraphTask
{
public:
    class FConstructor
    {
    public:
        /** Passthrough internal task constructor and dispatch. Note! Generally speaking references will not pass through; use pointers */
        template<typename...T>
        FGraphEventRef ConstructAndDispatchWhenReady(T&&... Args)
        {
            new ((void *)&Owner->TaskStorage) TTask(Forward<T>(Args)...);
            return Owner->Setup(Prerequisites, CurrentThreadIfKnown);
        }
        (...)
    };
    (...)
    TAlignedBytes<sizeof(TTask),alignof(TTask)> TaskStorage;
    (...)
};
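Next, ConstructAndDispatchWhenReady is called on the FConstructor instance with TaskContext, which instantiates the function template. It uses placement new to invoke the FMeshDrawCommandPassSetupTask constructor and builds the task object in TaskStorage, a byte buffer of sizeof(FMeshDrawCommandPassSetupTask) bytes aligned to alignof(FMeshDrawCommandPassSetupTask). Finally it calls TGraphTask<FMeshDrawCommandPassSetupTask>::Setup.
Note: FConstructor::ConstructAndDispatchWhenReady takes forwarding (universal) references and perfectly forwards them. In this example TaskContext is still needed afterwards, so it is passed as an lvalue and T is deduced as an lvalue reference.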
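The combination of raw aligned storage, placement new and perfect forwarding can be reproduced outside the engine. The sketch below is an illustration under assumptions: ToyGraphTask, ToyTask and ToyContext are hypothetical, an alignas byte array plays the role of TAlignedBytes, and std::forward plays the role of Forward.

```cpp
#include <cassert>
#include <new>
#include <utility>

// Hypothetical context that outlives the task, like TaskContext in the text.
struct ToyContext { int Value = 0; };

// Hypothetical task that keeps a reference to its context.
struct ToyTask
{
    explicit ToyTask(ToyContext& InContext) : Context(InContext) {}
    void DoTask() { ++Context.Value; }
    ToyContext& Context;
};

template <typename TTask>
class ToyGraphTask
{
public:
    ToyGraphTask() = default;
    ToyGraphTask(const ToyGraphTask&) = delete;
    ToyGraphTask& operator=(const ToyGraphTask&) = delete;

    ~ToyGraphTask()
    {
        if (Task != nullptr)
        {
            Task->~TTask(); // placement-new objects need an explicit destructor call
        }
    }

    // Builds the embedded task in place, forwarding the arguments unchanged.
    template <typename... T>
    void Construct(T&&... Args)
    {
        assert(Task == nullptr);
        Task = ::new (static_cast<void*>(Storage)) TTask(std::forward<T>(Args)...);
    }

    bool IsConstructed() const { return Task != nullptr; }

    TTask& Get()
    {
        assert(Task != nullptr);
        return *Task;
    }

private:
    // sizeof(TTask) bytes aligned to alignof(TTask); empty until Construct runs.
    alignas(TTask) unsigned char Storage[sizeof(TTask)];
    TTask* Task = nullptr;
};
```

Calling Construct(Context) with an lvalue deduces T as ToyContext&, so the task binds to the caller's object instead of a copy, which matches the lvalue case described in the note.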
FGraphEventRef TGraphTask::Setup(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
{
    TaskTrace::Launched(GetTraceId(), nullptr, Subsequents.IsValid(), ((TTask*)&TaskStorage)->GetDesiredThread(), sizeof(*this));
    FGraphEventRef ReturnedEventRef = Subsequents; // very important so that this doesn't get destroyed before we return
    SetupPrereqs(Prerequisites, CurrentThreadIfKnown, true);
    return ReturnedEventRef;
}

void TGraphTask::SetupPrereqs(const FGraphEventArray* Prerequisites, ENamedThreads::Type CurrentThreadIfKnown, bool bUnlock)
{
    checkThreadGraph(!TaskConstructed);
    TaskConstructed = true;
    TTask& Task = *(TTask*)&TaskStorage;
    SetThreadToExecuteOn(Task.GetDesiredThread());
    int32 AlreadyCompletedPrerequisites = 0;
    if (Prerequisites)
    {
        for (int32 Index = 0; Index < Prerequisites->Num(); Index++)
        {
            FGraphEvent* Prerequisite = (*Prerequisites)[Index];
            if (Prerequisite == nullptr || !Prerequisite->AddSubsequent(this))
            {
                AlreadyCompletedPrerequisites++;
            }
        }
    }
    PrerequisitesComplete(CurrentThreadIfKnown, AlreadyCompletedPrerequisites, bUnlock);
}
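TGraphTask<FMeshDrawCommandPassSetupTask>::Setup in turn calls TGraphTask<FMeshDrawCommandPassSetupTask>::SetupPrereqs. Only once TaskConstructed has been set to true there is the task considered fully created. What follows is the check of its prerequisite events and the enqueue step.
Note: TGraphTask does not complete the real task creation in its own constructor. The object its constructor produces is only used to build the internal factory class and to set up the FGraphEvent that represents the task's completion (if the task has subsequents). At that point TaskStorage does not yet hold the TTask object that actually does the work. The TTask object is constructed later by the factory class's function template, which accepts a variadic parameter pack and builds the object into TaskStorage. Finally TGraphTask::Setup is called, which in turn calls TGraphTask::SetupPrereqs, and task creation is only truly finished after 'TaskConstructed = true' has executed.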
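Task enqueueing
TGraphTask::SetupPrereqs then goes on to iterate over the prerequisite events, if there are any.
For each event, if the event is null, or if the list in which the event records its subsequent tasks has already been closed (AddSubsequent returns false, which means the event itself has already fired), the count of completed prerequisites, AlreadyCompletedPrerequisites, is incremented. Finally TGraphTask::PrerequisitesComplete is called with that count.
TGraphTask::PrerequisitesComplete first works out how many prerequisite events the current task still has to wait for.
Note: the test is written as NumberOfPrerequistitesOutstanding.Subtract(NumToSub) == NumToSub because FThreadSafeCounter::Subtract is implemented (on Windows) with _InterlockedExchangeAdd, an atomic add that returns the value the operand held before the operation. The comparison is therefore true exactly when this call has brought the counter down to zero.
NumToSub is computed as NumAlreadyFinishedPrequistes + (bUnlock ? 1 : 0). The extra 1 is there because the FBaseGraphTask constructor adds 1 to the prerequisite count to keep the task from executing while it is being set up, as shown here: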
FBaseGraphTask::FBaseGraphTask(int32 InNumberOfPrerequistitesOutstanding)
    : ThreadToExecuteOn(ENamedThreads::AnyThread)
    , NumberOfPrerequistitesOutstanding(InNumberOfPrerequistitesOutstanding + 1) // + 1 is not a prerequisite, it is a lock to prevent it from executing while it is getting prerequisites, one it is safe to execute, call PrerequisitesComplete
{
    checkThreadGraph(LifeStage.Increment() == int32(LS_Contructed));
    CaptureInheritedContext();
}
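The "+1 lock" and the comparison against the previous counter value can be tried out with std::atomic. This is a sketch under assumptions: ToyPrereqCounter is a hypothetical class, and std::atomic<int>::fetch_sub stands in for FThreadSafeCounter::Subtract because both return the value held before the subtraction.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical model of the prerequisite counter kept by a graph task.
class ToyPrereqCounter
{
public:
    // The extra 1 is the setup lock, not a real prerequisite.
    explicit ToyPrereqCounter(int NumPrerequisites)
        : Outstanding(NumPrerequisites + 1)
    {
    }

    // Called once after setup. Returns true when this call brought the counter to zero,
    // which means the task may be queued right away.
    bool PrerequisitesComplete(int NumAlreadyFinished, bool bUnlock = true)
    {
        const int NumToSub = NumAlreadyFinished + (bUnlock ? 1 : 0);
        // fetch_sub returns the previous value, so "previous == NumToSub" means "now zero".
        return Outstanding.fetch_sub(NumToSub) == NumToSub;
    }

    // Called when one remaining prerequisite finishes later. True if it was the last one.
    bool OnPrerequisiteFinished()
    {
        return Outstanding.fetch_sub(1) == 1;
    }

    int Load() const { return Outstanding.load(); }

private:
    std::atomic<int> Outstanding;
};
```

Because the counter starts at N + 1, it cannot reach zero while prerequisites are still being registered, even if every prerequisite finishes during setup.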
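Prerequisites already complete: enqueue immediately
If 'NumberOfPrerequistitesOutstanding.Subtract(NumToSub) == NumToSub' evaluates to true, all prerequisite tasks have completed and the newly created task can be enqueued.
Finally, FBaseGraphTask::QueueTask calls FTaskGraphCompatibilityImplementation::QueueTask, which uses InThreadToExecuteOn to decide whether the task goes to AnyThread or to a NamedThread.
For AnyThread, the thread priority and task priority are extracted from InThreadToExecuteOn and the task is added to the matching queue.
For a NamedThread, InThreadToExecuteOn determines whether the task goes to MainQueue or LocalQueue. The code then distinguishes between enqueueing from the target thread itself and enqueueing from another thread, extracts the task priority, and adds the task to the corresponding queue.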
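All of this routing information travels in a single integer. The sketch below decodes such a value, assuming a bit layout modeled on ENamedThreads (thread index in the low byte, then one queue bit, one task priority bit and two thread priority bits). The masks and shifts are written from memory as an assumption and should be checked against TaskGraphInterfaces.h; the Toy namespace and FRoute are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

namespace Toy
{
    // Assumed bit layout, modeled on ENamedThreads; verify against the engine headers.
    constexpr std::int32_t ThreadIndexMask = 0xff;
    constexpr std::int32_t AnyThread = 0xff;
    constexpr std::int32_t QueueIndexMask = 0x100;
    constexpr std::int32_t QueueIndexShift = 8;
    constexpr std::int32_t TaskPriorityMask = 0x200;
    constexpr std::int32_t TaskPriorityShift = 9;
    constexpr std::int32_t ThreadPriorityMask = 0xC00;
    constexpr std::int32_t ThreadPriorityShift = 10;

    // Hypothetical decoded form of a "thread to execute on" value.
    struct FRoute
    {
        bool bAnyThread;    // true: shared any-thread queues, false: a named thread's own queues
        int ThreadIndex;    // named thread index, or AnyThread
        int QueueIndex;     // 0 = MainQueue, 1 = LocalQueue (named threads only)
        int TaskPriority;   // 0 = Normal, 1 = High
        int ThreadPriority; // 0 = Normal, 1 = High, 2 = Background (any-thread only)
    };

    inline FRoute Decode(std::int32_t ThreadToExecuteOn)
    {
        FRoute Route{};
        Route.ThreadIndex = ThreadToExecuteOn & ThreadIndexMask;
        Route.bAnyThread = (Route.ThreadIndex == AnyThread);
        Route.QueueIndex = (ThreadToExecuteOn & QueueIndexMask) >> QueueIndexShift;
        Route.TaskPriority = (ThreadToExecuteOn & TaskPriorityMask) >> TaskPriorityShift;
        Route.ThreadPriority = (ThreadToExecuteOn & ThreadPriorityMask) >> ThreadPriorityShift;
        assert(Route.ThreadPriority < 3); // only three thread priorities exist
        return Route;
    }
}
```

A router built on top of this would pick IncomingAnyThreadTasks[ThreadPriority] when bAnyThread is set, and the named thread's Queues[QueueIndex] otherwise.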
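Prerequisites not complete: enqueue deferred
For a task that has prerequisites, 'NumberOfPrerequistitesOutstanding.Subtract(NumToSub) == NumToSub' may evaluate to false. That means not all prerequisites have finished, so the newly created task is not put in a queue. When does it get another chance to be enqueued?
When TGraphTask::SetupPrereqs walked the prerequisite events earlier, it tried to add the new task to each event's list of subsequent tasks.
Later, each of those prerequisite events calls FGraphEvent::DispatchSubsequents at the appropriate stage. That call either defers until the events it still has to wait for are done or, once they have all finished (that is, the event itself is complete), tries to enqueue its subsequent tasks.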
// Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp
void FGraphEvent::DispatchSubsequents(ENamedThreads::Type CurrentThreadIfKnown)
{
    TArray<FBaseGraphTask*> NewTasks;
    DispatchSubsequents(NewTasks, CurrentThreadIfKnown);
}

void FGraphEvent::DispatchSubsequents(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThreadIfKnown, bool bInternal/* = false */)
{
    (...)
    bool bWakeUpWorker = false;
    TArray<FBaseGraphTask*> PoppedTasks;
    SubsequentList.PopAllAndClose(PoppedTasks);
    for (FBaseGraphTask* NewTask : ReverseIterate(PoppedTasks)) // reverse the order since PopAll is implicitly backwards
    {
        checkThreadGraph(NewTask);
        NewTask->ConditionalQueueTask(CurrentThreadIfKnown, bWakeUpWorker);
    }
    (...)
}

// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h
void FBaseGraphTask::ConditionalQueueTask(ENamedThreads::Type CurrentThread, bool& bWakeUpWorker)
{
    if (NumberOfPrerequistitesOutstanding.Decrement()==0)
    {
        QueueTask(CurrentThread, bWakeUpWorker);
        bWakeUpWorker = true;
    }
}
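FBaseGraphTask::ConditionalQueueTask first decrements the task's count of outstanding prerequisite events, NumberOfPrerequistitesOutstanding, and checks whether it has reached 0. If so, the task can be enqueued. If not, other prerequisite events are still unfinished and the task stays deferred; when those events complete, the same process runs again and tries to add the task to a queue.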
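The whole deferred path can be replayed in a single-threaded toy model. This is a sketch, not the engine's lock-free implementation: ToyEvent, ToyTask and the free function SetupPrereqs are hypothetical, a std::vector replaces the closable lock-free subsequent list, and "queued" is reduced to a boolean flag.

```cpp
#include <atomic>
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical task: only tracks its outstanding prerequisites and whether it was queued.
struct ToyTask
{
    explicit ToyTask(int NumPrerequisites) : Outstanding(NumPrerequisites + 1) {} // +1 = setup lock
    std::atomic<int> Outstanding;
    bool bQueued = false;

    // Mirrors the ConditionalQueueTask idea: queue only when the last prerequisite is gone.
    void ConditionalQueue()
    {
        if (Outstanding.fetch_sub(1) == 1)
        {
            bQueued = true;
        }
    }
};

// Hypothetical event holding the tasks that wait for it.
class ToyEvent
{
public:
    // Returns false once the event has fired; the caller then counts it as already complete.
    bool AddSubsequent(ToyTask* Task)
    {
        if (bClosed)
        {
            return false;
        }
        Subsequents.push_back(Task);
        return true;
    }

    // Closes the list and gives every waiting task a chance to be queued.
    void DispatchSubsequents()
    {
        bClosed = true;
        std::vector<ToyTask*> Popped = std::move(Subsequents);
        Subsequents.clear();
        for (ToyTask* Task : Popped)
        {
            Task->ConditionalQueue();
        }
    }

private:
    std::vector<ToyTask*> Subsequents;
    bool bClosed = false;
};

// Registers the task with its prerequisites, then releases the setup lock.
inline void SetupPrereqs(ToyTask& Task, const std::vector<ToyEvent*>& Prerequisites)
{
    int AlreadyCompleted = 0;
    for (ToyEvent* Event : Prerequisites)
    {
        if (Event == nullptr || !Event->AddSubsequent(&Task))
        {
            ++AlreadyCompleted;
        }
    }
    const int NumToSub = AlreadyCompleted + 1;
    if (Task.Outstanding.fetch_sub(NumToSub) == NumToSub)
    {
        Task.bQueued = true;
    }
}
```

If a task has one prerequisite that already fired and one that has not, SetupPrereqs leaves the counter at 1 and the task unqueued; the later DispatchSubsequents call on the second event is what finally queues it.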
Reference
Unreal Engine Documentation: Tasks System