说明

本文为UE5 TaskGraph系统的上篇,主要对TaskGraph系统中重要的类型进行简要介绍与分析。

下篇主要对TaskGraph中数据组织形式、任务创建与分发的过程等进行简要剖析。下篇现已更新,请移步UE5-TaskGraph系统-下:机制分析

概述

TaskGraph(任务图)是UE实现的一套异步任务并行处理系统。在定义任务的同时,可以指定任务的依赖关系,TaskGraph会按照编排好的依赖关系来运行任务。
任务开始运行前可以指定多个依赖的前置任务,只有前置任务运行结束,本任务才会开始运行。最终,所有任务依赖关系形成一张有向无环图(DAG)。

GraphTask 任务类型

FBaseGraphTask

FBaseGraphTask是运行于TaskGraph的任务,是基础父类,其派生的具体任务子类才会真正执行任务

// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h

// Base class of every task that runs on the TaskGraph. It holds the target thread and the
// outstanding-prerequisite counter; the work itself is performed by a derived class through
// the pure-virtual ExecuteTask().
class FBaseGraphTask : private UE::FInheritedContextBase
{
protected:
    // Starts with ThreadToExecuteOn = AnyThread and the prerequisite counter set to
    // InNumberOfPrerequistitesOutstanding + 1; the extra 1 is the "lock" that
    // PrerequisitesComplete() releases when bUnlock is true.
    FBaseGraphTask(int32 InNumberOfPrerequistitesOutstanding)
        : ThreadToExecuteOn(ENamedThreads::AnyThread)
        , NumberOfPrerequistitesOutstanding(InNumberOfPrerequistitesOutstanding + 1)
    {
        checkThreadGraph(LifeStage.Increment() == int32(LS_Contructed));
        CaptureInheritedContext();
    }

    // Records the thread this task should execute on and advances the life stage to LS_ThreadSet.
    void SetThreadToExecuteOn(ENamedThreads::Type InThreadToExecuteOn)
    {
        ThreadToExecuteOn = InThreadToExecuteOn;
        checkThreadGraph(LifeStage.Increment() == int32(LS_ThreadSet));
    }

    // Called once the prerequisites have been set up; some or all of them may already be finished.
    // Subtracts the already-finished count (plus the constructor "lock" when bUnlock is true) and
    // queues the task when nothing remains outstanding.
    void PrerequisitesComplete(ENamedThreads::Type CurrentThread, int32 NumAlreadyFinishedPrequistes, bool bUnlock = true)
    {
        checkThreadGraph(LifeStage.Increment() == int32(LS_PrequisitesSetup));
        int32 NumToSub = NumAlreadyFinishedPrequistes + (bUnlock ? 1 : 0); // the +1 is for the "lock" we set up in the constructor
        // NOTE(review): presumably Subtract() returns the value before the subtraction, so equality
        // means the counter just reached zero - confirm against FThreadSafeCounter.
        if (NumberOfPrerequistitesOutstanding.Subtract(NumToSub) == NumToSub) 
        {
            bool bWakeUpWorker = true;
            QueueTask(CurrentThread, bWakeUpWorker);
        }
    }

    /** destructor, just checks the life stage **/
    virtual ~FBaseGraphTask()
    {
#if DO_GUARD_SLOW
        int32 Stage = LifeStage.Increment();
        checkf(Stage == int32(LS_Deconstucted), TEXT("LifeStage was %d"), Stage);
#endif
    }

#if !(UE_BUILD_SHIPPING || UE_BUILD_TEST)
    /** Logs a task name that may contain invalid subsequents. Debug only. */
    static void CORE_API LogPossiblyInvalidSubsequentsTask(const TCHAR* TaskName);
#endif

    // Conditionally queues the task: decrements the outstanding-prerequisite counter and, only
    // when it reaches zero, calls QueueTask() with the caller's bWakeUpWorker and then sets
    // bWakeUpWorker to true.
    void ConditionalQueueTask(ENamedThreads::Type CurrentThread, bool& bWakeUpWorker)
    {
        if (NumberOfPrerequistitesOutstanding.Decrement()==0)
        {
            QueueTask(CurrentThread, bWakeUpWorker);
            bWakeUpWorker = true;
        }
    }

    (...)

private:
    (...)

    // Performs the actual work of the task; implemented by the derived class.
    virtual void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread, bool bDeleteOnCompletion)=0;

    // Destroys the task object; implemented by the derived class.
    virtual void DeleteTask() = 0;

    // Just checks the life stage and passes off to the virtual ExecuteTask method
    FORCEINLINE void Execute(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread, bool bDeleteOnCompletion)
    {
        checkThreadGraph(LifeStage.Increment() == int32(LS_Executing));

        // Restore the context captured in the constructor for the duration of ExecuteTask().
        UE::FInheritedContextScope InheritedContextScope = RestoreInheritedContext();
        ExecuteTask(NewTasks, CurrentThread, bDeleteOnCompletion);
    }

    // Adds this task to the TaskGraph's queues via FTaskGraphInterface::Get().QueueTask().
    void QueueTask(ENamedThreads::Type CurrentThreadIfKnown, bool bWakeUpWorker)
    {
        checkThreadGraph(LifeStage.Increment() == int32(LS_Queued));
        TaskTrace::Scheduled(GetTraceId());
        FTaskGraphInterface::Get().QueueTask(this, bWakeUpWorker, ThreadToExecuteOn, CurrentThreadIfKnown);
    }

    ENamedThreads::Type         ThreadToExecuteOn; // thread (type) the task should execute on
    FThreadSafeCounter          NumberOfPrerequistitesOutstanding; // prerequisites still outstanding; the task is queued when this reaches zero


#if DO_GUARD_SLOW || USING_CODE_ANALYSIS
    // Life stage verification
    // Tasks go through 8 steps, in order. In non-final builds, we track them with a thread safe counter and verify that the progression is correct.
    // NOTE(review): the enum below lists 7 stages, not 8.
    enum ELifeStage
    {
        LS_BaseContructed = 0,
        LS_Contructed,
        LS_ThreadSet,
        LS_PrequisitesSetup,
        LS_Queued,
        LS_Executing,
        LS_Deconstucted,
    };
    FThreadSafeCounter          LifeStage;

#endif

#if UE_TASK_TRACE_ENABLED
    std::atomic<TaskTrace::FId> TraceId { TaskTrace::InvalidId };
#endif
};

TGraphTask

TGraphTask是FBaseGraphTask的唯一子类,实现了接口,最主要的是实现了执行任务的函数。

// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h

// Concrete graph task that embeds a user task of type TTask in aligned storage and
// implements FBaseGraphTask::ExecuteTask() by calling TTask::DoTask().
template<typename TTask>
class TGraphTask final : public TConcurrentLinearObject<TGraphTask<TTask>, FTaskGraphBlockAllocationTag>, public FBaseGraphTask
{
public:
    // Factory helper returned by CreateTask(); it constructs the embedded TTask.
    class FConstructor
    {
    public:
        // Placement-constructs the TTask inside the owner's storage from Args, then calls
        // Owner->Setup() so the task is dispatched once it is ready. Returns the completion event.
        template<typename...T>
        FGraphEventRef ConstructAndDispatchWhenReady(T&&... Args)
        {
            new ((void *)&Owner->TaskStorage) TTask(Forward<T>(Args)...);
            return Owner->Setup(Prerequisites, CurrentThreadIfKnown);
        }

        // Placement-constructs the TTask inside the owner's storage from Args, then calls
        // Owner->Hold(): prerequisites are set up but the task is held rather than unlocked.
        // Returns the owning TGraphTask (see Unlock()).
        template<typename...T>
        TGraphTask* ConstructAndHold(T&&... Args)
        {
            new ((void *)&Owner->TaskStorage) TTask(Forward<T>(Args)...);
            return Owner->Hold(Prerequisites, CurrentThreadIfKnown);
        }

    private:
        friend TGraphTask;

        TGraphTask*                     Owner; // the TGraphTask this constructor belongs to
        const FGraphEventArray*         Prerequisites; // prerequisite events (may be null)
        ENamedThreads::Type             CurrentThreadIfKnown;

        FConstructor(TGraphTask* InOwner, const FGraphEventArray* InPrerequisites, ENamedThreads::Type InCurrentThreadIfKnown)
            : Owner(InOwner)
            , Prerequisites(InPrerequisites)
            , CurrentThreadIfKnown(InCurrentThreadIfKnown)
        {
        }

        // Copy construction and copy assignment are not supported: both are private and hit check(0).
        FConstructor(const FConstructor& Other)
        {
            check(0);
        }

        void operator=(const FConstructor& Other)
        {
            check(0);
        }
    };

    // Creates a task. Note that an FConstructor is returned (not the task itself) so the caller
    // can follow up with ConstructAndDispatchWhenReady() or ConstructAndHold().
    // No graph event is created when TTask's subsequents mode is FireAndForget.
    static FConstructor CreateTask(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        FGraphEventRef GraphEvent = TTask::GetSubsequentsMode() == ESubsequentsMode::FireAndForget ? NULL : FGraphEvent::CreateGraphEvent();

        int32 NumPrereq = Prerequisites ? Prerequisites->Num() : 0;
        return FConstructor(new TGraphTask(MoveTemp(GraphEvent), NumPrereq), Prerequisites, CurrentThreadIfKnown);
    }

    // Emits the Launched trace and calls ConditionalQueueTask(), which decrements the
    // outstanding-prerequisite counter and queues the task if it reaches zero.
    void Unlock(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        TaskTrace::Launched(GetTraceId(), nullptr, Subsequents.IsValid(), ((TTask*)&TaskStorage)->GetDesiredThread(), sizeof(*this));

        bool bWakeUpWorker = true;
        ConditionalQueueTask(CurrentThreadIfKnown, bWakeUpWorker);
    }

    // Returns the event stored in Subsequents.
    FGraphEventRef GetCompletionEvent()
    {
        return Subsequents;
    }

    private:
    friend class FConstructor;
    friend class FGraphEvent;

    // Executes the embedded task, destroys it, dispatches subsequents (when tracked) and
    // optionally deletes this TGraphTask.
    void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread, bool bDeleteOnCompletion) override
    {
        (...)

        if (TTask::GetSubsequentsMode() == ESubsequentsMode::TrackSubsequents)
        {
            Subsequents->CheckDontCompleteUntilIsEmpty(); // we can only add wait for tasks while executing the task
        }
        // Run the task, then destroy the TTask object in place.
        TTask& Task = *(TTask*)&TaskStorage;
        {
            TaskTrace::FTaskTimingEventScope TaskEventScope(GetTraceId());
            FScopeCycleCounter Scope(Task.GetStatId(), true);
            Task.DoTask(CurrentThread, Subsequents);
            Task.~TTask();
            checkThreadGraph(ENamedThreads::GetThreadIndex(CurrentThread) <= ENamedThreads::GetRenderThread() || FMemStack::Get().IsEmpty()); // you must mark and pop memstacks if you use them in tasks! Named threads are excepted.
        }
        
        TaskConstructed = false;

        // Dispatch the subsequent tasks.
        if (TTask::GetSubsequentsMode() == ESubsequentsMode::TrackSubsequents)
        {
            FPlatformMisc::MemoryBarrier();
            Subsequents->DispatchSubsequents(NewTasks, CurrentThread, true);
        }
        else
        {
            // "fire and forget" tasks don't have an accompanying FGraphEvent that traces completion and destruction
            TaskTrace::Completed(GetTraceId());
            TaskTrace::Destroyed(GetTraceId());
        }

        // Release this task object.
        if (bDeleteOnCompletion)
        {
            DeleteTask();
        }
    }

    // Deletes this TGraphTask.
    void DeleteTask() final override
    {
        delete this;
    }

    // Private constructor: takes over InSubsequents and uses its trace id when valid,
    // otherwise generates a new task id.
    TGraphTask(FGraphEventRef InSubsequents, int32 NumberOfPrerequistitesOutstanding)
        : FBaseGraphTask(NumberOfPrerequistitesOutstanding)
        , TaskConstructed(false)
    {
        Subsequents.Swap(InSubsequents);
        SetTraceId(Subsequents.IsValid() ? Subsequents->GetTraceId() : TaskTrace::GenerateTaskId());
    }

    // Private destructor: checks that the embedded TTask is no longer constructed.
    ~TGraphTask() override
    {
        checkThreadGraph(!TaskConstructed);
    }

    // Sets up the prerequisites: registers this task as a subsequent of each prerequisite event
    // and counts the ones that are null or refuse the registration as already completed.
    void SetupPrereqs(const FGraphEventArray* Prerequisites, ENamedThreads::Type CurrentThreadIfKnown, bool bUnlock)
    {
        checkThreadGraph(!TaskConstructed);
        TaskConstructed = true;
        TTask& Task = *(TTask*)&TaskStorage;
        SetThreadToExecuteOn(Task.GetDesiredThread());
        int32 AlreadyCompletedPrerequisites = 0;
        if (Prerequisites)
        {
            for (int32 Index = 0; Index < Prerequisites->Num(); Index++)
            {
                FGraphEvent* Prerequisite = (*Prerequisites)[Index];
                if (Prerequisite == nullptr || !Prerequisite->AddSubsequent(this))
                {
                    AlreadyCompletedPrerequisites++;
                }
            }
        }
        PrerequisitesComplete(CurrentThreadIfKnown, AlreadyCompletedPrerequisites, bUnlock);
    }

    // Sets the task up for dispatch (SetupPrereqs with bUnlock = true) and returns its completion event.
    FGraphEventRef Setup(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        TaskTrace::Launched(GetTraceId(), nullptr, Subsequents.IsValid(), ((TTask*)&TaskStorage)->GetDesiredThread(), sizeof(*this));

        FGraphEventRef ReturnedEventRef = Subsequents; // very important so that this doesn't get destroyed before we return
        SetupPrereqs(Prerequisites, CurrentThreadIfKnown, true);
        return ReturnedEventRef;
    }

    // Sets the task up but holds it (SetupPrereqs with bUnlock = false) and returns this task.
    TGraphTask* Hold(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        TaskTrace::Created(GetTraceId(), sizeof(*this));

        SetupPrereqs(Prerequisites, CurrentThreadIfKnown, false);
        return this;
    }

    // Creates a task that uses the supplied SubsequentsToAssume event instead of a new one.
    static FConstructor CreateTask(FGraphEventRef SubsequentsToAssume, const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        return FConstructor(new TGraphTask(SubsequentsToAssume, Prerequisites ? Prerequisites->Num() : 0), Prerequisites, CurrentThreadIfKnown);
    }

    TAlignedBytes<sizeof(TTask),alignof(TTask)> TaskStorage; // storage for the embedded TTask that gets executed
    bool                        TaskConstructed;
    FGraphEventRef              Subsequents; // event used to synchronize subsequent tasks
};

在ExecuteTask函数中,调用了TTask::DoTask函数,也就是说实际执行任务的是模板参数TTask所指定的任务类型。

自定义任务可以参考官方注释中的class FGenericTask,也可以直接派生TAsyncGraphTask类,TAsyncGraphTask类派生自任务基类FAsyncGraphTaskBase

ESubsequentsMode

// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h

// Whether a task tracks subsequents, i.e. whether other tasks may depend on it.
namespace ESubsequentsMode
{
    enum Type
    {
        // The task can be depended on by other tasks.
        TrackSubsequents,
        // The task is only executed; no other task can depend on it.
        FireAndForget
    };
}

TaskThread 任务线程类型

FTaskThreadBase

FTaskThreadBase是执行任务的线程父类,定义了一组设置、操作任务的接口,如初始化、帧更新、入队任务、唤醒线程、请求退出等。

// Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp

// Base class of the threads that execute tasks. It declares the interface for setup,
// ticking, enqueuing tasks, waking the thread and requesting it to quit.
class FTaskThreadBase : public FRunnable, FSingleThreadRunnable
{
public:
    // Starts as AnyThread with an invalid TLS slot and no owning worker;
    // NewTasks is reset with 128 passed as the expected size.
    FTaskThreadBase()
        : ThreadId(ENamedThreads::AnyThread)
        , PerThreadIDTLSSlot(FPlatformTLS::InvalidTlsSlot)
        , OwnerWorker(nullptr)
    {
        NewTasks.Reset(128);
    }

    // Sets up this thread object's data.
    void Setup(ENamedThreads::Type InThreadId, uint32 InPerThreadIDTLSSlot, FWorkerThread* InOwnerWorker);

    // Initializes from the current thread.
    void InitializeForCurrentThread();

    ENamedThreads::Type GetThreadId() const; // returns ThreadId

    // Used by named threads to process tasks until the thread is idle or RequestQuit is called.
    virtual void ProcessTasksUntilQuit(int32 QueueIndex) = 0;
    virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex);

    // Enqueues a task, assuming this thread is the current thread. For a named thread the task
    // goes straight into its private queue.
    virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task);

    // Enqueues a task, assuming this thread is not the current thread.
    virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task);

    // Requests a quit, which makes the thread return to its caller once idle. For a named thread
    // this is how ProcessTasksUntilQuit returns to the caller; an unnamed thread simply shuts down.
    virtual void RequestQuit(int32 QueueIndex) = 0;

    // Wakes the thread up.
    virtual void WakeUp(int32 QueueIndex = 0) = 0;

    // Returns whether the thread is currently processing tasks.
    virtual bool IsProcessingTasks(int32 QueueIndex) = 0;

    // Single-threaded tick.
    virtual void Tick() override;

    virtual bool Init() override;

    virtual uint32 Run() override;

    // Stops the thread by requesting a quit with queue index -1.
    virtual void Stop() override
    {
        RequestQuit(-1);
    }

    virtual void Exit() override
    {
    }

    // This object is its own single-thread interface.
    virtual FSingleThreadRunnable* GetSingleThreadInterface() override
    {
        return this;
    }

protected:
    ENamedThreads::Type         ThreadId; // thread id
    uint32                      PerThreadIDTLSSlot; // TLS slot
    FThreadSafeCounter          IsStalled; // stall counter, used to trigger the stall signal
    TArray<FBaseGraphTask*>     NewTasks; // list of tasks waiting to be processed
    FWorkerThread*              OwnerWorker; // worker thread that owns this object
};

FNamedTaskThread & FTaskThreadAnyThread

FTaskThreadBase只是抽象类,具体的实现由子类FNamedTaskThread和FTaskThreadAnyThread完成,前者是带名字的任务线程,后者是无名的任务线程。

两者在处理任务时的操作有所不同。FNamedTaskThread::ProcessTasksNamedThread从对应的任务队列的队首获得任务并执行,而FTaskThreadAnyThread::ProcessTasks中需要调用FTaskGraphImplementation::FindWork来获取任务。

FNamedTaskThread

// Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp

// Task thread with a name. Each instance owns ENamedThreads::NumQueues private task queues
// and takes its work from them.
class FNamedTaskThread : public FTaskThreadBase
{
public:
    // Processes tasks of the given queue until a return or shutdown is requested.
    virtual void ProcessTasksUntilQuit(int32 QueueIndex) override
    {
        check(Queue(QueueIndex).StallRestartEvent); // make sure we are started up

        Queue(QueueIndex).QuitForReturn = false;
        verify(++Queue(QueueIndex).RecursionGuard == 1);
        const bool bIsMultiThread = FTaskGraphInterface::IsMultithread();
        // Keep processing the queue until return or shutdown is requested, or immediately stop
        // after one pass when the platform is not running multithreaded.
        do
        {
            const bool bAllowStall = bIsMultiThread;
            ProcessTasksNamedThread(QueueIndex, bAllowStall);
        } while (!Queue(QueueIndex).QuitForReturn && !Queue(QueueIndex).QuitForShutdown && bIsMultiThread); // @Hack - quit now when running with only one thread.
        verify(!--Queue(QueueIndex).RecursionGuard);
    }

    // Runs a single ProcessTasksNamedThread() pass with stalling disabled and returns the
    // number of tasks it processed.
    virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex) override
    {
        check(Queue(QueueIndex).StallRestartEvent); // make sure we are started up

        Queue(QueueIndex).QuitForReturn = false;
        verify(++Queue(QueueIndex).RecursionGuard == 1);
        uint64 ProcessedTasks = ProcessTasksNamedThread(QueueIndex, false);
        verify(!--Queue(QueueIndex).RecursionGuard);
        return ProcessedTasks;
    }

    // Task-processing loop of a named thread; returns the number of tasks executed.
    uint64 ProcessTasksNamedThread(int32 QueueIndex, bool bAllowStall)
    {
        (...)

        while (!Queue(QueueIndex).QuitForReturn)
        {
            // Stalling inside Pop() is disabled when this is the render thread's main queue and polling is enabled.
            const bool bIsRenderThreadAndPolling = bIsRenderThreadMainQueue && (GRenderThreadPollPeriodMs >= 0);
            const bool bStallQueueAllowStall = bAllowStall && !bIsRenderThreadAndPolling;

            // Take the next task from this thread's own queue.
            FBaseGraphTask* Task = Queue(QueueIndex).StallQueue.Pop(0, bStallQueueAllowStall);
            TestRandomizedThreads();
            if (!Task)
            {
                (...)
            }
            else // a task was obtained
            {
                // Execute it; the thread id passed down has the queue index encoded in its high bits.
                Task->Execute(NewTasks, ENamedThreads::Type(ThreadId | (QueueIndex << ENamedThreads::QueueIndexShift)), true);
                ProcessedTasks++;
                TestRandomizedThreads();
            }
        }
        (...)
        return ProcessedTasks;
    }

    virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task) override;
    virtual void RequestQuit(int32 QueueIndex) override;
    virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task) override;
    virtual bool IsProcessingTasks(int32 QueueIndex) override;
    virtual void WakeUp(int32 QueueIndex) override;

private:
    (...)

    // Per-queue state of a named thread.
    struct FThreadTaskQueue
    {
        FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> StallQueue; // stalling task queue
        uint32 RecursionGuard; // guards against recursive (re-entrant) processing
        bool QuitForReturn; // whether a return to the caller was requested
        bool QuitForShutdown; // whether a shutdown was requested
        FEvent* StallRestartEvent; // event used to stall/restart the thread

        // Takes the stall event from the synch-event pool.
        FThreadTaskQueue()
            : RecursionGuard(0)
            , QuitForReturn(false)
            , QuitForShutdown(false)
            , StallRestartEvent(FPlatformProcess::GetSynchEventFromPool(false))
        {
        }
        // Returns the stall event to the pool.
        ~FThreadTaskQueue()
        {
            FPlatformProcess::ReturnSynchEventToPool(StallRestartEvent);
            StallRestartEvent = nullptr;
        }
    };

    // Bounds-checked access to the queue at QueueIndex.
    FORCEINLINE FThreadTaskQueue& Queue(int32 QueueIndex)
    {
        checkThreadGraph(QueueIndex >= 0 && QueueIndex < ENamedThreads::NumQueues);
        return Queues[QueueIndex];
    }
    FORCEINLINE const FThreadTaskQueue& Queue(int32 QueueIndex) const
    {
        checkThreadGraph(QueueIndex >= 0 && QueueIndex < ENamedThreads::NumQueues);
        return Queues[QueueIndex];
    }

    FThreadTaskQueue Queues[ENamedThreads::NumQueues]; // task queues private to this named thread
};

FTaskThreadAnyThread

// Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp

// Unnamed task thread. Unlike a named thread it has no private task queue of its own;
// it obtains work through FindWork().
class FTaskThreadAnyThread : public FTaskThreadBase
{
public:
    FTaskThreadAnyThread(int32 InPriorityIndex)
        : PriorityIndex(InPriorityIndex)
    {
    }

    // Processes tasks until shutdown is requested (or after one pass when not multithreaded).
    // TLS caches are set up first unless this is a background-priority thread.
    virtual void ProcessTasksUntilQuit(int32 QueueIndex) override
    {
        if (PriorityIndex != (ENamedThreads::BackgroundThreadPriority >> ENamedThreads::ThreadPriorityShift))
        {
            FMemory::SetupTLSCachesOnCurrentThread();
        }
        check(!QueueIndex);
        const bool bIsMultiThread = FTaskGraphInterface::IsMultithread();
        do
        {
            ProcessTasks();
        } while (!Queue.QuitForShutdown && bIsMultiThread); // @Hack - quit now when running with only one thread.
    }

    // Only supported when not running multithreaded, where it runs ProcessTasks() once;
    // otherwise it hits check(0) and returns 0.
    virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex) override
    {
        if (FTaskGraphInterface::IsMultithread() == false)
        {
            return ProcessTasks();
        }
        else
        {
            check(0);
            return 0;
        }
    }

    (...)

private:

#if UE_EXTERNAL_PROFILING_ENABLED
    // Maps a priority index to a display name for the thread.
    static inline const TCHAR* ThreadPriorityToName(int32 PriorityIdx)
    {
        PriorityIdx <<= ENamedThreads::ThreadPriorityShift;
        if (PriorityIdx == ENamedThreads::HighThreadPriority)
        {
            return TEXT("Task Thread HP"); // high-priority worker thread
        }
        else if (PriorityIdx == ENamedThreads::NormalThreadPriority)
        {
            return TEXT("Task Thread NP"); // normal-priority worker thread
        }
        else if (PriorityIdx == ENamedThreads::BackgroundThreadPriority)
        {
            return TEXT("Task Thread BP"); // background-priority worker thread
        }
        else
        {
            return TEXT("Task Thread Unknown Priority");
        }
    }
#endif

    // Task-processing loop. Differs from the named-thread version in that tasks are obtained
    // through FindWork() rather than from a private queue. Returns the number of tasks executed.
    uint64 ProcessTasks()
    {
        (...)

        while (1)
        {
            // Get a task via FindWork() (the TaskGraph's unnamed-thread task queues).
            FBaseGraphTask* Task = FindWork();
            if (!Task)
            {
                (...)

                TestRandomizedThreads();
                const bool bIsMultithread = FTaskGraphInterface::IsMultithread();
                if (bIsMultithread)
                {
                    // No work: wait on the stall event until it is triggered.
                    FScopeCycleCounter Scope(StallStatId, EStatFlags::Verbose);
                    Queue.StallRestartEvent->Wait(MAX_uint32, bCountAsStall);
                    bDidStall = true;
                }
                // Leave the loop on shutdown, or right away when not multithreaded.
                if (Queue.QuitForShutdown || !bIsMultithread)
                {
                    break;
                }
                TestRandomizedThreads();

                (...)

                continue;
            }
            TestRandomizedThreads();

            (...)

            bDidStall = false;
            Task->Execute(NewTasks, ENamedThreads::Type(ThreadId), true);
            ProcessedTasks++;
            TestRandomizedThreads();
            if (Queue.bStallForTuning)
            {
                (...)
                {
                    // Acquire and immediately release the StallForTuning critical section.
                    FScopeLock Lock(&Queue.StallForTuning);
                }
                (...)
            }
        }
        verify(!--Queue.RecursionGuard);
        return ProcessedTasks;
    }

    // Queue-related state of an unnamed thread.
    struct FThreadTaskQueue
    {
        FEvent* StallRestartEvent; // event used to stall/restart the thread
        uint32 RecursionGuard; // guards against recursive (re-entrant) processing
        bool QuitForShutdown; // whether a shutdown was requested
        bool bStallForTuning;
        FCriticalSection StallForTuning; // critical section used for the tuning stall

        // Takes the stall event from the synch-event pool.
        FThreadTaskQueue()
            : StallRestartEvent(FPlatformProcess::GetSynchEventFromPool(false))
            , RecursionGuard(0)
            , QuitForShutdown(false)
            , bStallForTuning(false)
        {

        }
        // Returns the stall event to the pool.
        ~FThreadTaskQueue()
        {
            FPlatformProcess::ReturnSynchEventToPool(StallRestartEvent);
            StallRestartEvent = nullptr;
        }
    };

    // Obtains a task from the TaskGraph system.
    FBaseGraphTask* FindWork();

    // Queue state; an unnamed thread uses only this single instance.
    FThreadTaskQueue Queue;

    int32 PriorityIndex;
};

ENamedThreads

namespace ENamedThreads中提供了线程类型枚举以及设置和获取渲染线程、线程索引、线程优先级、任务优先级的接口

ENamedThreads

// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h

// Thread identifiers plus the bit fields packed into the same value.
// Per the masks and shifts below: bits 0-7 hold the thread index, bit 8 the queue index,
// bit 9 the task priority and bits 10-11 the thread priority.
namespace ENamedThreads
{
    enum Type : int32
    {
        UnusedAnchor = -1,
        /** The always-present, named threads are listed next **/
        RHIThread,
        GameThread,
        // The render thread is sometimes the game thread and is sometimes the actual rendering thread
        ActualRenderingThread = GameThread + 1,
        // CAUTION ThreadedRenderingThread must be the last named thread, insert new named threads before it

        /** not actually a thread index. Means "Unknown Thread" or "Any Unnamed Thread" **/
        AnyThread = 0xff, 

        /** High bits are used for a queue index and priority**/

        MainQueue =         0x000,
        LocalQueue =        0x100,

        NumQueues =         2,
        ThreadIndexMask =   0xff,
        QueueIndexMask =    0x100,
        QueueIndexShift =   8,

        /** High bits are used for a queue index task priority and thread priority**/

        NormalTaskPriority =    0x000,
        HighTaskPriority =      0x200,

        NumTaskPriorities =     2,
        TaskPriorityMask =      0x200,
        TaskPriorityShift =     9,

        NormalThreadPriority = 0x000,
        HighThreadPriority = 0x400,
        BackgroundThreadPriority = 0x800,

        NumThreadPriorities = 3,
        ThreadPriorityMask = 0xC00,
        ThreadPriorityShift = 10,

        /** Combinations **/
        GameThread_Local = GameThread | LocalQueue,
        ActualRenderingThread_Local = ActualRenderingThread | LocalQueue,

        AnyHiPriThreadNormalTask = AnyThread | HighThreadPriority | NormalTaskPriority,
        AnyHiPriThreadHiPriTask = AnyThread | HighThreadPriority | HighTaskPriority,

        AnyNormalThreadNormalTask = AnyThread | NormalThreadPriority | NormalTaskPriority,
        AnyNormalThreadHiPriTask = AnyThread | NormalThreadPriority | HighTaskPriority,

        AnyBackgroundThreadNormalTask = AnyThread | BackgroundThreadPriority | NormalTaskPriority,
        AnyBackgroundHiPriTask = AnyThread | BackgroundThreadPriority | HighTaskPriority,
    };

    (...)

}

TaskGraph 任务图类型

TaskGraph会根据线程优先级、是否启用后台线程创建不同的工作线程集合,然后创建它们的FWorkerThread对象。
入队任务时,会将任务Push到任务列表IncomingAnyThreadTasks中,并取出可执行的任务索引,根据任务的属性(希望在哪个线程执行、优先级、任务索引)启用对应的工作线程去执行。

FTaskGraphInterface

FTaskGraphInterface就是任务图的管理器和工厂,提供了任务操作、线程操作的接口。

FTaskGraphImplementation

FTaskGraphImplementation继承并实现了FTaskGraphInterface的接口。FTaskGraphImplementation采用了特殊的线程对象WorkerThreads(工作线程)来作为执行的载体,如果是专用的线程(带名字的线程,如GameThread、RHIThread、ActualRenderingThread),则会进入专用的任务队列。

// Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp

class FTaskGraphImplementation final : public FTaskGraphInterface
{
public:
    // 获取FTaskGraphImplementation单例
    static FTaskGraphImplementation& Get()
    {
        checkThreadGraph(!GUseNewTaskBackend);
        checkThreadGraph(TaskGraphImplementationSingleton);
        return *static_cast<FTaskGraphImplementation*>(TaskGraphImplementationSingleton);
    }

    // 构造函数, 计算任务线程数量, 创建专用线程和无名线程等
    FTaskGraphImplementation(int32);

    virtual ~FTaskGraphImplementation()

    // 入队任务,AnyThread和NamedTaskThread分别处理
    // AnyThread的任务会加入IncomingAnyThreadTasks对应优先级的队列
    // NamedTaskThread的任务会根据优先级,加入各自所有的FThreadTaskQueue的对应队列
    virtual void QueueTask(FBaseGraphTask* Task, bool bWakeUpWorker, ENamedThreads::Type ThreadToExecuteOn, ENamedThreads::Type InCurrentThreadIfKnown = ENamedThreads::AnyThread) final override;

    virtual int32 GetNumWorkerThreads() final override;
    virtual int32 GetNumForegroundThreads() final override;
    virtual int32 GetNumBackgroundThreads() final override;
    virtual bool IsCurrentThreadKnown() final override;
    virtual ENamedThreads::Type GetCurrentThreadIfKnown(bool bLocalQueue) final override;
    virtual bool IsThreadProcessingTasks(ENamedThreads::Type ThreadToCheck) final override;

    // 将当前线程导入到指定Index
    virtual void AttachToThread(ENamedThreads::Type CurrentThread) final override;

    // ----处理任务接口----
    virtual uint64 ProcessThreadUntilIdle(ENamedThreads::Type CurrentThread) final override;
    virtual void ProcessThreadUntilRequestReturn(ENamedThreads::Type CurrentThread) final override;
    virtual void RequestReturn(ENamedThreads::Type CurrentThread) final override;
    virtual void WaitUntilTasksComplete(const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) final override;
    virtual void TriggerEventWhenTasksComplete(FEvent* InEvent, const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask) final override;
    virtual void AddShutdownCallback(TFunction<void()>& Callback);

    // ----任务调度接口----
    virtual void WakeNamedThread(ENamedThreads::Type ThreadToWake) override;
    void StartTaskThread(int32 Priority, int32 IndexToStart);
    void StartAllTaskThreads(bool bDoBackgroundThreads);
    FBaseGraphTask* FindWork(ENamedThreads::Type ThreadInNeed) override;
    void StallForTuning(int32 Index, bool Stall) override;
    void SetTaskThreadPriorities(EThreadPriority Pri);

private:
    // 获取指定索引的任务线程引用
    FTaskThreadBase& Thread(int32 Index);

    // 获取当前线程索引
    ENamedThreads::Type GetCurrentThread();
    int32 ThreadIndexToPriorityIndex(int32 ThreadIndex);

    enum
    {
        /** Compile time maximum number of threads. Didn't really need to be a compile time constant, but task thread are limited by MAX_LOCK_FREE_LINKS_AS_BITS **/
        MAX_THREADS = 0xFFFF,
        MAX_THREAD_PRIORITIES = 3
    };

    FWorkerThread       WorkerThreads[MAX_THREADS]; // 所有工作线程(任务线程)对象数组
    int32               NumThreads; // 实际被使用的线程数量
    int32               NumNamedThreads; // 专用线程数量
    int32               NumTaskThreadSets; // 任务线程集合数量
    int32               NumTaskThreadsPerSet; // 每个集合拥有的任务线程数量

    bool                bCreatedHiPriorityThreads;
    bool                bCreatedBackgroundPriorityThreads;

    ENamedThreads::Type LastExternalThread;
    FThreadSafeCounter  ReentrancyCheck;
    uint32              PerThreadIDTLSSlot;

    TArray<TFunction<void()> > ShutdownCallbacks; // 销毁前的回调

    FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> IncomingAnyThreadTasks[MAX_THREAD_PRIORITIES];
};


// One entry of the TaskGraph's worker-thread array: pairs a task-thread object with the
// runnable thread that actually executes it.
struct FWorkerThread
{
    FTaskThreadBase*    TaskGraphWorker; // the FTaskThreadBase object for this worker
    FRunnableThread*    RunnableThread; // the runnable thread that actually executes tasks
    bool                bAttached; // whether a thread has been attached (typically used for named threads)

    /** Constructor to set reasonable defaults. **/
    FWorkerThread()
        : TaskGraphWorker(nullptr)
        , RunnableThread(nullptr)
        , bAttached(false)
    {
    }
};

WorkerThreads保存了TaskGraph系统用到的所有线程对象,分布如下图:

WorkerThreadsLayout

GraphEvent 任务图事件类型

FGraphEvent

FGraphEvent表示一个Task完成的事件,FGraphEvent实现了Task之间的依赖关系,只有Task依赖的所有前置Task执行完成,当前Task才会被加入到队列中。所以FGraphEvent总是和一个Task相关,它也是在一个Task初始化的时候创建的。
在一个Task执行完成之后,与其相关的Event就算完成了,于是Event就会处理所有依赖于自己的后续Task。

// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h

// Completion event of a task. It keeps the list of subsequent tasks waiting on it and the
// list of events it must itself wait for; it is reference-counted for use through FGraphEventRef.
class FGraphEvent 
{
public:
    static CORE_API FGraphEventRef CreateGraphEvent();

    // Called when Subsequent needs this event as one of its prerequisites: adds the task to
    // this event's subsequent list.
    // Returns false if the event has already fired (the add fails), true if it was added.
    bool AddSubsequent(class FBaseGraphTask* Subsequent);

    // Checks that the number of events to wait for is zero.
    void CheckDontCompleteUntilIsEmpty();


    // Adds EventToWaitFor to this event's list of events to wait for.
    void DontCompleteUntil(FGraphEventRef EventToWaitFor);


    // While this event still has unfinished events to wait for, pushes those forward first;
    // once this event is complete, dispatches the tasks that depend on it.
    // NOTE(review): only the declarations are visible here - confirm against the definition.
    CORE_API void DispatchSubsequents(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);
    CORE_API void DispatchSubsequents(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, bool bInternal = false);

    // Returns whether this event is complete, i.e. whether its subsequent list is closed.
    bool IsComplete() const
    {
        return SubsequentList.IsClosed();
    }

    // Waits until the task of this event has completed.
    void Wait(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);

    (...)

private:
    friend class TRefCountPtr<FGraphEvent>;
    friend class TLockFreeClassAllocator_TLSCache<FGraphEvent, PLATFORM_CACHE_LINE_SIZE>;

    // Releases the memory of ToRecycle.
    static CORE_API void Recycle(FGraphEvent* ToRecycle);

    friend struct FGraphEventAndSmallTaskStorage;

    /**
        * Hidden Constructor
    **/
    FGraphEvent()
        : ThreadToDoGatherOn(ENamedThreads::AnyHiPriThreadHiPriTask)
    {
    }

    /**
        * Destructor. Verifies we aren't destroying it prematurely. 
    **/
    ~FGraphEvent();

    // Interface for TRefCountPtr

public:
    // Increments the reference count.
    uint32 AddRef();
    // Decrements the reference count.
    uint32 Release();
    // Returns the reference count.
    uint32 GetRefCount() const;

private:
    // List of subsequent tasks.
    TClosableLockFreePointerListUnorderedSingleConsumer<FBaseGraphTask, 0>  SubsequentList;
    // List of events this event must wait for.
    FGraphEventArray                                                        EventsToWaitFor;
    // Reference count.
    FThreadSafeCounter                                                      ReferenceCount;
    // Thread on which the FNullGraphTask that gathers the unfinished events to wait for runs.
    ENamedThreads::Type                                                     ThreadToDoGatherOn;

    (...)
};

FGraphEventRef

指向FGraphEvent类型的带引用计数的指针

// Engine\Source\Runtime\Core\Public\Async\TaskGraphFwd.h

// Reference-counted pointer to an FGraphEvent.
using FGraphEventRef = TRefCountPtr<class FGraphEvent>;

Reference

Unreal Engine Documentation: Tasks System

Unreal Engine Documentation: Task Graph Insights

剖析虚幻渲染体系(02)- 多线程渲染

UE4之TaskGraph系统