说明
本文为UE5 TaskGraph系统的上篇,主要对TaskGraph系统中重要的类型进行简要介绍与分析
下篇主要对TaskGraph中数据组织形式、任务创建与分发的过程等进行简要剖析。下篇现已更新,请移步UE5-TaskGraph系统-下:机制分析
概述
TaskGraph(任务图)是UE实现的一套异步任务并行处理系统。在定义任务的同时,可以指定任务的依赖关系,TaskGraph会按照编排好的依赖关系来运行任务。
任务开始运行前可以指定多个依赖的前置任务,只有前置任务运行结束,本任务才会开始运行。最终,所有任务依赖关系形成一张有向无环图(DAG)。
GraphTask 任务类型
FBaseGraphTask
FBaseGraphTask是运行于TaskGraph中的任务的抽象基类,本身并不执行具体工作;真正执行任务的是其派生的具体子类(由子类实现纯虚函数ExecuteTask)。
// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h
// Abstract base for tasks scheduled through the task graph. It records the thread
// the task should run on and a counter of outstanding prerequisites; the actual work
// is done by the pure-virtual ExecuteTask() that a derived class supplies.
class FBaseGraphTask : private UE::FInheritedContextBase
{
protected:
// Starts the outstanding counter at InNumberOfPrerequistitesOutstanding + 1. The
// extra one acts as a "lock" that is released later by PrerequisitesComplete()
// (when bUnlock is true) or by ConditionalQueueTask().
FBaseGraphTask(int32 InNumberOfPrerequistitesOutstanding)
: ThreadToExecuteOn(ENamedThreads::AnyThread)
, NumberOfPrerequistitesOutstanding(InNumberOfPrerequistitesOutstanding + 1)
{
checkThreadGraph(LifeStage.Increment() == int32(LS_Contructed));
CaptureInheritedContext();
}
// Records the thread the task should execute on.
void SetThreadToExecuteOn(ENamedThreads::Type InThreadToExecuteOn)
{
ThreadToExecuteOn = InThreadToExecuteOn;
checkThreadGraph(LifeStage.Increment() == int32(LS_ThreadSet));
}
// Called once the prerequisites have been set up: subtracts the prerequisites that
// had already finished (plus the constructor's lock when bUnlock is true) from the
// outstanding counter and queues the task if nothing is left.
void PrerequisitesComplete(ENamedThreads::Type CurrentThread, int32 NumAlreadyFinishedPrequistes, bool bUnlock = true)
{
checkThreadGraph(LifeStage.Increment() == int32(LS_PrequisitesSetup));
int32 NumToSub = NumAlreadyFinishedPrequistes + (bUnlock ? 1 : 0); // the +1 is for the "lock" we set up in the constructor
// NOTE(review): Subtract() presumably returns the value before subtraction, so
// equality here means the counter has just reached zero - confirm against FThreadSafeCounter.
if (NumberOfPrerequistitesOutstanding.Subtract(NumToSub) == NumToSub)
{
bool bWakeUpWorker = true;
QueueTask(CurrentThread, bWakeUpWorker);
}
}
/** destructor, just checks the life stage **/
virtual ~FBaseGraphTask()
{
#if DO_GUARD_SLOW
int32 Stage = LifeStage.Increment();
checkf(Stage == int32(LS_Deconstucted), TEXT("LifeStage was %d"), Stage);
#endif
}
#if !(UE_BUILD_SHIPPING || UE_BUILD_TEST)
/** Logs a task name that may contain invalid subsequents. Debug only. */
static void CORE_API LogPossiblyInvalidSubsequentsTask(const TCHAR* TaskName);
#endif
// Conditionally queues the task: decrements the outstanding counter and queues the
// task only when Decrement() returns 0. bWakeUpWorker is set to true after queuing.
void ConditionalQueueTask(ENamedThreads::Type CurrentThread, bool& bWakeUpWorker)
{
if (NumberOfPrerequistitesOutstanding.Decrement()==0)
{
QueueTask(CurrentThread, bWakeUpWorker);
bWakeUpWorker = true;
}
}
(...)
private:
(...)
// Performs the actual work of the task; implemented by the derived class.
virtual void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread, bool bDeleteOnCompletion)=0;
// Destroys the task object; implemented by the derived class.
virtual void DeleteTask() = 0;
// Just checks the life stage and passes off to the virtual ExecuteTask method
FORCEINLINE void Execute(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread, bool bDeleteOnCompletion)
{
checkThreadGraph(LifeStage.Increment() == int32(LS_Executing));
UE::FInheritedContextScope InheritedContextScope = RestoreInheritedContext();
ExecuteTask(NewTasks, CurrentThread, bDeleteOnCompletion);
}
// Hands the task to FTaskGraphInterface so it is queued for ThreadToExecuteOn.
void QueueTask(ENamedThreads::Type CurrentThreadIfKnown, bool bWakeUpWorker)
{
checkThreadGraph(LifeStage.Increment() == int32(LS_Queued));
TaskTrace::Scheduled(GetTraceId());
FTaskGraphInterface::Get().QueueTask(this, bWakeUpWorker, ThreadToExecuteOn, CurrentThreadIfKnown);
}
ENamedThreads::Type ThreadToExecuteOn; // thread the task should execute on
FThreadSafeCounter NumberOfPrerequistitesOutstanding; // prerequisites (plus the lock) still outstanding before the task may be queued
#if DO_GUARD_SLOW || USING_CODE_ANALYSIS
// Life stage verification
// Tasks go through 8 steps, in order. In non-final builds, we track them with a thread safe counter and verify that the progression is correct.
// NOTE(review): the enum below lists seven stages, not eight.
enum ELifeStage
{
LS_BaseContructed = 0,
LS_Contructed,
LS_ThreadSet,
LS_PrequisitesSetup,
LS_Queued,
LS_Executing,
LS_Deconstucted,
};
FThreadSafeCounter LifeStage;
#endif
#if UE_TASK_TRACE_ENABLED
std::atomic<TaskTrace::FId> TraceId { TaskTrace::InvalidId };
#endif
};
TGraphTask
TGraphTask是FBaseGraphTask的唯一子类,实现了基类的纯虚接口(ExecuteTask与DeleteTask),其中最主要的是负责执行任务的ExecuteTask函数。
// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h
// Graph task that embeds a user task of type TTask in its own storage and implements
// the FBaseGraphTask execution interface by calling TTask::DoTask().
template<typename TTask>
class TGraphTask final : public TConcurrentLinearObject<TGraphTask<TTask>, FTaskGraphBlockAllocationTag>, public FBaseGraphTask
{
public:
// Helper returned by CreateTask(): constructs the TTask in place and then either
// dispatches or holds the owning graph task.
class FConstructor
{
public:
// Constructs the TTask inside Owner's storage, then calls Setup() so the task is
// queued as soon as its prerequisites allow. Returns the completion event.
template<typename...T>
FGraphEventRef ConstructAndDispatchWhenReady(T&&... Args)
{
new ((void *)&Owner->TaskStorage) TTask(Forward<T>(Args)...);
return Owner->Setup(Prerequisites, CurrentThreadIfKnown);
}
// Constructs the TTask inside Owner's storage, then calls Hold(): prerequisites are
// registered but the constructor lock is kept, so the task is not queued until Unlock().
template<typename...T>
TGraphTask* ConstructAndHold(T&&... Args)
{
new ((void *)&Owner->TaskStorage) TTask(Forward<T>(Args)...);
return Owner->Hold(Prerequisites, CurrentThreadIfKnown);
}
private:
friend TGraphTask;
TGraphTask* Owner; // graph task being constructed
const FGraphEventArray* Prerequisites; // prerequisite events (may be null)
ENamedThreads::Type CurrentThreadIfKnown;
FConstructor(TGraphTask* InOwner, const FGraphEventArray* InPrerequisites, ENamedThreads::Type InCurrentThreadIfKnown)
: Owner(InOwner)
, Prerequisites(InPrerequisites)
, CurrentThreadIfKnown(InCurrentThreadIfKnown)
{
}
// Copy construction and copy assignment are not supported: both assert via check(0).
FConstructor(const FConstructor& Other)
{
check(0);
}
void operator=(const FConstructor& Other)
{
check(0);
}
};
// Allocates a new TGraphTask (with a completion event unless TTask is FireAndForget)
// and returns an FConstructor that is used to finish constructing the task.
static FConstructor CreateTask(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
{
FGraphEventRef GraphEvent = TTask::GetSubsequentsMode() == ESubsequentsMode::FireAndForget ? NULL : FGraphEvent::CreateGraphEvent();
int32 NumPrereq = Prerequisites ? Prerequisites->Num() : 0;
return FConstructor(new TGraphTask(MoveTemp(GraphEvent), NumPrereq), Prerequisites, CurrentThreadIfKnown);
}
// Releases a held task: traces the launch and calls ConditionalQueueTask().
void Unlock(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
{
TaskTrace::Launched(GetTraceId(), nullptr, Subsequents.IsValid(), ((TTask*)&TaskStorage)->GetDesiredThread(), sizeof(*this));
bool bWakeUpWorker = true;
ConditionalQueueTask(CurrentThreadIfKnown, bWakeUpWorker);
}
// Returns the Subsequents event reference.
FGraphEventRef GetCompletionEvent()
{
return Subsequents;
}
private:
friend class FConstructor;
friend class FGraphEvent;
// Runs the embedded TTask, dispatches subsequents and optionally deletes this object.
void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread, bool bDeleteOnCompletion) override
{
(...)
if (TTask::GetSubsequentsMode() == ESubsequentsMode::TrackSubsequents)
{
Subsequents->CheckDontCompleteUntilIsEmpty(); // we can only add wait for tasks while executing the task
}
// Run the user task, then destroy it in place.
TTask& Task = *(TTask*)&TaskStorage;
{
TaskTrace::FTaskTimingEventScope TaskEventScope(GetTraceId());
FScopeCycleCounter Scope(Task.GetStatId(), true);
Task.DoTask(CurrentThread, Subsequents);
Task.~TTask();
checkThreadGraph(ENamedThreads::GetThreadIndex(CurrentThread) <= ENamedThreads::GetRenderThread() || FMemStack::Get().IsEmpty()); // you must mark and pop memstacks if you use them in tasks! Named threads are excepted.
}
TaskConstructed = false;
// Dispatch the tasks that were waiting on this one.
if (TTask::GetSubsequentsMode() == ESubsequentsMode::TrackSubsequents)
{
FPlatformMisc::MemoryBarrier();
Subsequents->DispatchSubsequents(NewTasks, CurrentThread, true);
}
else
{
// "fire and forget" tasks don't have an accompanying FGraphEvent that traces completion and destruction
TaskTrace::Completed(GetTraceId());
TaskTrace::Destroyed(GetTraceId());
}
// Free this graph task object if requested.
if (bDeleteOnCompletion)
{
DeleteTask();
}
}
void DeleteTask() final override
{
delete this;
}
// Private constructor: takes over InSubsequents and sets the trace id.
TGraphTask(FGraphEventRef InSubsequents, int32 NumberOfPrerequistitesOutstanding)
: FBaseGraphTask(NumberOfPrerequistitesOutstanding)
, TaskConstructed(false)
{
Subsequents.Swap(InSubsequents);
SetTraceId(Subsequents.IsValid() ? Subsequents->GetTraceId() : TaskTrace::GenerateTaskId());
}
// Private destructor: asserts that the embedded TTask is no longer constructed.
~TGraphTask() override
{
checkThreadGraph(!TaskConstructed);
}
// Sets the target thread and registers this task as a subsequent of every
// prerequisite. Null prerequisites and those for which AddSubsequent() returns false
// are counted as already completed.
void SetupPrereqs(const FGraphEventArray* Prerequisites, ENamedThreads::Type CurrentThreadIfKnown, bool bUnlock)
{
checkThreadGraph(!TaskConstructed);
TaskConstructed = true;
TTask& Task = *(TTask*)&TaskStorage;
SetThreadToExecuteOn(Task.GetDesiredThread());
int32 AlreadyCompletedPrerequisites = 0;
if (Prerequisites)
{
for (int32 Index = 0; Index < Prerequisites->Num(); Index++)
{
FGraphEvent* Prerequisite = (*Prerequisites)[Index];
if (Prerequisite == nullptr || !Prerequisite->AddSubsequent(this))
{
AlreadyCompletedPrerequisites++;
}
}
}
PrerequisitesComplete(CurrentThreadIfKnown, AlreadyCompletedPrerequisites, bUnlock);
}
// Sets up the task with bUnlock = true and returns its completion event.
FGraphEventRef Setup(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
{
TaskTrace::Launched(GetTraceId(), nullptr, Subsequents.IsValid(), ((TTask*)&TaskStorage)->GetDesiredThread(), sizeof(*this));
FGraphEventRef ReturnedEventRef = Subsequents; // very important so that this doesn't get destroyed before we return
SetupPrereqs(Prerequisites, CurrentThreadIfKnown, true);
return ReturnedEventRef;
}
// Sets up the task with bUnlock = false, so the constructor lock stays in place.
TGraphTask* Hold(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
{
TaskTrace::Created(GetTraceId(), sizeof(*this));
SetupPrereqs(Prerequisites, CurrentThreadIfKnown, false);
return this;
}
// Creates a task that uses the caller-supplied SubsequentsToAssume event instead of a new one.
static FConstructor CreateTask(FGraphEventRef SubsequentsToAssume, const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
{
return FConstructor(new TGraphTask(SubsequentsToAssume, Prerequisites ? Prerequisites->Num() : 0), Prerequisites, CurrentThreadIfKnown);
}
TAlignedBytes<sizeof(TTask),alignof(TTask)> TaskStorage; // in-place storage for the TTask that gets executed
bool TaskConstructed;
FGraphEventRef Subsequents; // completion event holding the subsequent tasks; null for fire-and-forget tasks
};
在ExecuteTask函数中,调用了TTask::DoTask函数,也就是说,实际执行任务逻辑的是模板参数TTask所指定的任务类型。
自定义任务可以参考官方注释中的class FGenericTask,也可以直接派生TAsyncGraphTask类,TAsyncGraphTask类派生自任务基类FAsyncGraphTaskBase
ESubsequentsMode
// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h
// Selects whether other tasks are allowed to depend on a task.
namespace ESubsequentsMode
{
enum Type
{
// Other tasks may depend on this task.
TrackSubsequents,
// The task is only executed; no other task may depend on it.
FireAndForget
};
}
TaskThread 任务线程类型
FTaskThreadBase
FTaskThreadBase是执行任务的线程父类,定义了一组设置、操作任务的接口,如初始化、帧更新、入队任务、唤醒线程、请求退出等。
// Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp
// Base class for the threads that execute tasks. Declares the interface for setup,
// processing, enqueuing, waking up and quitting; most operations are pure virtual
// or defined outside this excerpt.
class FTaskThreadBase : public FRunnable, FSingleThreadRunnable
{
public:
FTaskThreadBase()
: ThreadId(ENamedThreads::AnyThread)
, PerThreadIDTLSSlot(FPlatformTLS::InvalidTlsSlot)
, OwnerWorker(nullptr)
{
NewTasks.Reset(128);
}
// Sets the thread id, TLS slot and owning worker (definition not shown here).
void Setup(ENamedThreads::Type InThreadId, uint32 InPerThreadIDTLSSlot, FWorkerThread* InOwnerWorker);
// Initializes from the current thread (definition not shown here).
void InitializeForCurrentThread();
ENamedThreads::Type GetThreadId() const; // returns ThreadId
// Used by named threads to process tasks until RequestQuit is called.
virtual void ProcessTasksUntilQuit(int32 QueueIndex) = 0;
// Processes tasks until the thread is idle.
// NOTE(review): the return value is presumably the number of tasks processed - confirm.
virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex);
// Enqueues a task, assuming this thread is the current thread. For a named thread
// the task goes straight into its private queue.
virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task);
// Enqueues a task, assuming this thread is not the current thread.
virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task);
// Requests a quit, which makes the thread return to its caller when idle. For a named
// thread this is what ends ProcessTasksUntilQuit; an unnamed thread simply shuts down.
virtual void RequestQuit(int32 QueueIndex) = 0;
// Wakes the thread up.
virtual void WakeUp(int32 QueueIndex = 0) = 0;
// Queries whether the thread is currently processing tasks.
virtual bool IsProcessingTasks(int32 QueueIndex) = 0;
// Single-threaded tick.
virtual void Tick() override;
virtual bool Init() override;
virtual uint32 Run() override;
// Stops the thread by requesting a quit with queue index -1.
virtual void Stop() override
{
RequestQuit(-1);
}
virtual void Exit() override
{
}
virtual FSingleThreadRunnable* GetSingleThreadInterface() override
{
return this;
}
protected:
ENamedThreads::Type ThreadId; // thread id
uint32 PerThreadIDTLSSlot; // TLS slot
FThreadSafeCounter IsStalled; // stall counter (used to signal a stall, per the original annotation)
TArray<FBaseGraphTask*> NewTasks; // tasks waiting to be processed
FWorkerThread* OwnerWorker; // worker thread that owns this object
};
FNamedTaskThread & FTaskThreadAnyThread
FTaskThreadBase只是抽象类,具体的实现由子类FNamedTaskThread和FTaskThreadAnyThread完成,前者是带名字的任务线程,后者是无名的任务线程。
两者在处理任务时的操作有所不同。FNamedTaskThread::ProcessTasksNamedThread从对应的任务队列的队首获得任务并执行,而FTaskThreadAnyThread::ProcessTasks中需要调用FTaskGraphImplementation::FindWork来获取任务。
FNamedTaskThread
// Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp
// Task thread with a name. Each instance owns ENamedThreads::NumQueues private queues
// and pops tasks from the selected queue.
class FNamedTaskThread : public FTaskThreadBase
{
public:
// Processes tasks on the given queue until a return or a shutdown is requested.
virtual void ProcessTasksUntilQuit(int32 QueueIndex) override
{
check(Queue(QueueIndex).StallRestartEvent); // make sure we are started up
Queue(QueueIndex).QuitForReturn = false;
verify(++Queue(QueueIndex).RecursionGuard == 1);
const bool bIsMultiThread = FTaskGraphInterface::IsMultithread();
// Keep processing the queue until return/shutdown is requested, or stop after one
// pass when multithreading is not in use.
do
{
const bool bAllowStall = bIsMultiThread;
ProcessTasksNamedThread(QueueIndex, bAllowStall);
} while (!Queue(QueueIndex).QuitForReturn && !Queue(QueueIndex).QuitForShutdown && bIsMultiThread); // @Hack - quit now when running with only one thread.
verify(!--Queue(QueueIndex).RecursionGuard);
}
// Runs a single processing pass with stalling disabled and returns the number of
// tasks that were processed.
virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex) override
{
check(Queue(QueueIndex).StallRestartEvent); // make sure we are started up
Queue(QueueIndex).QuitForReturn = false;
verify(++Queue(QueueIndex).RecursionGuard == 1);
uint64 ProcessedTasks = ProcessTasksNamedThread(QueueIndex, false);
verify(!--Queue(QueueIndex).RecursionGuard);
return ProcessedTasks;
}
// Task loop of a named thread: pops tasks from the queue and executes them until
// QuitForReturn is set. Returns the number of executed tasks.
uint64 ProcessTasksNamedThread(int32 QueueIndex, bool bAllowStall)
{
(...)
while (!Queue(QueueIndex).QuitForReturn)
{
const bool bIsRenderThreadAndPolling = bIsRenderThreadMainQueue && (GRenderThreadPollPeriodMs >= 0);
const bool bStallQueueAllowStall = bAllowStall && !bIsRenderThreadAndPolling;
// Take the next task from the queue.
FBaseGraphTask* Task = Queue(QueueIndex).StallQueue.Pop(0, bStallQueueAllowStall);
TestRandomizedThreads();
if (!Task)
{
(...)
}
else // a task was obtained
{
// Execute it; the queue index is encoded into the thread value that is passed along.
Task->Execute(NewTasks, ENamedThreads::Type(ThreadId | (QueueIndex << ENamedThreads::QueueIndexShift)), true);
ProcessedTasks++;
TestRandomizedThreads();
}
}
(...)
return ProcessedTasks;
}
virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task) override;
virtual void RequestQuit(int32 QueueIndex) override;
virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task) override;
virtual bool IsProcessingTasks(int32 QueueIndex) override;
virtual void WakeUp(int32 QueueIndex) override;
private:
(...)
// Per-queue state of a named thread.
struct FThreadTaskQueue
{
FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> StallQueue; // stalling task queue
uint32 RecursionGuard; // guards against recursive processing of the same queue
bool QuitForReturn; // a return to the caller was requested
bool QuitForShutdown; // a shutdown was requested
FEvent* StallRestartEvent; // pooled event; presumably used to restart a stalled thread - confirm
FThreadTaskQueue()
: RecursionGuard(0)
, QuitForReturn(false)
, QuitForShutdown(false)
, StallRestartEvent(FPlatformProcess::GetSynchEventFromPool(false))
{
}
~FThreadTaskQueue()
{
FPlatformProcess::ReturnSynchEventToPool(StallRestartEvent);
StallRestartEvent = nullptr;
}
};
// Returns the queue at QueueIndex after checking the index is in range.
FORCEINLINE FThreadTaskQueue& Queue(int32 QueueIndex)
{
checkThreadGraph(QueueIndex >= 0 && QueueIndex < ENamedThreads::NumQueues);
return Queues[QueueIndex];
}
FORCEINLINE const FThreadTaskQueue& Queue(int32 QueueIndex) const
{
checkThreadGraph(QueueIndex >= 0 && QueueIndex < ENamedThreads::NumQueues);
return Queues[QueueIndex];
}
FThreadTaskQueue Queues[ENamedThreads::NumQueues]; // task queues private to this named thread
};
FTaskThreadAnyThread
// Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp
// Unnamed task thread. It has no private task queue; it obtains work through
// FindWork() and is created for one thread priority (PriorityIndex).
class FTaskThreadAnyThread : public FTaskThreadBase
{
public:
FTaskThreadAnyThread(int32 InPriorityIndex)
: PriorityIndex(InPriorityIndex)
{
}
// Processes tasks until a shutdown is requested (or after one pass when
// multithreading is not in use). QueueIndex must be 0.
virtual void ProcessTasksUntilQuit(int32 QueueIndex) override
{
// TLS caches are set up for every priority except the background priority.
if (PriorityIndex != (ENamedThreads::BackgroundThreadPriority >> ENamedThreads::ThreadPriorityShift))
{
FMemory::SetupTLSCachesOnCurrentThread();
}
check(!QueueIndex);
const bool bIsMultiThread = FTaskGraphInterface::IsMultithread();
do
{
ProcessTasks();
} while (!Queue.QuitForShutdown && bIsMultiThread); // @Hack - quit now when running with only one thread.
}
// Only valid when multithreading is not in use; otherwise asserts and returns 0.
virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex) override
{
if (FTaskGraphInterface::IsMultithread() == false)
{
return ProcessTasks();
}
else
{
check(0);
return 0;
}
}
(...)
private:
#if UE_EXTERNAL_PROFILING_ENABLED
// Maps a priority index to a display name for the thread.
static inline const TCHAR* ThreadPriorityToName(int32 PriorityIdx)
{
PriorityIdx <<= ENamedThreads::ThreadPriorityShift;
if (PriorityIdx == ENamedThreads::HighThreadPriority)
{
return TEXT("Task Thread HP"); // high-priority worker thread
}
else if (PriorityIdx == ENamedThreads::NormalThreadPriority)
{
return TEXT("Task Thread NP"); // normal-priority worker thread
}
else if (PriorityIdx == ENamedThreads::BackgroundThreadPriority)
{
return TEXT("Task Thread BP"); // background-priority worker thread
}
else
{
return TEXT("Task Thread Unknown Priority");
}
}
#endif
// Task loop of an unnamed thread. Unlike a named thread, tasks are obtained through
// FindWork() rather than from a private queue. Returns the number of executed tasks.
uint64 ProcessTasks()
{
(...)
while (1)
{
// Ask for a task.
FBaseGraphTask* Task = FindWork();
if (!Task)
{
(...)
TestRandomizedThreads();
const bool bIsMultithread = FTaskGraphInterface::IsMultithread();
if (bIsMultithread)
{
// No work: wait on the restart event.
FScopeCycleCounter Scope(StallStatId, EStatFlags::Verbose);
Queue.StallRestartEvent->Wait(MAX_uint32, bCountAsStall);
bDidStall = true;
}
if (Queue.QuitForShutdown || !bIsMultithread)
{
break;
}
TestRandomizedThreads();
(...)
continue;
}
TestRandomizedThreads();
(...)
bDidStall = false;
Task->Execute(NewTasks, ENamedThreads::Type(ThreadId), true);
ProcessedTasks++;
TestRandomizedThreads();
if (Queue.bStallForTuning)
{
(...)
{
FScopeLock Lock(&Queue.StallForTuning);
}
(...)
}
}
verify(!--Queue.RecursionGuard);
return ProcessedTasks;
}
// Queue-related state of an unnamed thread.
struct FThreadTaskQueue
{
FEvent* StallRestartEvent; // pooled event the thread waits on when FindWork() returns no task
uint32 RecursionGuard; // guards against recursive processing
bool QuitForShutdown; // a shutdown was requested
bool bStallForTuning;
FCriticalSection StallForTuning; // critical section locked when bStallForTuning is set
FThreadTaskQueue()
: StallRestartEvent(FPlatformProcess::GetSynchEventFromPool(false))
, RecursionGuard(0)
, QuitForShutdown(false)
, bStallForTuning(false)
{
}
~FThreadTaskQueue()
{
FPlatformProcess::ReturnSynchEventToPool(StallRestartEvent);
StallRestartEvent = nullptr;
}
};
// Obtains a task from the task graph system (definition not shown here).
FBaseGraphTask* FindWork();
// Queue state of this unnamed thread.
FThreadTaskQueue Queue;
int32 PriorityIndex;
};
ENamedThreads
namespace ENamedThreads中提供了线程类型枚举以及设置和获取渲染线程、线程索引、线程优先级、任务优先级的接口
// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h
// Thread identifiers for the task graph. The low 8 bits (ThreadIndexMask) hold the
// thread index; higher bits encode the queue index, the task priority and the
// thread priority, each with its own mask and shift.
namespace ENamedThreads
{
enum Type : int32
{
UnusedAnchor = -1,
/** The always-present, named threads are listed next **/
RHIThread,
GameThread,
// The render thread is sometimes the game thread and is sometimes the actual rendering thread
ActualRenderingThread = GameThread + 1,
// CAUTION ThreadedRenderingThread must be the last named thread, insert new named threads before it
/** not actually a thread index. Means "Unknown Thread" or "Any Unnamed Thread" **/
AnyThread = 0xff,
/** High bits are used for a queue index and priority**/
MainQueue = 0x000,
LocalQueue = 0x100,
NumQueues = 2,
ThreadIndexMask = 0xff,
QueueIndexMask = 0x100,
QueueIndexShift = 8,
/** High bits are used for a queue index task priority and thread priority**/
NormalTaskPriority = 0x000,
HighTaskPriority = 0x200,
NumTaskPriorities = 2,
TaskPriorityMask = 0x200,
TaskPriorityShift = 9,
NormalThreadPriority = 0x000,
HighThreadPriority = 0x400,
BackgroundThreadPriority = 0x800,
NumThreadPriorities = 3,
ThreadPriorityMask = 0xC00,
ThreadPriorityShift = 10,
/** Combinations **/
GameThread_Local = GameThread | LocalQueue,
ActualRenderingThread_Local = ActualRenderingThread | LocalQueue,
AnyHiPriThreadNormalTask = AnyThread | HighThreadPriority | NormalTaskPriority,
AnyHiPriThreadHiPriTask = AnyThread | HighThreadPriority | HighTaskPriority,
AnyNormalThreadNormalTask = AnyThread | NormalThreadPriority | NormalTaskPriority,
AnyNormalThreadHiPriTask = AnyThread | NormalThreadPriority | HighTaskPriority,
AnyBackgroundThreadNormalTask = AnyThread | BackgroundThreadPriority | NormalTaskPriority,
AnyBackgroundHiPriTask = AnyThread | BackgroundThreadPriority | HighTaskPriority,
};
(...)
}
TaskGraph 任务图类型
TaskGraph会根据线程优先级、是否启用后台线程创建不同的工作线程集合,然后创建它们的FWorkerThread对象。
入队任务时,会将任务Push到任务列表IncomingAnyThreadTasks中,并取出可执行的任务索引,根据任务的属性(希望在哪个线程执行、优先级、任务索引)启用对应的工作线程去执行。
FTaskGraphInterface
FTaskGraphInterface就是任务图的管理器和工厂,提供了任务操作、线程操作的接口。
FTaskGraphImplementation
FTaskGraphImplementation继承并实现了FTaskGraphInterface的接口。FTaskGraphImplementation采用了特殊的线程对象WorkerThreads(工作线程)来作为执行的载体,如果是专用的线程(带名字的线程,如GameThread、RHI、ActualRenderingThread),则会进入专用的任务队列。
// Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp
class FTaskGraphImplementation final : public FTaskGraphInterface
{
public:
// 获取FTaskGraphImplementation单例
static FTaskGraphImplementation& Get()
{
checkThreadGraph(!GUseNewTaskBackend);
checkThreadGraph(TaskGraphImplementationSingleton);
return *static_cast<FTaskGraphImplementation*>(TaskGraphImplementationSingleton);
}
// 构造函数, 计算任务线程数量, 创建专用线程和无名线程等
FTaskGraphImplementation(int32);
virtual ~FTaskGraphImplementation()
// 入队任务,AnyThread和NamedTaskThread分别处理
// AnyThread的任务会加入IncomingAnyThreadTasks对应优先级的队列
// NamedTaskThread的任务会根据优先级,加入各自所有的FThreadTaskQueue的对应队列
virtual void QueueTask(FBaseGraphTask* Task, bool bWakeUpWorker, ENamedThreads::Type ThreadToExecuteOn, ENamedThreads::Type InCurrentThreadIfKnown = ENamedThreads::AnyThread) final override;
virtual int32 GetNumWorkerThreads() final override;
virtual int32 GetNumForegroundThreads() final override;
virtual int32 GetNumBackgroundThreads() final override;
virtual bool IsCurrentThreadKnown() final override;
virtual ENamedThreads::Type GetCurrentThreadIfKnown(bool bLocalQueue) final override;
virtual bool IsThreadProcessingTasks(ENamedThreads::Type ThreadToCheck) final override;
// 将当前线程导入到指定Index
virtual void AttachToThread(ENamedThreads::Type CurrentThread) final override;
// ----处理任务接口----
virtual uint64 ProcessThreadUntilIdle(ENamedThreads::Type CurrentThread) final override;
virtual void ProcessThreadUntilRequestReturn(ENamedThreads::Type CurrentThread) final override;
virtual void RequestReturn(ENamedThreads::Type CurrentThread) final override;
virtual void WaitUntilTasksComplete(const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) final override;
virtual void TriggerEventWhenTasksComplete(FEvent* InEvent, const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask) final override;
virtual void AddShutdownCallback(TFunction<void()>& Callback);
// ----任务调度接口----
virtual void WakeNamedThread(ENamedThreads::Type ThreadToWake) override;
void StartTaskThread(int32 Priority, int32 IndexToStart);
void StartAllTaskThreads(bool bDoBackgroundThreads);
FBaseGraphTask* FindWork(ENamedThreads::Type ThreadInNeed) override;
void StallForTuning(int32 Index, bool Stall) override;
void SetTaskThreadPriorities(EThreadPriority Pri);
private:
// 获取指定索引的任务线程引用
FTaskThreadBase& Thread(int32 Index);
// 获取当前线程索引
ENamedThreads::Type GetCurrentThread();
int32 ThreadIndexToPriorityIndex(int32 ThreadIndex);
enum
{
/** Compile time maximum number of threads. Didn't really need to be a compile time constant, but task thread are limited by MAX_LOCK_FREE_LINKS_AS_BITS **/
MAX_THREADS = 0xFFFF,
MAX_THREAD_PRIORITIES = 3
};
FWorkerThread WorkerThreads[MAX_THREADS]; // 所有工作线程(任务线程)对象数组
int32 NumThreads; // 实际被使用的线程数量
int32 NumNamedThreads; // 专用线程数量
int32 NumTaskThreadSets; // 任务线程集合数量
int32 NumTaskThreadsPerSet; // 每个集合拥有的任务线程数量
bool bCreatedHiPriorityThreads;
bool bCreatedBackgroundPriorityThreads;
ENamedThreads::Type LastExternalThread;
FThreadSafeCounter ReentrancyCheck;
uint32 PerThreadIDTLSSlot;
TArray<TFunction<void()> > ShutdownCallbacks; // 销毁前的回调
FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> IncomingAnyThreadTasks[MAX_THREAD_PRIORITIES];
};
// Pairs a task-thread object with the runnable thread that carries it.
struct FWorkerThread
{
FTaskThreadBase* TaskGraphWorker; // associated task-thread object
FRunnableThread* RunnableThread; // runnable thread that actually executes the tasks
bool bAttached; // whether the thread was attached (typically for named threads, per the original annotation)
/** Constructor to set reasonable defaults. **/
FWorkerThread()
: TaskGraphWorker(nullptr)
, RunnableThread(nullptr)
, bAttached(false)
{
}
};
WorkerThreads保存了TaskGraph系统用到的所有线程对象,其分布如下图所示:
GraphEvent 任务图事件类型
FGraphEvent
FGraphEvent表示一个Task完成的事件,FGraphEvent实现了Task之间的依赖关系,只有Task依赖的所有前置Task执行完成,当前Task才会被加入到队列中。所以FGraphEvent总是和一个Task相关,它也是在一个Task初始化的时候创建的。
在一个Task执行完成之后,与其相关的Event就算完成了,于是Event就会处理所有依赖于自己的后续Task。
// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h
// Completion event of a task. It keeps the list of tasks that depend on it
// (SubsequentList) and a list of further events it must wait for (EventsToWaitFor).
// Instances are reference counted and used through FGraphEventRef.
class FGraphEvent
{
public:
static CORE_API FGraphEventRef CreateGraphEvent();
// Registers Subsequent as depending on this event by adding it to this event's list.
// Per the original annotation, returns false when the event has already fired (the
// task is not added) and true when the task was added.
bool AddSubsequent(class FBaseGraphTask* Subsequent);
// Checks that the list of events to wait for is empty.
void CheckDontCompleteUntilIsEmpty();
// Adds EventToWaitFor to the events this event must wait for before it completes.
void DontCompleteUntil(FGraphEventRef EventToWaitFor);
// Per the original annotation: while this event still has unfinished events to wait
// for, pushes those along; once this event is complete, dispatches the tasks that
// depend on it (definitions not shown here).
CORE_API void DispatchSubsequents(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);
CORE_API void DispatchSubsequents(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, bool bInternal = false);
// Whether this event is complete, i.e. whether SubsequentList has been closed.
bool IsComplete() const
{
return SubsequentList.IsClosed();
}
// Waits until the task of this event has completed.
void Wait(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);
(...)
private:
friend class TRefCountPtr<FGraphEvent>;
friend class TLockFreeClassAllocator_TLSCache<FGraphEvent, PLATFORM_CACHE_LINE_SIZE>;
// Releases the memory of ToRecycle.
static CORE_API void Recycle(FGraphEvent* ToRecycle);
friend struct FGraphEventAndSmallTaskStorage;
/**
* Hidden Constructor
**/
FGraphEvent()
: ThreadToDoGatherOn(ENamedThreads::AnyHiPriThreadHiPriTask)
{
}
/**
* Destructor. Verifies we aren't destroying it prematurely.
**/
~FGraphEvent();
// Interface for TRefCountPtr
public:
// Increments the reference count.
uint32 AddRef();
// Decrements the reference count.
uint32 Release();
// Returns the reference count.
uint32 GetRefCount() const;
private:
// Tasks that depend on this event.
TClosableLockFreePointerListUnorderedSingleConsumer<FBaseGraphTask, 0> SubsequentList;
// Events this event has to wait for.
FGraphEventArray EventsToWaitFor;
// Reference count.
FThreadSafeCounter ReferenceCount;
// Thread for the FNullGraphTask that, per the original annotation, is used to push
// along the unfinished events being waited for.
ENamedThreads::Type ThreadToDoGatherOn;
(...)
};
FGraphEventRef
指向FGraphEvent类型的带引用计数的指针
// Engine\Source\Runtime\Core\Public\Async\TaskGraphFwd.h
// Reference-counting pointer (TRefCountPtr) to an FGraphEvent.
using FGraphEventRef = TRefCountPtr<class FGraphEvent>;
Reference
Unreal Engine Documentation: Tasks System