summaryrefslogtreecommitdiffstats
path: root/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc')
-rw-r--r--third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc215
1 files changed, 215 insertions, 0 deletions
diff --git a/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc
new file mode 100644
index 0000000000..f26a9ba263
--- /dev/null
+++ b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc
@@ -0,0 +1,215 @@
+// Copyright (c) the JPEG XL Project Authors. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+#include "lib/threads/thread_parallel_runner_internal.h"
+
+#include <algorithm>
+
+#if defined(ADDRESS_SANITIZER) || defined(MEMORY_SANITIZER) || \
+ defined(THREAD_SANITIZER)
+#include "sanitizer/common_interface_defs.h" // __sanitizer_print_stack_trace
+#endif // defined(*_SANITIZER)
+
+#include <jxl/thread_parallel_runner.h>
+
+#include "lib/jxl/base/profiler.h"
+
+namespace {
+
+// Important: JXL_ASSERT does not guarantee running the `condition` code,
+// use only for debug mode checks.
+
+#if JXL_ENABLE_ASSERT
+// Exits the program after printing a stack trace when possible.
+bool Abort() {
+#if defined(ADDRESS_SANITIZER) || defined(MEMORY_SANITIZER) || \
+ defined(THREAD_SANITIZER)
+ // If compiled with any sanitizer print a stack trace. This call doesn't crash
+ // the program, instead the trap below will crash it also allowing gdb to
+ // break there.
+ __sanitizer_print_stack_trace();
+#endif // defined(*_SANITIZER)
+
+#ifdef _MSC_VER
+ __debugbreak();
+ abort();
+#else
+ __builtin_trap();
+#endif
+}
+#define JXL_ASSERT(condition) \
+ do { \
+ if (!(condition)) { \
+ Abort(); \
+ } \
+ } while (0)
+#else
+#define JXL_ASSERT(condition) \
+ do { \
+ } while (0)
+#endif
+} // namespace
+
+namespace jpegxl {
+
+// static
+JxlParallelRetCode ThreadParallelRunner::Runner(
+ void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init,
+ JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) {
+ ThreadParallelRunner* self =
+ static_cast<ThreadParallelRunner*>(runner_opaque);
+ if (start_range > end_range) return -1;
+ if (start_range == end_range) return 0;
+
+ int ret = init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1));
+ if (ret != 0) return ret;
+
+ // Use a sequential run when num_worker_threads_ is zero since we have no
+ // worker threads.
+ if (self->num_worker_threads_ == 0) {
+ const size_t thread = 0;
+ for (uint32_t task = start_range; task < end_range; ++task) {
+ func(jpegxl_opaque, task, thread);
+ }
+ return 0;
+ }
+
+ if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) {
+ return -1; // Must not re-enter.
+ }
+
+ const WorkerCommand worker_command =
+ (static_cast<WorkerCommand>(start_range) << 32) + end_range;
+ // Ensure the inputs do not result in a reserved command.
+ JXL_ASSERT(worker_command != kWorkerWait);
+ JXL_ASSERT(worker_command != kWorkerOnce);
+ JXL_ASSERT(worker_command != kWorkerExit);
+
+ self->data_func_ = func;
+ self->jpegxl_opaque_ = jpegxl_opaque;
+ self->num_reserved_.store(0, std::memory_order_relaxed);
+
+ self->StartWorkers(worker_command);
+ self->WorkersReadyBarrier();
+
+ if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) {
+ return -1;
+ }
+ return 0;
+}
+
+// static
+void ThreadParallelRunner::RunRange(ThreadParallelRunner* self,
+ const WorkerCommand command,
+ const int thread) {
+ const uint32_t begin = command >> 32;
+ const uint32_t end = command & 0xFFFFFFFF;
+ const uint32_t num_tasks = end - begin;
+ const uint32_t num_worker_threads = self->num_worker_threads_;
+
+ // OpenMP introduced several "schedule" strategies:
+ // "single" (static assignment of exactly one chunk per thread): slower.
+ // "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
+ // "guided" (allocates k tasks, decreases k): computing k = remaining/n
+ // is faster than halving k each iteration. We prefer this strategy
+ // because it avoids user-specified parameters.
+
+ for (;;) {
+#if 0
+ // dynamic
+ const uint32_t my_size = std::max(num_tasks / (num_worker_threads * 4), 1);
+#else
+ // guided
+ const uint32_t num_reserved =
+ self->num_reserved_.load(std::memory_order_relaxed);
+ // It is possible that more tasks are reserved than ready to run.
+ const uint32_t num_remaining =
+ num_tasks - std::min(num_reserved, num_tasks);
+ const uint32_t my_size =
+ std::max(num_remaining / (num_worker_threads * 4), 1u);
+#endif
+ const uint32_t my_begin = begin + self->num_reserved_.fetch_add(
+ my_size, std::memory_order_relaxed);
+ const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks);
+ // Another thread already reserved the last task.
+ if (my_begin >= my_end) {
+ break;
+ }
+ for (uint32_t task = my_begin; task < my_end; ++task) {
+ self->data_func_(self->jpegxl_opaque_, task, thread);
+ }
+ }
+}
+
+// static
+void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self,
+ const int thread) {
+ // Until kWorkerExit command received:
+ for (;;) {
+ std::unique_lock<std::mutex> lock(self->mutex_);
+ // Notify main thread that this thread is ready.
+ if (++self->workers_ready_ == self->num_threads_) {
+ self->workers_ready_cv_.notify_one();
+ }
+ RESUME_WAIT:
+ // Wait for a command.
+ self->worker_start_cv_.wait(lock);
+ const WorkerCommand command = self->worker_start_command_;
+ switch (command) {
+ case kWorkerWait: // spurious wakeup:
+ goto RESUME_WAIT; // lock still held, avoid incrementing ready.
+ case kWorkerOnce:
+ lock.unlock();
+ self->data_func_(self->jpegxl_opaque_, thread, thread);
+ break;
+ case kWorkerExit:
+ return; // exits thread
+ default:
+ lock.unlock();
+ RunRange(self, command, thread);
+ break;
+ }
+ }
+}
+
+ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads)
+ : num_worker_threads_(num_worker_threads),
+ num_threads_(std::max(num_worker_threads, 1)) {
+ PROFILER_ZONE("ThreadParallelRunner ctor");
+
+ threads_.reserve(num_worker_threads_);
+
+ // Suppress "unused-private-field" warning.
+ (void)padding1;
+ (void)padding2;
+
+ // Safely handle spurious worker wakeups.
+ worker_start_command_ = kWorkerWait;
+
+ for (uint32_t i = 0; i < num_worker_threads_; ++i) {
+ threads_.emplace_back(ThreadFunc, this, i);
+ }
+
+ if (num_worker_threads_ != 0) {
+ WorkersReadyBarrier();
+ }
+
+ // Warm up profiler on worker threads so its expensive initialization
+ // doesn't count towards other timer measurements.
+ RunOnEachThread(
+ [](const int task, const int thread) { PROFILER_ZONE("@InitWorkers"); });
+}
+
+ThreadParallelRunner::~ThreadParallelRunner() {
+ if (num_worker_threads_ != 0) {
+ StartWorkers(kWorkerExit);
+ }
+
+ for (std::thread& thread : threads_) {
+ JXL_ASSERT(thread.joinable());
+ thread.join();
+ }
+}
+} // namespace jpegxl