summaryrefslogtreecommitdiffstats
path: root/gfx/angle/checkout/src/libANGLE/WorkerThread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'gfx/angle/checkout/src/libANGLE/WorkerThread.cpp')
-rw-r--r--gfx/angle/checkout/src/libANGLE/WorkerThread.cpp356
1 files changed, 356 insertions, 0 deletions
diff --git a/gfx/angle/checkout/src/libANGLE/WorkerThread.cpp b/gfx/angle/checkout/src/libANGLE/WorkerThread.cpp
new file mode 100644
index 0000000000..30c454dd26
--- /dev/null
+++ b/gfx/angle/checkout/src/libANGLE/WorkerThread.cpp
@@ -0,0 +1,356 @@
+//
+// Copyright 2016 The ANGLE Project Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// WorkerThread:
+// Task running thread for ANGLE, similar to a TaskRunner in Chromium.
+// Might be implemented differently depending on platform.
+//
+
+#include "libANGLE/WorkerThread.h"
+
+#include "libANGLE/trace.h"
+
+#if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) || (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
+# include <condition_variable>
+# include <future>
+# include <mutex>
+# include <queue>
+# include <thread>
+#endif // (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) || (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
+
+namespace angle
+{
+
+WaitableEvent::WaitableEvent() = default;
+WaitableEvent::~WaitableEvent() = default;
+
+void WaitableEventDone::wait() {}
+
+bool WaitableEventDone::isReady()
+{
+ return true;
+}
+
+WorkerThreadPool::WorkerThreadPool() = default;
+WorkerThreadPool::~WorkerThreadPool() = default;
+
+class SingleThreadedWaitableEvent final : public WaitableEvent
+{
+ public:
+ SingleThreadedWaitableEvent() = default;
+ ~SingleThreadedWaitableEvent() override = default;
+
+ void wait() override;
+ bool isReady() override;
+};
+
+void SingleThreadedWaitableEvent::wait() {}
+
+bool SingleThreadedWaitableEvent::isReady()
+{
+ return true;
+}
+
+class SingleThreadedWorkerPool final : public WorkerThreadPool
+{
+ public:
+ std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
+ void setMaxThreads(size_t maxThreads) override;
+ bool isAsync() override;
+};
+
+// SingleThreadedWorkerPool implementation.
+std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
+ std::shared_ptr<Closure> task)
+{
+ (*task)();
+ return std::make_shared<SingleThreadedWaitableEvent>();
+}
+
+void SingleThreadedWorkerPool::setMaxThreads(size_t maxThreads) {}
+
+bool SingleThreadedWorkerPool::isAsync()
+{
+ return false;
+}
+
+#if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
+class AsyncWaitableEvent final : public WaitableEvent
+{
+ public:
+ AsyncWaitableEvent() : mIsPending(true) {}
+ ~AsyncWaitableEvent() override = default;
+
+ void wait() override;
+ bool isReady() override;
+
+ private:
+ friend class AsyncWorkerPool;
+ void setFuture(std::future<void> &&future);
+
+ // To block wait() when the task is still in queue to be run.
+ // Also to protect the concurrent accesses from both main thread and
+ // background threads to the member fields.
+ std::mutex mMutex;
+
+ bool mIsPending;
+ std::condition_variable mCondition;
+ std::future<void> mFuture;
+};
+
+void AsyncWaitableEvent::setFuture(std::future<void> &&future)
+{
+ mFuture = std::move(future);
+}
+
+void AsyncWaitableEvent::wait()
+{
+ ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWaitableEvent::wait");
+ {
+ std::unique_lock<std::mutex> lock(mMutex);
+ mCondition.wait(lock, [this] { return !mIsPending; });
+ }
+
+ ASSERT(mFuture.valid());
+ mFuture.wait();
+}
+
+bool AsyncWaitableEvent::isReady()
+{
+ std::lock_guard<std::mutex> lock(mMutex);
+ if (mIsPending)
+ {
+ return false;
+ }
+ ASSERT(mFuture.valid());
+ return mFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
+}
+
+class AsyncWorkerPool final : public WorkerThreadPool
+{
+ public:
+ AsyncWorkerPool(size_t maxThreads) : mMaxThreads(maxThreads), mRunningThreads(0) {}
+ ~AsyncWorkerPool() override = default;
+
+ std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
+ void setMaxThreads(size_t maxThreads) override;
+ bool isAsync() override;
+
+ private:
+ void checkToRunPendingTasks();
+
+ // To protect the concurrent accesses from both main thread and background
+ // threads to the member fields.
+ std::mutex mMutex;
+
+ size_t mMaxThreads;
+ size_t mRunningThreads;
+ std::queue<std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>> mTaskQueue;
+};
+
+// AsyncWorkerPool implementation.
+std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
+{
+ ASSERT(mMaxThreads > 0);
+
+ auto waitable = std::make_shared<AsyncWaitableEvent>();
+ {
+ std::lock_guard<std::mutex> lock(mMutex);
+ mTaskQueue.push(std::make_pair(waitable, task));
+ }
+ checkToRunPendingTasks();
+ return std::move(waitable);
+}
+
+void AsyncWorkerPool::setMaxThreads(size_t maxThreads)
+{
+ {
+ std::lock_guard<std::mutex> lock(mMutex);
+ mMaxThreads = (maxThreads == 0xFFFFFFFF ? std::thread::hardware_concurrency() : maxThreads);
+ }
+ checkToRunPendingTasks();
+}
+
+bool AsyncWorkerPool::isAsync()
+{
+ return true;
+}
+
+void AsyncWorkerPool::checkToRunPendingTasks()
+{
+ std::lock_guard<std::mutex> lock(mMutex);
+ while (mRunningThreads < mMaxThreads && !mTaskQueue.empty())
+ {
+ auto task = mTaskQueue.front();
+ mTaskQueue.pop();
+ auto waitable = task.first;
+ auto closure = task.second;
+
+ auto future = std::async(std::launch::async, [closure, this] {
+ {
+ ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWorkerPool::RunTask");
+ (*closure)();
+ }
+ {
+ std::lock_guard<std::mutex> lock(mMutex);
+ ASSERT(mRunningThreads != 0);
+ --mRunningThreads;
+ }
+ checkToRunPendingTasks();
+ });
+
+ ++mRunningThreads;
+
+ {
+ std::lock_guard<std::mutex> waitableLock(waitable->mMutex);
+ waitable->mIsPending = false;
+ waitable->setFuture(std::move(future));
+ }
+ waitable->mCondition.notify_all();
+ }
+}
+#endif // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
+
+#if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
+class DelegateWaitableEvent final : public WaitableEvent
+{
+ public:
+ DelegateWaitableEvent() = default;
+ ~DelegateWaitableEvent() override = default;
+
+ void wait() override;
+ bool isReady() override;
+
+ void markAsReady();
+
+ private:
+ // To protect the concurrent accesses from both main thread and background
+ // threads to the member fields.
+ std::mutex mMutex;
+
+ bool mIsReady = false;
+ std::condition_variable mCondition;
+};
+
+void DelegateWaitableEvent::markAsReady()
+{
+ std::lock_guard<std::mutex> lock(mMutex);
+ mIsReady = true;
+ mCondition.notify_all();
+}
+
+void DelegateWaitableEvent::wait()
+{
+ std::unique_lock<std::mutex> lock(mMutex);
+ mCondition.wait(lock, [this] { return mIsReady; });
+}
+
+bool DelegateWaitableEvent::isReady()
+{
+ std::lock_guard<std::mutex> lock(mMutex);
+ return mIsReady;
+}
+
+class DelegateWorkerPool final : public WorkerThreadPool
+{
+ public:
+ DelegateWorkerPool() = default;
+ ~DelegateWorkerPool() override = default;
+
+ std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
+
+ void setMaxThreads(size_t maxThreads) override;
+ bool isAsync() override;
+};
+
+// A function wrapper to execute the closure and to notify the waitable
+// event after the execution.
+class DelegateWorkerTask
+{
+ public:
+ DelegateWorkerTask(std::shared_ptr<Closure> task,
+ std::shared_ptr<DelegateWaitableEvent> waitable)
+ : mTask(task), mWaitable(waitable)
+ {}
+ DelegateWorkerTask() = delete;
+ DelegateWorkerTask(DelegateWorkerTask &) = delete;
+
+ static void RunTask(void *userData)
+ {
+ DelegateWorkerTask *workerTask = static_cast<DelegateWorkerTask *>(userData);
+ (*workerTask->mTask)();
+ workerTask->mWaitable->markAsReady();
+
+ // Delete the task after its execution.
+ delete workerTask;
+ }
+
+ private:
+ ~DelegateWorkerTask() = default;
+
+ std::shared_ptr<Closure> mTask;
+ std::shared_ptr<DelegateWaitableEvent> mWaitable;
+};
+
+std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
+{
+ auto waitable = std::make_shared<DelegateWaitableEvent>();
+
+ // The task will be deleted by DelegateWorkerTask::RunTask(...) after its execution.
+ DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable);
+ auto *platform = ANGLEPlatformCurrent();
+ platform->postWorkerTask(platform, DelegateWorkerTask::RunTask, workerTask);
+
+ return std::move(waitable);
+}
+
+void DelegateWorkerPool::setMaxThreads(size_t maxThreads) {}
+
+bool DelegateWorkerPool::isAsync()
+{
+ return true;
+}
+#endif
+
+// static
+std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(bool multithreaded)
+{
+ std::shared_ptr<WorkerThreadPool> pool(nullptr);
+
+#if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
+ const bool hasPostWorkerTaskImpl = ANGLEPlatformCurrent()->postWorkerTask;
+ if (hasPostWorkerTaskImpl && multithreaded)
+ {
+ pool = std::shared_ptr<WorkerThreadPool>(new DelegateWorkerPool());
+ }
+#endif
+#if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
+ if (!pool && multithreaded)
+ {
+ pool = std::shared_ptr<WorkerThreadPool>(
+ new AsyncWorkerPool(std::thread::hardware_concurrency()));
+ }
+#endif
+ if (!pool)
+ {
+ return std::shared_ptr<WorkerThreadPool>(new SingleThreadedWorkerPool());
+ }
+ return pool;
+}
+
+// static
+std::shared_ptr<WaitableEvent> WorkerThreadPool::PostWorkerTask(
+ std::shared_ptr<WorkerThreadPool> pool,
+ std::shared_ptr<Closure> task)
+{
+ std::shared_ptr<WaitableEvent> event = pool->postWorkerTask(task);
+ if (event.get())
+ {
+ event->setWorkerThreadPool(pool);
+ }
+ return event;
+}
+
+} // namespace angle