Introduction

This is the second part of the UE5 TaskGraph series. It briefly examines how TaskGraph organizes its data and how tasks are created and dispatched.

The first part gave a brief introduction to TaskGraph's main basic types; see UE5-TaskGraph系统-上:基本类型 (UE5 TaskGraph System, Part 1: Basic Types).

Task Queues

TaskGraph has two kinds of task queues:

  • Each FNamedTaskThread owns its own task queues, Queues (an array of FThreadTaskQueue). RHIThread, GameThread and ActualRenderingThread are all FNamedTaskThreads.
  • FTaskThreadAnyThread workers have no queues of their own; they share IncomingAnyThreadTasks (an array of FStallingTaskQueue).

Queues holds two FThreadTaskQueue instances (ENamedThreads::NumQueues = 2), MainQueue and LocalQueue, and each FThreadTaskQueue contains one FStallingTaskQueue.
IncomingAnyThreadTasks is an array of three FStallingTaskQueues (MAX_THREAD_PRIORITIES = 3), one for each of the three priority sets of FTaskThreadAnyThread.

Every FStallingTaskQueue in turn holds two lock-free lists, one for Normal-priority tasks and one for High-priority tasks.
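The nesting can be pictured with a small C++ model. This is a simplified sketch, not engine code: the type names (StallingTaskQueueModel, ThreadTaskQueueModel and so on) are hypothetical stand-ins for FStallingTaskQueue, FThreadTaskQueue, Queues and IncomingAnyThreadTasks, a mutex-protected std::deque replaces the engine's lock-free lists, and checking the High list before the Normal list is an assumption of the model.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in for FBaseGraphTask.
struct Task {
    int Id;
};

// Model of FStallingTaskQueue: one list per task priority (0 = Normal, 1 = High).
// The engine uses lock-free lists; a mutex keeps this sketch short.
class StallingTaskQueueModel {
public:
    static constexpr std::size_t NumPriorities = 2;

    void Push(Task* NewTask, std::size_t Priority) {
        std::lock_guard<std::mutex> Lock(Mutex);
        Lists[Priority].push_back(NewTask);
    }

    // Checks the High list first, then Normal; returns nullptr when both are empty.
    Task* Pop() {
        std::lock_guard<std::mutex> Lock(Mutex);
        for (std::size_t Priority = NumPriorities; Priority-- > 0;) {
            if (!Lists[Priority].empty()) {
                Task* Result = Lists[Priority].front();
                Lists[Priority].pop_front();
                return Result;
            }
        }
        return nullptr;
    }

private:
    std::mutex Mutex;
    std::array<std::deque<Task*>, NumPriorities> Lists;
};

// Model of FThreadTaskQueue: wraps exactly one stalling queue.
struct ThreadTaskQueueModel {
    StallingTaskQueueModel StallQueue;
};

// Each named thread owns Queues[2]: index 0 = MainQueue, index 1 = LocalQueue.
constexpr std::size_t NumQueues = 2;
struct NamedTaskThreadModel {
    std::array<ThreadTaskQueueModel, NumQueues> Queues;
};

// Any-thread workers share one stalling queue per priority set (MAX_THREAD_PRIORITIES = 3).
constexpr std::size_t MaxThreadPriorities = 3;
struct AnyThreadQueuesModel {
    std::array<StallingTaskQueueModel, MaxThreadPriorities> IncomingAnyThreadTasks;
};
```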

Task Threads

Creation

FTaskThreadAnyThread workers get their FRunnableThread as soon as FTaskGraphImplementation is constructed:

FTaskGraphImplementation::FTaskGraphImplementation(int32)
{
    (...)

    for (int32 ThreadIndex = LastExternalThread + 1; ThreadIndex < NumThreads; ThreadIndex++)
    {
        (...)

        if (FForkProcessHelper::IsForkedMultithreadInstance() && GAllowTaskGraphForkMultithreading)
        {
            WorkerThreads[ThreadIndex].RunnableThread = FForkProcessHelper::CreateForkableThread(&Thread(ThreadIndex), *Name, StackSize, ThreadPri, Affinity);
        }
        else
        {
            WorkerThreads[ThreadIndex].RunnableThread = FRunnableThread::Create(&Thread(ThreadIndex), *Name, StackSize, ThreadPri, Affinity); 
        }
        
        WorkerThreads[ThreadIndex].bAttached = true;
    }
    UE::Trace::ThreadGroupEnd();
}
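The split between the two kinds of workers can be modeled as below. Only slots after LastExternalThread get a thread in the constructor; the named slots stay unattached until some module attaches a thread to them. This is a hypothetical sketch: WorkerSlot and TaskGraphModel are illustrative names, std::thread stands in for FRunnableThread::Create, and the thread names, stack sizes, priorities and affinities the engine passes are omitted.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical model of one WorkerThreads entry.
struct WorkerSlot {
    std::thread Thread;      // stands in for the FRunnableThread*
    bool bAttached = false;  // mirrors WorkerThreads[Index].bAttached
};

class TaskGraphModel {
public:
    // Slots [0, NumNamedThreads) are named threads; the rest are any-thread workers.
    TaskGraphModel(int NumNamedThreads, int NumThreads)
        : LastExternalThread(NumNamedThreads - 1), Workers(NumThreads) {
        for (int Index = LastExternalThread + 1; Index < NumThreads; ++Index) {
            // The engine would start the worker's run loop here; the sketch only counts starts.
            Workers[Index].Thread = std::thread([this] { ++StartedWorkers; });
            Workers[Index].bAttached = true;
        }
        // Named slots are left alone: their threads are created and attached elsewhere.
    }

    ~TaskGraphModel() { JoinAll(); }

    void JoinAll() {
        for (WorkerSlot& Slot : Workers) {
            if (Slot.Thread.joinable()) {
                Slot.Thread.join();
            }
        }
    }

    bool IsAttached(int Index) const { return Workers[Index].bAttached; }
    int NumStarted() const { return StartedWorkers.load(); }

private:
    int LastExternalThread;
    std::vector<WorkerSlot> Workers;
    std::atomic<int> StartedWorkers{0};
};
```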

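FNamedTaskThreads do not get an FRunnableThread when FTaskGraphImplementation is constructed. The threads that actually run their tasks are created by the owning modules, and each of those threads calls FTaskGraphInterface::Get().AttachToThread(...) (for example with ENamedThreads::GameThread) to bind itself to the corresponding worker.

For example, when the render module calls StartRenderingThread, it gets the FRHIThread singleton and calls its Start function, which creates the corresponding FRunnableThread. When FRHIThread::Run then starts the RHI thread, it calls FTaskGraphInterface::AttachToThread to bind that thread to the RHIThread entry in WorkerThreads.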

// Engine\Source\Runtime\RenderCore\Private\RenderingThread.cpp

class FRHIThread : public FRunnable
{
public:
    FRunnableThread* Thread;

    FRHIThread()
        : Thread(nullptr)
    {
        check(IsInGameThread());
    }

    virtual bool Init(void) override
    {
        PRAGMA_DISABLE_DEPRECATION_WARNINGS
        GRHIThreadId = FPlatformTLS::GetCurrentThreadId();
        PRAGMA_ENABLE_DEPRECATION_WARNINGS
        return true;
    }

    virtual uint32 Run() override
    {
        LLM_SCOPE(ELLMTag::RHIMisc);

#if CSV_PROFILER
        FCsvProfiler::Get()->SetRHIThreadId(FPlatformTLS::GetCurrentThreadId());
#endif

        FMemory::SetupTLSCachesOnCurrentThread();
        {
            FTaskTagScope Scope(ETaskTag::ERhiThread);
            FPlatformProcess::SetupRHIThread();
            FTaskGraphInterface::Get().AttachToThread(ENamedThreads::RHIThread);
            FTaskGraphInterface::Get().ProcessThreadUntilRequestReturn(ENamedThreads::RHIThread);
        }
        FMemory::ClearAndDisableTLSCachesOnCurrentThread();
        return 0;
    }

    static FRHIThread& Get()
    {
        static FRHIThread Singleton;
        return Singleton;
    }

    void Start()
    {
        UE::Trace::ThreadGroupBegin(TEXT("Render"));
        Thread = FRunnableThread::Create(this, TEXT("RHIThread"), 512 * 1024, FPlatformAffinity::GetRHIThreadPriority(),
            FPlatformAffinity::GetRHIThreadMask(), FPlatformAffinity::GetRHIThreadFlags()
            );
        check(Thread);
        UE::Trace::ThreadGroupEnd();
    }
};
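The attach-then-process pattern in FRHIThread::Run can be sketched with standard threads. A thread created outside the task graph claims a named slot and then runs that slot's queue until a return is requested. NamedSlotModel and its member functions are hypothetical, loosely modeled on AttachToThread and ProcessThreadUntilRequestReturn; implementing the return request as a queued task is an assumption of this sketch.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical model of one named-thread slot (e.g. the RHIThread entry in WorkerThreads).
class NamedSlotModel {
public:
    // Called on the thread that will execute this slot's tasks.
    void Attach() {
        std::lock_guard<std::mutex> Lock(Mutex);
        Owner = std::this_thread::get_id();
        bAttached = true;
    }

    void Enqueue(std::function<void()> Work) {
        {
            std::lock_guard<std::mutex> Lock(Mutex);
            Queue.push_back(std::move(Work));
        }
        Wake.notify_one();
    }

    // Queues a task that, when run on the owner thread, ends the processing loop.
    void RequestReturn() {
        Enqueue([this] { bQuit = true; });
    }

    // Runs queued work in FIFO order on the attached thread until the return task executes.
    void ProcessUntilRequestReturn() {
        assert(bAttached && Owner == std::this_thread::get_id());
        while (!bQuit) {
            std::function<void()> Work;
            {
                std::unique_lock<std::mutex> Lock(Mutex);
                Wake.wait(Lock, [this] { return !Queue.empty(); });
                Work = std::move(Queue.front());
                Queue.pop_front();
            }
            Work();
        }
    }

private:
    std::mutex Mutex;
    std::condition_variable Wake;
    std::deque<std::function<void()>> Queue;
    std::thread::id Owner;
    bool bAttached = false;
    bool bQuit = false;  // only read and written by the owner thread
};
```

Because the quit flag is set by a task that runs on the attached thread itself, the loop never has to read it from another thread.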

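Execution

FNamedTaskThread and FTaskThreadAnyThread fetch tasks differently.

FNamedTaskThread::ProcessTasksNamedThread uses the QueueIndex it is given to select either MainQueue or LocalQueue from the thread's own queues, then takes tasks from that queue in priority order and runs them.

FTaskThreadAnyThread::ProcessTasks instead calls FTaskGraphImplementation::FindWork, which derives the priority set and the thread's index within that set from its thread index and then takes a task from IncomingAnyThreadTasks.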
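Assuming the WorkerThreads layout shown in the next section (named threads first, then consecutive sets of NumTaskThreadsPerSet any-thread workers), the index arithmetic described for FindWork comes down to a division and a remainder. The helper name AnyThreadSlot is hypothetical; it only illustrates the mapping.

```cpp
#include <cassert>
#include <utility>

// Hypothetical helper: maps a WorkerThreads index to {priority set, index within the set},
// assuming any-thread workers start right after the named threads.
std::pair<int, int> AnyThreadSlot(int ThreadIndex, int NumNamedThreads, int NumTaskThreadsPerSet) {
    const int LocalIndex = ThreadIndex - NumNamedThreads;  // 0-based among any-thread workers
    assert(LocalIndex >= 0 && NumTaskThreadsPerSet > 0);
    return {LocalIndex / NumTaskThreadsPerSet, LocalIndex % NumTaskThreadsPerSet};
}
```

Sets 0, 1 and 2 correspond to the NP, HP and BP sets, and each set takes work from its own entry of IncomingAnyThreadTasks. The named-thread path needs no such arithmetic, since it indexes its own Queues[QueueIndex] directly.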

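How WorkerThreads Is Organized

WorkerThreads first holds RHIThread, GameThread and ActualRenderingThread. The FTaskThreadAnyThread workers follow, split evenly into three priority sets of NumTaskThreadsPerSet threads each, all running below Normal thread priority. The layout of every thread object used by TaskGraph is: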

                                                        WorkerThreads
|<------------------------------------------------------ NumThreads ------------------------------------------------------->|
|     0     |     1      |           2           |  NumTaskThreadsPerSet  |  NumTaskThreadsPerSet  |  NumTaskThreadsPerSet  |
| RHIThread | GameThread | ActualRenderingThread |           NP           |           HP           |           BP           |
|           |            |                       |    TPri_BelowNormal    |TPri_SlightlyBelowNormal|      TPri_Lowest       |
                                                 |<------------------------ NumTaskThreadSets = 3 ------------------------->|

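Task Dispatch

UE has many kinds of tasks. TGraphTask is a class template, and the concrete task type is supplied as its template argument TTask, so every task type gets its own TGraphTask<TTask> instantiation.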

As a running example, take the step in FDeferredShadingSceneRenderer::Render that generates MeshDrawCommands from dynamic MeshBatches, and follow how its MeshDrawCommand tasks are created, dispatched and queued.

Recap:

As described in UE Mesh Drawing Pipeline, the call chain that creates and queues the MeshDrawCommand generation tasks is:

FVisibilityTaskData::FinishGatherDynamicMeshElements --> FVisibilityTaskData::SetupMeshPasses --> ComputeDynamicMeshRelevance
At this point PassMask.Set has been completed.

FVisibilityTaskData::SetupMeshPasses then goes on to iterate over every View and call FSceneRenderer::SetupMeshPass.

FSceneRenderer::SetupMeshPass iterates over each MeshPass type, creates the matching FMeshPassProcessor, fetches that pass's FParallelMeshDrawCommandPass, and calls FParallelMeshDrawCommandPass::DispatchPassSetup to process the pass in parallel and build its FMeshDrawCommands.

The tasks that actually collect the MeshDrawCommands are created, dispatched and queued inside FParallelMeshDrawCommandPass::DispatchPassSetup:

// Engine\Source\Runtime\Renderer\Private\MeshDrawCommands.cpp

void FParallelMeshDrawCommandPass::DispatchPassSetup(
    FScene* Scene,
    const FViewInfo& View,
    FInstanceCullingContext&& InstanceCullingContext,
    EMeshPass::Type PassType,
    FExclusiveDepthStencil::Type BasePassDepthStencilAccess,
    FMeshPassProcessor* MeshPassProcessor,
    const TArray<FMeshBatchAndRelevance, SceneRenderingAllocator>& DynamicMeshElements,
    const TArray<FMeshPassMask, SceneRenderingAllocator>* DynamicMeshElementsPassRelevance,
    int32 NumDynamicMeshElements,
    TArray<const FStaticMeshBatch*, SceneRenderingAllocator>& InOutDynamicMeshCommandBuildRequests,
    TArray<EMeshDrawCommandCullingPayloadFlags, SceneRenderingAllocator> InOutDynamicMeshCommandBuildFlags,
    int32 NumDynamicMeshCommandBuildRequestElements,
    FMeshCommandOneFrameArray& InOutMeshDrawCommands,
    FMeshPassProcessor* MobileBasePassCSMMeshPassProcessor,
    FMeshCommandOneFrameArray* InOutMobileBasePassCSMMeshDrawCommands
)
{
    (...)

    if (MaxNumDraws > 0)
    {
        (...)

        if (bExecuteInParallel)
        {
            if (IsOnDemandShaderCreationEnabled())
            {
                TaskEventRef = TGraphTask<FMeshDrawCommandPassSetupTask>::CreateTask().ConstructAndDispatchWhenReady(TaskContext);
            }
            else
            {
                FGraphEventArray DependentGraphEvents;
                DependentGraphEvents.Add(TGraphTask<FMeshDrawCommandPassSetupTask>::CreateTask().ConstructAndDispatchWhenReady(TaskContext));
                TaskEventRef = TGraphTask<FMeshDrawCommandInitResourcesTask>::CreateTask(&DependentGraphEvents).ConstructAndDispatchWhenReady(TaskContext);
            }
        }
        else
        {
            (...)
        }

        // This work needs to be deferred until at least BuildRenderingCommands (to ensure the DynamicPrimitiveCollector is uploaded), so we use the async mechanism either way 
        auto FinalizeInstanceCullingSetup = [this, Scene](FInstanceCullingContext& InstanceCullingContext)
        {
            WaitForMeshPassSetupTask();

#if DO_CHECK
            for (const FVisibleMeshDrawCommand& VisibleMeshDrawCommand : TaskContext.MeshDrawCommands)
            {
                if (VisibleMeshDrawCommand.PrimitiveIdInfo.bIsDynamicPrimitive)
                {
                    uint32 PrimitiveIndex = VisibleMeshDrawCommand.PrimitiveIdInfo.DrawPrimitiveId & ~GPrimIDDynamicFlag;
                    TaskContext.View->DynamicPrimitiveCollector.CheckPrimitiveProcessed(PrimitiveIndex, Scene->GPUScene);
                }
            }
#endif
            InstanceCullingContext.SetDynamicPrimitiveInstanceOffsets(TaskContext.View->DynamicPrimitiveCollector.GetInstanceSceneDataOffset(), TaskContext.View->DynamicPrimitiveCollector.NumInstances());
        };
        TaskContext.InstanceCullingContext.BeginAsyncSetup(FinalizeInstanceCullingSetup);
    }
}

The key line is:

TaskEventRef = TGraphTask<FMeshDrawCommandPassSetupTask>::CreateTask()
                                                .ConstructAndDispatchWhenReady(TaskContext);

We will go through it step by step.

Task Creation

TGraphTask<FMeshDrawCommandPassSetupTask>::CreateTask()

template<typename TTask>
class TGraphTask final : public TConcurrentLinearObject<TGraphTask<TTask>, FTaskGraphBlockAllocationTag>, public FBaseGraphTask
{
public:
    (...)
    static FConstructor CreateTask(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        FGraphEventRef GraphEvent = TTask::GetSubsequentsMode() == ESubsequentsMode::FireAndForget ? NULL : FGraphEvent::CreateGraphEvent();

        int32 NumPrereq = Prerequisites ? Prerequisites->Num() : 0;
        return FConstructor(new TGraphTask(MoveTemp(GraphEvent), NumPrereq), Prerequisites, CurrentThreadIfKnown);
    }
    (...)
};

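First, TGraphTask is instantiated with FMeshDrawCommandPassSetupTask, the task that generates MeshDrawCommands in parallel, and TGraphTask<FMeshDrawCommandPassSetupTask>::CreateTask is called. CreateTask uses the task's SubsequentsMode to decide whether to create a GraphEvent. With ESubsequentsMode::FireAndForget, no other task depends on this one, so there are no subsequents to handle and no GraphEvent is needed. With ESubsequentsMode::TrackSubsequents, a GraphEvent is needed so that the subsequents can be processed once this task has finished.

The result is an instance of FConstructor, the factory class for this TGraphTask.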
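The decision CreateTask makes can be reduced to a few lines. In this sketch GraphTaskModel, GraphEventModel and the two task types are hypothetical, std::shared_ptr stands in for FGraphEventRef, and the FConstructor step is left out: Create returns the allocated task directly.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical mirror of ESubsequentsMode.
enum class SubsequentsModeModel { TrackSubsequents, FireAndForget };

// Stand-in for FGraphEvent; a real event would hold the list of subsequent tasks.
struct GraphEventModel {};
using GraphEventRefModel = std::shared_ptr<GraphEventModel>;

template <typename TTask>
struct GraphTaskModel {
    GraphEventRefModel Subsequents;  // stays empty for fire-and-forget tasks
    int NumPrereqs;

    // Like CreateTask: only allocate an event if another task may wait on this one.
    static std::unique_ptr<GraphTaskModel> Create(int InNumPrereqs) {
        GraphEventRefModel Event;
        if (TTask::GetSubsequentsMode() != SubsequentsModeModel::FireAndForget) {
            Event = std::make_shared<GraphEventModel>();
        }
        return std::unique_ptr<GraphTaskModel>(new GraphTaskModel{std::move(Event), InNumPrereqs});
    }
};

// Two hypothetical task types that differ only in their subsequents mode.
struct TrackedTask {
    static SubsequentsModeModel GetSubsequentsMode() { return SubsequentsModeModel::TrackSubsequents; }
};
struct FireAndForgetTask {
    static SubsequentsModeModel GetSubsequentsMode() { return SubsequentsModeModel::FireAndForget; }
};
```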

TGraphTask<FMeshDrawCommandPassSetupTask>::CreateTask().ConstructAndDispatchWhenReady(TaskContext)

template<typename TTask>
class TGraphTask final : public TConcurrentLinearObject<TGraphTask<TTask>, FTaskGraphBlockAllocationTag>, public FBaseGraphTask
{
public:
    class FConstructor
    {
    public:
        /** Passthrough internal task constructor and dispatch. Note! Generally speaking references will not pass through; use pointers */
        template<typename...T>
        FGraphEventRef ConstructAndDispatchWhenReady(T&&... Args)
        {
            new ((void *)&Owner->TaskStorage) TTask(Forward<T>(Args)...);
            return Owner->Setup(Prerequisites, CurrentThreadIfKnown);
        }
        (...)
    };
    (...)
    TAlignedBytes<sizeof(TTask),alignof(TTask)> TaskStorage;
    (...)
};

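Next, ConstructAndDispatchWhenReady is called on that FConstructor instance with TaskContext, which instantiates the function template. It uses placement new to run FMeshDrawCommandPassSetupTask's constructor directly inside TaskStorage, a raw byte buffer of sizeof(TTask) bytes aligned to alignof(TTask) (TAlignedBytes<sizeof(TTask), alignof(TTask)>), and finally calls TGraphTask<FMeshDrawCommandPassSetupTask>::Setup.

Note: FConstructor::ConstructAndDispatchWhenReady takes its arguments as forwarding references (T&&, also called universal references) and perfect-forwards them. In this example TaskContext is still needed afterwards and is passed as an lvalue, so T is deduced as an lvalue reference type and the task constructor receives an lvalue referring to the same TaskContext.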
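The two-phase construction and the forwarding behavior can be checked with a small model. All names are hypothetical: GraphTaskStorageModel plays the role of TGraphTask with its TaskStorage, PassSetupTaskModel stands in for a task that keeps a reference to its context, and an alignas byte array replaces TAlignedBytes.

```cpp
#include <cassert>
#include <new>
#include <type_traits>
#include <utility>

// Hypothetical context and task: the task keeps a reference to its context.
struct PassContextModel {
    int NumCommands = 0;
};

struct PassSetupTaskModel {
    PassContextModel& Context;
    explicit PassSetupTaskModel(PassContextModel& InContext) : Context(InContext) {}
    void DoTask() { ++Context.NumCommands; }
};

// Stand-in for TGraphTask: raw storage exists first, the TTask is built into it later.
template <typename TTask>
class GraphTaskStorageModel {
public:
    GraphTaskStorageModel() = default;
    GraphTaskStorageModel(const GraphTaskStorageModel&) = delete;
    GraphTaskStorageModel& operator=(const GraphTaskStorageModel&) = delete;

    template <typename... T>
    TTask& Construct(T&&... Args) {
        assert(Task == nullptr);  // construct only once, like the TaskConstructed check
        Task = new (static_cast<void*>(Storage)) TTask(std::forward<T>(Args)...);
        return *Task;
    }

    ~GraphTaskStorageModel() {
        if (Task != nullptr) {
            Task->~TTask();
        }
    }

private:
    // Plays the role of TAlignedBytes<sizeof(TTask), alignof(TTask)>.
    alignas(TTask) unsigned char Storage[sizeof(TTask)];
    TTask* Task = nullptr;
};

// True when a forwarding reference deduces T as an lvalue reference.
template <typename T>
bool IsDeducedAsLvalueRef(T&&) {
    return std::is_lvalue_reference<T>::value;
}
```

Passing an lvalue deduces T as PassContextModel&, so std::forward yields an lvalue and the task binds to the caller's object; passing a temporary deduces a non-reference T and forwards an rvalue.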

FGraphEventRef TGraphTask::Setup(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
{
    TaskTrace::Launched(GetTraceId(), nullptr, Subsequents.IsValid(), ((TTask*)&TaskStorage)->GetDesiredThread(), sizeof(*this));

    FGraphEventRef ReturnedEventRef = Subsequents; // very important so that this doesn't get destroyed before we return
    SetupPrereqs(Prerequisites, CurrentThreadIfKnown, true);
    return ReturnedEventRef;
}

void TGraphTask::SetupPrereqs(const FGraphEventArray* Prerequisites, ENamedThreads::Type CurrentThreadIfKnown, bool bUnlock)
{
    checkThreadGraph(!TaskConstructed);
    TaskConstructed = true;
    TTask& Task = *(TTask*)&TaskStorage;
    SetThreadToExecuteOn(Task.GetDesiredThread());
    int32 AlreadyCompletedPrerequisites = 0;
    if (Prerequisites)
    {
        for (int32 Index = 0; Index < Prerequisites->Num(); Index++)
        {
            FGraphEvent* Prerequisite = (*Prerequisites)[Index];
            if (Prerequisite == nullptr || !Prerequisite->AddSubsequent(this))
            {
                AlreadyCompletedPrerequisites++;
            }
        }
    }
    PrerequisitesComplete(CurrentThreadIfKnown, AlreadyCompletedPrerequisites, bUnlock);
}

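TGraphTask<FMeshDrawCommandPassSetupTask>::Setup in turn calls TGraphTask<FMeshDrawCommandPassSetupTask>::SetupPrereqs. Only after TaskConstructed is set to true there is the task considered fully created; the rest of SetupPrereqs checks the prerequisite events and handles queueing.

Note: TGraphTask does not finish creating the real task in its constructor. The object produced by the constructor is only used to build the inner factory class and to hold the FGraphEvent that signals the task's completion (if the task has subsequents). At that point TaskStorage does not yet contain the TTask object that does the actual work. That TTask is constructed later by the factory's variadic function template and placed into TaskStorage. Finally TGraphTask::Setup configures the TGraphTask and calls TGraphTask::SetupPrereqs; task creation is only truly complete once 'TaskConstructed = true' has executed.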

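Task Queueing

TGraphTask::SetupPrereqs then iterates over the prerequisite events, if there are any.
For each event, if the event is null, or if AddSubsequent fails because the event's subsequent list (the list of tasks waiting on that event) has already been closed, which means the prerequisite event has already fired, AlreadyCompletedPrerequisites is incremented. Finally TGraphTask::PrerequisitesComplete is called with the number of already completed prerequisites.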
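The counting loop can be modeled as follows. GraphEventModel, TaskModel and CountAlreadyCompleted are hypothetical; the point is that AddSubsequent fails once an event's subsequent list has been closed, and that a null or already-fired prerequisite is simply counted as done.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical stand-in for a TGraphTask.
struct TaskModel {};

// Hypothetical model of FGraphEvent's subsequent list.
class GraphEventModel {
public:
    // Fails once the event has fired and its list is closed.
    bool AddSubsequent(TaskModel* Task) {
        std::lock_guard<std::mutex> Lock(Mutex);
        if (bClosed) {
            return false;
        }
        Subsequents.push_back(Task);
        return true;
    }

    // Marks the event as fired; this model does not drain the list.
    void Close() {
        std::lock_guard<std::mutex> Lock(Mutex);
        bClosed = true;
    }

    std::size_t NumWaiting() {
        std::lock_guard<std::mutex> Lock(Mutex);
        return Subsequents.size();
    }

private:
    std::mutex Mutex;
    bool bClosed = false;
    std::vector<TaskModel*> Subsequents;
};

// Mirrors the loop in SetupPrereqs: null or already-fired prerequisites count as completed;
// every other prerequisite now has Self on its subsequent list.
int CountAlreadyCompleted(TaskModel* Self, const std::vector<GraphEventModel*>& Prerequisites) {
    int AlreadyCompleted = 0;
    for (GraphEventModel* Prerequisite : Prerequisites) {
        if (Prerequisite == nullptr || !Prerequisite->AddSubsequent(Self)) {
            ++AlreadyCompleted;
        }
    }
    return AlreadyCompleted;
}
```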

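TGraphTask::PrerequisitesComplete first works out how many prerequisite events must still complete before the task can run.

Note: the check NumberOfPrerequistitesOutstanding.Subtract(NumToSub) == NumToSub works because FThreadSafeCounter::Subtract is an atomic add (on Windows it maps to _InterlockedExchangeAdd) that returns the counter's value from before the operation. The old value equals NumToSub exactly when the new value is 0.

The (bUnlock ? 1 : 0) term in NumAlreadyFinishedPrequistes + (bUnlock ? 1 : 0) is there because the FBaseGraphTask constructor adds 1 to the prerequisite count to keep the task from running while its prerequisites are being set up: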

FBaseGraphTask::FBaseGraphTask(int32 InNumberOfPrerequistitesOutstanding)
    : ThreadToExecuteOn(ENamedThreads::AnyThread)
    , NumberOfPrerequistitesOutstanding(InNumberOfPrerequistitesOutstanding + 1) // + 1 is not a prerequisite, it is a lock to prevent it from executing while it is getting prerequisites, one it is safe to execute, call PrerequisitesComplete
{
    checkThreadGraph(LifeStage.Increment() == int32(LS_Contructed));
    CaptureInheritedContext();
}
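The arithmetic around the +1 lock can be checked with std::atomic, whose fetch_sub, like the Subtract described above, returns the value from before the operation. PrereqCounterModel is a hypothetical model of NumberOfPrerequistitesOutstanding plus PrerequisitesComplete, not engine code.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical model of NumberOfPrerequistitesOutstanding plus PrerequisitesComplete.
class PrereqCounterModel {
public:
    // The +1 is the construction-time lock, not a real prerequisite.
    explicit PrereqCounterModel(int NumPrereqs) : Outstanding(NumPrereqs + 1) {}

    // Returns true when the caller should queue the task right away.
    bool PrerequisitesComplete(int NumAlreadyFinished, bool bUnlock = true) {
        const int NumToSub = NumAlreadyFinished + (bUnlock ? 1 : 0);
        // fetch_sub returns the old value, so "old == NumToSub" means "new value == 0".
        return Outstanding.fetch_sub(NumToSub) == NumToSub;
    }

    int Remaining() const { return Outstanding.load(); }

private:
    std::atomic<int> Outstanding;
};
```

A task with two prerequisites of which only one has already fired starts at 3, subtracts 2, and is left at 1: it is not queued yet and waits for the remaining prerequisite.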

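Prerequisites complete: queue immediately

If 'NumberOfPrerequistitesOutstanding.Subtract(NumToSub) == NumToSub' is true, all prerequisites have finished and the newly created task can be queued.

FBaseGraphTask::QueueTask then calls FTaskGraphCompatibilityImplementation::QueueTask, which uses InThreadToExecuteOn to decide whether the task goes to an AnyThread queue or to a NamedThread queue.

For AnyThread, the thread priority and the task priority are extracted from InThreadToExecuteOn and the task is pushed onto the matching queue.
For a NamedThread, InThreadToExecuteOn determines whether the task goes to MainQueue or LocalQueue. This path further distinguishes between queueing from the target thread itself and queueing from another thread; in both cases it extracts the task priority and pushes the task onto the matching list.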
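The routing decision can be sketched as bit-field decoding. The constants below follow the style of ENamedThreads::Type (thread index in the low byte, then queue index, task priority and thread priority bits), but their exact values are assumptions of this sketch, as are the QueueTarget and Route names; bFromOwnThread is a simplified stand-in for the "from this thread" versus "from another thread" distinction.

```cpp
#include <cassert>
#include <cstdint>

// Illustrative encoding in the style of ENamedThreads::Type; the values are assumptions.
namespace NamedThreadsModel {
constexpr std::uint32_t ThreadIndexMask = 0xff;
constexpr std::uint32_t AnyThread = 0xff;
constexpr std::uint32_t QueueIndexShift = 8;  // 0 = MainQueue, 1 = LocalQueue
constexpr std::uint32_t QueueIndexMask = 0x1u << QueueIndexShift;
constexpr std::uint32_t TaskPriorityShift = 9;  // 0 = Normal, 1 = High
constexpr std::uint32_t TaskPriorityMask = 0x1u << TaskPriorityShift;
constexpr std::uint32_t ThreadPriorityShift = 10;  // 0 = Normal, 1 = High, 2 = Background
constexpr std::uint32_t ThreadPriorityMask = 0x3u << ThreadPriorityShift;
}  // namespace NamedThreadsModel

// Hypothetical description of where a task would be pushed.
struct QueueTarget {
    bool bAnyThread = false;
    std::uint32_t ThreadIndex = 0;     // named thread index (meaningless for AnyThread)
    std::uint32_t QueueIndex = 0;      // MainQueue or LocalQueue (named threads only)
    std::uint32_t ThreadPriority = 0;  // IncomingAnyThreadTasks entry (AnyThread only)
    std::uint32_t TaskPriority = 0;    // Normal or High list inside the stalling queue
    bool bFromOwnThread = false;       // named threads: queued by the thread that runs it
};

QueueTarget Route(std::uint32_t ThreadToExecuteOn, std::uint32_t CurrentThreadIndex) {
    using namespace NamedThreadsModel;
    QueueTarget Target;
    Target.ThreadIndex = ThreadToExecuteOn & ThreadIndexMask;
    Target.bAnyThread = (Target.ThreadIndex == AnyThread);
    Target.TaskPriority = (ThreadToExecuteOn & TaskPriorityMask) >> TaskPriorityShift;
    if (Target.bAnyThread) {
        Target.ThreadPriority = (ThreadToExecuteOn & ThreadPriorityMask) >> ThreadPriorityShift;
    } else {
        Target.QueueIndex = (ThreadToExecuteOn & QueueIndexMask) >> QueueIndexShift;
        Target.bFromOwnThread = (CurrentThreadIndex == Target.ThreadIndex);
    }
    return Target;
}
```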

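Prerequisites pending: defer queueing

For a task with prerequisites, 'NumberOfPrerequistitesOutstanding.Subtract(NumToSub) == NumToSub' can be false, meaning not all prerequisites have finished, and the new task is not queued. So when does it get another chance?

While TGraphTask::SetupPrereqs was counting the prerequisite events, it also added the new task to the subsequent list of every prerequisite event that had not yet fired.
When the task that owns such an event finishes, FGraphEvent::DispatchSubsequents is called on that event. It pops all waiting tasks, closes the list, and tries to queue each of them: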

// Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp

void FGraphEvent::DispatchSubsequents(ENamedThreads::Type CurrentThreadIfKnown)
{
    TArray<FBaseGraphTask*> NewTasks;
    DispatchSubsequents(NewTasks, CurrentThreadIfKnown);
}

void FGraphEvent::DispatchSubsequents(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThreadIfKnown, bool bInternal/* = false */)
{
    (...)

    bool bWakeUpWorker = false;
    TArray<FBaseGraphTask*> PoppedTasks;
    SubsequentList.PopAllAndClose(PoppedTasks);
    for (FBaseGraphTask* NewTask : ReverseIterate(PoppedTasks)) // reverse the order since PopAll is implicitly backwards
    {
        checkThreadGraph(NewTask);
        NewTask->ConditionalQueueTask(CurrentThreadIfKnown, bWakeUpWorker);
    }

    (...)
}

// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h

void FBaseGraphTask::ConditionalQueueTask(ENamedThreads::Type CurrentThread, bool& bWakeUpWorker)
{
    if (NumberOfPrerequistitesOutstanding.Decrement()==0)
    {
        QueueTask(CurrentThread, bWakeUpWorker);
        bWakeUpWorker = true;
    }
}

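FBaseGraphTask::ConditionalQueueTask decrements the task's outstanding-prerequisite count, NumberOfPrerequistitesOutstanding, and checks whether it has reached 0 (Decrement returns the new value). If so, the task is queued. Otherwise other prerequisite events are still pending, so the task keeps waiting; when those events complete, the same process runs again and tries to queue it.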
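The wake-up side can be modeled in the same style. GraphEventModel and TaskModel are hypothetical; the task's counter starts at 2 to represent a task with two pending prerequisites whose construction lock has already been released. A std::vector keeps insertion order, so unlike the engine's PopAllAndClose no reverse iteration is needed.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <vector>

// Hypothetical task: its counter holds the prerequisites that are still pending.
struct TaskModel {
    std::atomic<int> Outstanding;
    bool bQueued = false;  // single-threaded demo flag; a real queue push would go here

    explicit TaskModel(int NumPending) : Outstanding(NumPending) {}

    // Like ConditionalQueueTask: decrement, and queue only if this was the last prerequisite.
    void ConditionalQueue() {
        if (Outstanding.fetch_sub(1) - 1 == 0) {
            bQueued = true;
        }
    }
};

// Hypothetical event whose subsequent list is closed when it fires.
class GraphEventModel {
public:
    bool AddSubsequent(TaskModel* Task) {
        std::lock_guard<std::mutex> Lock(Mutex);
        if (bClosed) {
            return false;
        }
        Subsequents.push_back(Task);
        return true;
    }

    // Like DispatchSubsequents: pop everything and close, then give each waiter a chance to queue.
    void DispatchSubsequents() {
        std::vector<TaskModel*> Popped;
        {
            std::lock_guard<std::mutex> Lock(Mutex);
            bClosed = true;
            Popped.swap(Subsequents);
        }
        for (TaskModel* Task : Popped) {
            Task->ConditionalQueue();
        }
    }

private:
    std::mutex Mutex;
    bool bClosed = false;
    std::vector<TaskModel*> Subsequents;
};
```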
