summaryrefslogtreecommitdiffstats
path: root/third_party/jpeg-xl/lib/threads
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/jpeg-xl/lib/threads')
-rw-r--r--third_party/jpeg-xl/lib/threads/libjxl_threads.pc.in13
-rw-r--r--third_party/jpeg-xl/lib/threads/resizable_parallel_runner.cc200
-rw-r--r--third_party/jpeg-xl/lib/threads/thread_parallel_runner.cc107
-rw-r--r--third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc211
-rw-r--r--third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h166
-rw-r--r--third_party/jpeg-xl/lib/threads/thread_parallel_runner_test.cc122
6 files changed, 819 insertions, 0 deletions
diff --git a/third_party/jpeg-xl/lib/threads/libjxl_threads.pc.in b/third_party/jpeg-xl/lib/threads/libjxl_threads.pc.in
new file mode 100644
index 0000000000..dfbaa3ffb5
--- /dev/null
+++ b/third_party/jpeg-xl/lib/threads/libjxl_threads.pc.in
@@ -0,0 +1,13 @@
+prefix=@CMAKE_INSTALL_PREFIX@
+exec_prefix=${prefix}
+libdir=@PKGCONFIG_TARGET_LIBS@
+includedir=@PKGCONFIG_TARGET_INCLUDES@
+
+Name: libjxl_threads
+Description: JPEG XL multi-thread runner using std::threads.
+Version: @JPEGXL_LIBRARY_VERSION@
+@JPEGXL_REQUIRES_TYPE@: @JPEGXL_THREADS_LIBRARY_REQUIRES@
+Libs: -L${libdir} -ljxl_threads
+Libs.private: -lm
+Cflags: -I${includedir}
+Cflags.private: -DJXL_THREADS_STATIC_DEFINE
diff --git a/third_party/jpeg-xl/lib/threads/resizable_parallel_runner.cc b/third_party/jpeg-xl/lib/threads/resizable_parallel_runner.cc
new file mode 100644
index 0000000000..238ccbf20c
--- /dev/null
+++ b/third_party/jpeg-xl/lib/threads/resizable_parallel_runner.cc
@@ -0,0 +1,200 @@
+// Copyright (c) the JPEG XL Project Authors. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+#include <jxl/jxl_threads_export.h>
+#include <jxl/memory_manager.h>
+#include <jxl/parallel_runner.h>
+#include <jxl/resizable_parallel_runner.h>
+
+#include <algorithm>
+#include <atomic>
+#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+namespace jpegxl {
+namespace {
+
+// A thread pool that allows changing the number of threads it runs. It also
+// runs tasks on the calling thread, which can work better on schedulers for
+// heterogeneous architectures.
+struct ResizeableParallelRunner {
+ void SetNumThreads(size_t num) {
+ if (num > 0) {
+ num -= 1;
+ }
+ {
+ std::unique_lock<std::mutex> l(state_mutex_);
+ num_desired_workers_ = num;
+ workers_can_proceed_.notify_all();
+ }
+ if (workers_.size() < num) {
+ for (size_t i = workers_.size(); i < num; i++) {
+ workers_.emplace_back([this, i]() { WorkerBody(i); });
+ }
+ }
+ if (workers_.size() > num) {
+ for (size_t i = num; i < workers_.size(); i++) {
+ workers_[i].join();
+ }
+ workers_.resize(num);
+ }
+ }
+
+ ~ResizeableParallelRunner() { SetNumThreads(0); }
+
+ JxlParallelRetCode Run(void* jxl_opaque, JxlParallelRunInit init,
+ JxlParallelRunFunction func, uint32_t start,
+ uint32_t end) {
+ if (start + 1 == end) {
+ JxlParallelRetCode ret = init(jxl_opaque, 1);
+ if (ret != 0) return ret;
+
+ func(jxl_opaque, start, 0);
+ return ret;
+ }
+
+ size_t num_workers = std::min<size_t>(workers_.size() + 1, end - start);
+ JxlParallelRetCode ret = init(jxl_opaque, num_workers);
+ if (ret != 0) {
+ return ret;
+ }
+
+ {
+ std::unique_lock<std::mutex> l(state_mutex_);
+ // Avoid waking up more workers than needed.
+ max_running_workers_ = end - start - 1;
+ next_task_ = start;
+ end_task_ = end;
+ func_ = func;
+ jxl_opaque_ = jxl_opaque;
+ work_available_ = true;
+ num_running_workers_++;
+ workers_can_proceed_.notify_all();
+ }
+
+ DequeueTasks(0);
+
+ while (true) {
+ std::unique_lock<std::mutex> l(state_mutex_);
+ if (num_running_workers_ == 0) break;
+ work_done_.wait(l);
+ }
+
+ return ret;
+ }
+
+ private:
+ void WorkerBody(size_t worker_id) {
+ while (true) {
+ {
+ std::unique_lock<std::mutex> l(state_mutex_);
+ // Worker pool was reduced, resize down.
+ if (worker_id >= num_desired_workers_) {
+ return;
+ }
+ // Nothing to do this time.
+ if (!work_available_ || worker_id >= max_running_workers_) {
+ workers_can_proceed_.wait(l);
+ continue;
+ }
+ num_running_workers_++;
+ }
+ DequeueTasks(worker_id + 1);
+ }
+ }
+
+ void DequeueTasks(size_t thread_id) {
+ while (true) {
+ uint32_t task = next_task_++;
+ if (task >= end_task_) {
+ std::unique_lock<std::mutex> l(state_mutex_);
+ num_running_workers_--;
+ work_available_ = false;
+ if (num_running_workers_ == 0) {
+ work_done_.notify_all();
+ }
+ break;
+ }
+ func_(jxl_opaque_, task, thread_id);
+ }
+ }
+
+ // Checks when the worker has something to do, which can be one of:
+ // - quitting (when worker_id >= num_desired_workers_)
+ // - having work available for them (work_available_ is true and worker_id >=
+ // max_running_workers_)
+ std::condition_variable workers_can_proceed_;
+
+ // Workers are done, and the main thread can proceed (num_running_workers_ ==
+ // 0)
+ std::condition_variable work_done_;
+
+ std::vector<std::thread> workers_;
+
+ // Protects all the remaining variables, except for func_, jxl_opaque_ and
+ // end_task_ (for which only the write by the main thread is protected, and
+ // subsequent uses by workers happen-after it) and next_task_ (which is
+ // atomic).
+ std::mutex state_mutex_;
+
+ // Range of tasks still need to be done.
+ std::atomic<uint32_t> next_task_;
+ uint32_t end_task_;
+
+ // Function to run and its argument.
+ JxlParallelRunFunction func_;
+ void* jxl_opaque_; // not owned
+
+ // Variables that control the workers:
+ // - work_available_ is set to true after a call to Run() and to false at the
+ // end of it.
+ // - num_desired_workers_ represents the number of workers that should be
+ // present.
+ // - max_running_workers_ represents the number of workers that should be
+ // executing tasks.
+ // - num_running_workers_ represents the number of workers that are executing
+ // tasks.
+ size_t num_desired_workers_ = 0;
+ size_t max_running_workers_ = 0;
+ size_t num_running_workers_ = 0;
+ bool work_available_ = false;
+};
+} // namespace
+} // namespace jpegxl
+
+extern "C" {
+JXL_THREADS_EXPORT JxlParallelRetCode JxlResizableParallelRunner(
+ void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init,
+ JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) {
+ return static_cast<jpegxl::ResizeableParallelRunner*>(runner_opaque)
+ ->Run(jpegxl_opaque, init, func, start_range, end_range);
+}
+
+JXL_THREADS_EXPORT void* JxlResizableParallelRunnerCreate(
+ const JxlMemoryManager* memory_manager) {
+ return new jpegxl::ResizeableParallelRunner();
+}
+
+JXL_THREADS_EXPORT void JxlResizableParallelRunnerSetThreads(
+ void* runner_opaque, size_t num_threads) {
+ static_cast<jpegxl::ResizeableParallelRunner*>(runner_opaque)
+ ->SetNumThreads(num_threads);
+}
+
+JXL_THREADS_EXPORT void JxlResizableParallelRunnerDestroy(void* runner_opaque) {
+ delete static_cast<jpegxl::ResizeableParallelRunner*>(runner_opaque);
+}
+
+JXL_THREADS_EXPORT uint32_t
+JxlResizableParallelRunnerSuggestThreads(uint64_t xsize, uint64_t ysize) {
+ // ~one thread per group.
+ return std::min<uint64_t>(std::thread::hardware_concurrency(),
+ xsize * ysize / (256 * 256));
+}
+}
diff --git a/third_party/jpeg-xl/lib/threads/thread_parallel_runner.cc b/third_party/jpeg-xl/lib/threads/thread_parallel_runner.cc
new file mode 100644
index 0000000000..d12947ce55
--- /dev/null
+++ b/third_party/jpeg-xl/lib/threads/thread_parallel_runner.cc
@@ -0,0 +1,107 @@
+// Copyright (c) the JPEG XL Project Authors. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+#include <jxl/memory_manager.h>
+#include <jxl/parallel_runner.h>
+#include <jxl/thread_parallel_runner.h>
+#include <string.h>
+
+#include <cstdint>
+#include <cstdlib>
+#include <thread>
+
+#include "lib/threads/thread_parallel_runner_internal.h"
+
+namespace {
+
+// Default JxlMemoryManager using malloc and free for the jpegxl_threads
+// library. Same as the default JxlMemoryManager for the jpegxl library
+// itself.
+
+// Default alloc and free functions.
+void* ThreadMemoryManagerDefaultAlloc(void* opaque, size_t size) {
+ return malloc(size);
+}
+
+void ThreadMemoryManagerDefaultFree(void* opaque, void* address) {
+ free(address);
+}
+
+// Initializes the memory manager instance with the passed one. The
+// MemoryManager passed in |memory_manager| may be NULL or contain NULL
+// functions which will be initialized with the default ones. If either alloc
+// or free are NULL, then both must be NULL, otherwise this function returns an
+// error.
+bool ThreadMemoryManagerInit(JxlMemoryManager* self,
+ const JxlMemoryManager* memory_manager) {
+ if (memory_manager) {
+ *self = *memory_manager;
+ } else {
+ memset(self, 0, sizeof(*self));
+ }
+ if (!self->alloc != !self->free) {
+ return false;
+ }
+ if (!self->alloc) self->alloc = ThreadMemoryManagerDefaultAlloc;
+ if (!self->free) self->free = ThreadMemoryManagerDefaultFree;
+
+ return true;
+}
+
+void* ThreadMemoryManagerAlloc(const JxlMemoryManager* memory_manager,
+ size_t size) {
+ return memory_manager->alloc(memory_manager->opaque, size);
+}
+
+void ThreadMemoryManagerFree(const JxlMemoryManager* memory_manager,
+ void* address) {
+ return memory_manager->free(memory_manager->opaque, address);
+}
+
+} // namespace
+
+JxlParallelRetCode JxlThreadParallelRunner(
+ void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init,
+ JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) {
+ return jpegxl::ThreadParallelRunner::Runner(
+ runner_opaque, jpegxl_opaque, init, func, start_range, end_range);
+}
+
+/// Starts the given number of worker threads and blocks until they are ready.
+/// "num_worker_threads" defaults to one per hyperthread. If zero, all tasks
+/// run on the main thread.
+void* JxlThreadParallelRunnerCreate(const JxlMemoryManager* memory_manager,
+ size_t num_worker_threads) {
+ JxlMemoryManager local_memory_manager;
+ if (!ThreadMemoryManagerInit(&local_memory_manager, memory_manager))
+ return nullptr;
+
+ void* alloc = ThreadMemoryManagerAlloc(&local_memory_manager,
+ sizeof(jpegxl::ThreadParallelRunner));
+ if (!alloc) return nullptr;
+ // Placement new constructor on allocated memory
+ jpegxl::ThreadParallelRunner* runner =
+ new (alloc) jpegxl::ThreadParallelRunner(num_worker_threads);
+ runner->memory_manager = local_memory_manager;
+
+ return runner;
+}
+
+void JxlThreadParallelRunnerDestroy(void* runner_opaque) {
+ jpegxl::ThreadParallelRunner* runner =
+ reinterpret_cast<jpegxl::ThreadParallelRunner*>(runner_opaque);
+ if (runner) {
+ JxlMemoryManager local_memory_manager = runner->memory_manager;
+ // Call destructor directly since custom free function is used.
+ runner->~ThreadParallelRunner();
+ ThreadMemoryManagerFree(&local_memory_manager, runner);
+ }
+}
+
+// Get default value for num_worker_threads parameter of
+// InitJxlThreadParallelRunner.
+size_t JxlThreadParallelRunnerDefaultNumWorkerThreads() {
+ return std::thread::hardware_concurrency();
+}
diff --git a/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc
new file mode 100644
index 0000000000..5f73d94897
--- /dev/null
+++ b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc
@@ -0,0 +1,211 @@
+// Copyright (c) the JPEG XL Project Authors. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+#include "lib/threads/thread_parallel_runner_internal.h"
+
+#include <jxl/parallel_runner.h>
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <mutex>
+#include <thread>
+
+#if defined(ADDRESS_SANITIZER) || defined(MEMORY_SANITIZER) || \
+ defined(THREAD_SANITIZER)
+#include "sanitizer/common_interface_defs.h" // __sanitizer_print_stack_trace
+#endif // defined(*_SANITIZER)
+
+namespace {
+
+// Important: JXL_ASSERT does not guarantee running the `condition` code,
+// use only for debug mode checks.
+
+#if JXL_ENABLE_ASSERT
+// Exits the program after printing a stack trace when possible.
+bool Abort() {
+#if defined(ADDRESS_SANITIZER) || defined(MEMORY_SANITIZER) || \
+ defined(THREAD_SANITIZER)
+ // If compiled with any sanitizer print a stack trace. This call doesn't crash
+ // the program, instead the trap below will crash it also allowing gdb to
+ // break there.
+ __sanitizer_print_stack_trace();
+#endif // defined(*_SANITIZER)
+
+#ifdef _MSC_VER
+ __debugbreak();
+ abort();
+#else
+ __builtin_trap();
+#endif
+}
+#define JXL_ASSERT(condition) \
+ do { \
+ if (!(condition)) { \
+ Abort(); \
+ } \
+ } while (0)
+#else
+#define JXL_ASSERT(condition) \
+ do { \
+ } while (0)
+#endif
+} // namespace
+
+namespace jpegxl {
+
+// static
+JxlParallelRetCode ThreadParallelRunner::Runner(
+ void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init,
+ JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) {
+ ThreadParallelRunner* self =
+ static_cast<ThreadParallelRunner*>(runner_opaque);
+ if (start_range > end_range) return -1;
+ if (start_range == end_range) return 0;
+
+ int ret = init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1));
+ if (ret != 0) return ret;
+
+ // Use a sequential run when num_worker_threads_ is zero since we have no
+ // worker threads.
+ if (self->num_worker_threads_ == 0) {
+ const size_t thread = 0;
+ for (uint32_t task = start_range; task < end_range; ++task) {
+ func(jpegxl_opaque, task, thread);
+ }
+ return 0;
+ }
+
+ if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) {
+ return -1; // Must not re-enter.
+ }
+
+ const WorkerCommand worker_command =
+ (static_cast<WorkerCommand>(start_range) << 32) + end_range;
+ // Ensure the inputs do not result in a reserved command.
+ JXL_ASSERT(worker_command != kWorkerWait);
+ JXL_ASSERT(worker_command != kWorkerOnce);
+ JXL_ASSERT(worker_command != kWorkerExit);
+
+ self->data_func_ = func;
+ self->jpegxl_opaque_ = jpegxl_opaque;
+ self->num_reserved_.store(0, std::memory_order_relaxed);
+
+ self->StartWorkers(worker_command);
+ self->WorkersReadyBarrier();
+
+ if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) {
+ return -1;
+ }
+ return 0;
+}
+
+// static
+void ThreadParallelRunner::RunRange(ThreadParallelRunner* self,
+ const WorkerCommand command,
+ const int thread) {
+ const uint32_t begin = command >> 32;
+ const uint32_t end = command & 0xFFFFFFFF;
+ const uint32_t num_tasks = end - begin;
+ const uint32_t num_worker_threads = self->num_worker_threads_;
+
+ // OpenMP introduced several "schedule" strategies:
+ // "single" (static assignment of exactly one chunk per thread): slower.
+ // "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
+ // "guided" (allocates k tasks, decreases k): computing k = remaining/n
+ // is faster than halving k each iteration. We prefer this strategy
+ // because it avoids user-specified parameters.
+
+ for (;;) {
+#if 0
+ // dynamic
+ const uint32_t my_size = std::max(num_tasks / (num_worker_threads * 4), 1);
+#else
+ // guided
+ const uint32_t num_reserved =
+ self->num_reserved_.load(std::memory_order_relaxed);
+ // It is possible that more tasks are reserved than ready to run.
+ const uint32_t num_remaining =
+ num_tasks - std::min(num_reserved, num_tasks);
+ const uint32_t my_size =
+ std::max(num_remaining / (num_worker_threads * 4), 1u);
+#endif
+ const uint32_t my_begin = begin + self->num_reserved_.fetch_add(
+ my_size, std::memory_order_relaxed);
+ const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks);
+ // Another thread already reserved the last task.
+ if (my_begin >= my_end) {
+ break;
+ }
+ for (uint32_t task = my_begin; task < my_end; ++task) {
+ self->data_func_(self->jpegxl_opaque_, task, thread);
+ }
+ }
+}
+
+// static
+void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self,
+ const int thread) {
+ // Until kWorkerExit command received:
+ for (;;) {
+ std::unique_lock<std::mutex> lock(self->mutex_);
+ // Notify main thread that this thread is ready.
+ if (++self->workers_ready_ == self->num_threads_) {
+ self->workers_ready_cv_.notify_one();
+ }
+ RESUME_WAIT:
+ // Wait for a command.
+ self->worker_start_cv_.wait(lock);
+ const WorkerCommand command = self->worker_start_command_;
+ switch (command) {
+ case kWorkerWait: // spurious wakeup:
+ goto RESUME_WAIT; // lock still held, avoid incrementing ready.
+ case kWorkerOnce:
+ lock.unlock();
+ self->data_func_(self->jpegxl_opaque_, thread, thread);
+ break;
+ case kWorkerExit:
+ return; // exits thread
+ default:
+ lock.unlock();
+ RunRange(self, command, thread);
+ break;
+ }
+ }
+}
+
+ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads)
+ : num_worker_threads_(num_worker_threads),
+ num_threads_(std::max(num_worker_threads, 1)) {
+ threads_.reserve(num_worker_threads_);
+
+ // Suppress "unused-private-field" warning.
+ (void)padding1;
+ (void)padding2;
+
+ // Safely handle spurious worker wakeups.
+ worker_start_command_ = kWorkerWait;
+
+ for (uint32_t i = 0; i < num_worker_threads_; ++i) {
+ threads_.emplace_back(ThreadFunc, this, i);
+ }
+
+ if (num_worker_threads_ != 0) {
+ WorkersReadyBarrier();
+ }
+}
+
+ThreadParallelRunner::~ThreadParallelRunner() {
+ if (num_worker_threads_ != 0) {
+ StartWorkers(kWorkerExit);
+ }
+
+ for (std::thread& thread : threads_) {
+ JXL_ASSERT(thread.joinable());
+ thread.join();
+ }
+}
+} // namespace jpegxl
diff --git a/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h
new file mode 100644
index 0000000000..199a5f2a8b
--- /dev/null
+++ b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h
@@ -0,0 +1,166 @@
+// Copyright (c) the JPEG XL Project Authors. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+//
+
+// C++ implementation using std::thread of a ::JxlParallelRunner.
+
+// The main class in this module, ThreadParallelRunner, implements a static
+// method ThreadParallelRunner::Runner than can be passed as a
+// JxlParallelRunner when using the JPEG XL library. This uses std::thread
+// internally and related synchronization functions. The number of threads
+// created is fixed at construction time and the threads are re-used for every
+// ThreadParallelRunner::Runner call. Only one concurrent Runner() call per
+// instance is allowed at a time.
+//
+// This is a scalable, lower-overhead thread pool runner, especially suitable
+// for data-parallel computations in the fork-join model, where clients need to
+// know when all tasks have completed.
+//
+// This thread pool can efficiently load-balance millions of tasks using an
+// atomic counter, thus avoiding per-task virtual or system calls. With 48
+// hyperthreads and 1M tasks that add to an atomic counter, overall runtime is
+// 10-20x higher when using std::async, and ~200x for a queue-based thread
+// pool.
+//
+// Usage:
+// ThreadParallelRunner runner;
+// JxlDecode(
+// ... , &ThreadParallelRunner::Runner, static_cast<void*>(&runner));
+
+#ifndef LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_
+#define LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_
+
+#include <jxl/memory_manager.h>
+#include <jxl/parallel_runner.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include <atomic>
+#include <condition_variable> //NOLINT
+#include <mutex> //NOLINT
+#include <thread> //NOLINT
+#include <vector>
+
+namespace jpegxl {
+
+// Main helper class implementing the ::JxlParallelRunner interface.
+class ThreadParallelRunner {
+ public:
+ // ::JxlParallelRunner interface.
+ static JxlParallelRetCode Runner(void* runner_opaque, void* jpegxl_opaque,
+ JxlParallelRunInit init,
+ JxlParallelRunFunction func,
+ uint32_t start_range, uint32_t end_range);
+
+ // Starts the given number of worker threads and blocks until they are ready.
+ // "num_worker_threads" defaults to one per hyperthread. If zero, all tasks
+ // run on the main thread.
+ explicit ThreadParallelRunner(
+ int num_worker_threads = std::thread::hardware_concurrency());
+
+ // Waits for all threads to exit.
+ ~ThreadParallelRunner();
+
+ // Returns maximum number of main/worker threads that may call Func. Useful
+ // for allocating per-thread storage.
+ size_t NumThreads() const { return num_threads_; }
+
+ // Runs func(thread, thread) on all thread(s) that may participate in Run.
+ // If NumThreads() == 0, runs on the main thread with thread == 0, otherwise
+ // concurrently called by each worker thread in [0, NumThreads()).
+ template <class Func>
+ void RunOnEachThread(const Func& func) {
+ if (num_worker_threads_ == 0) {
+ const int thread = 0;
+ func(thread, thread);
+ return;
+ }
+
+ data_func_ = reinterpret_cast<JxlParallelRunFunction>(&CallClosure<Func>);
+ jpegxl_opaque_ = const_cast<void*>(static_cast<const void*>(&func));
+ StartWorkers(kWorkerOnce);
+ WorkersReadyBarrier();
+ }
+
+ JxlMemoryManager memory_manager;
+
+ private:
+ // After construction and between calls to Run, workers are "ready", i.e.
+ // waiting on worker_start_cv_. They are "started" by sending a "command"
+ // and notifying all worker_start_cv_ waiters. (That is why all workers
+ // must be ready/waiting - otherwise, the notification will not reach all of
+ // them and the main thread waits in vain for them to report readiness.)
+ using WorkerCommand = uint64_t;
+
+ // Special values; all others encode the begin/end parameters. Note that all
+ // these are no-op ranges (begin >= end) and therefore never used to encode
+ // ranges.
+ static constexpr WorkerCommand kWorkerWait = ~1ULL;
+ static constexpr WorkerCommand kWorkerOnce = ~2ULL;
+ static constexpr WorkerCommand kWorkerExit = ~3ULL;
+
+ // Calls f(task, thread). Used for type erasure of Func arguments. The
+ // signature must match JxlParallelRunFunction, hence a void* argument.
+ template <class Closure>
+ static void CallClosure(void* f, const uint32_t task, const size_t thread) {
+ (*reinterpret_cast<const Closure*>(f))(task, thread);
+ }
+
+ void WorkersReadyBarrier() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ // Typically only a single iteration.
+ while (workers_ready_ != threads_.size()) {
+ workers_ready_cv_.wait(lock);
+ }
+ workers_ready_ = 0;
+
+ // Safely handle spurious worker wakeups.
+ worker_start_command_ = kWorkerWait;
+ }
+
+ // Precondition: all workers are ready.
+ void StartWorkers(const WorkerCommand worker_command) {
+ mutex_.lock();
+ worker_start_command_ = worker_command;
+ // Workers will need this lock, so release it before they wake up.
+ mutex_.unlock();
+ worker_start_cv_.notify_all();
+ }
+
+ // Attempts to reserve and perform some work from the global range of tasks,
+ // which is encoded within "command". Returns after all tasks are reserved.
+ static void RunRange(ThreadParallelRunner* self, const WorkerCommand command,
+ const int thread);
+
+ static void ThreadFunc(ThreadParallelRunner* self, int thread);
+
+ // Unmodified after ctor, but cannot be const because we call thread::join().
+ std::vector<std::thread> threads_;
+
+ const uint32_t num_worker_threads_; // == threads_.size()
+ const uint32_t num_threads_;
+
+ std::atomic<int> depth_{0}; // detects if Run is re-entered (not supported).
+
+ std::mutex mutex_; // guards both cv and their variables.
+ std::condition_variable workers_ready_cv_;
+ uint32_t workers_ready_ = 0;
+ std::condition_variable worker_start_cv_;
+ WorkerCommand worker_start_command_;
+
+ // Written by main thread, read by workers (after mutex lock/unlock).
+ JxlParallelRunFunction data_func_;
+ void* jpegxl_opaque_;
+
+ // Updated by workers; padding avoids false sharing.
+ uint8_t padding1[64];
+ std::atomic<uint32_t> num_reserved_{0};
+ uint8_t padding2[64];
+};
+
+} // namespace jpegxl
+
+#endif // LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_
diff --git a/third_party/jpeg-xl/lib/threads/thread_parallel_runner_test.cc b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_test.cc
new file mode 100644
index 0000000000..7c8e602764
--- /dev/null
+++ b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_test.cc
@@ -0,0 +1,122 @@
+// Copyright (c) the JPEG XL Project Authors. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+#include <atomic>
+
+#include "lib/jxl/base/data_parallel.h"
+#include "lib/jxl/test_utils.h"
+#include "lib/jxl/testing.h"
+
+using jxl::test::ThreadPoolForTests;
+
+namespace jpegxl {
+namespace {
+
+int PopulationCount(uint64_t bits) {
+ int num_set = 0;
+ while (bits != 0) {
+ num_set += bits & 1;
+ bits >>= 1;
+ }
+ return num_set;
+}
+
+// Ensures task parameter is in bounds, every parameter is reached,
+// pool can be reused (multiple consecutive Run calls), pool can be destroyed
+// (joining with its threads), num_threads=0 works (runs on current thread).
+TEST(ThreadParallelRunnerTest, TestPool) {
+ for (int num_threads = 0; num_threads <= 18; ++num_threads) {
+ ThreadPoolForTests pool(num_threads);
+ for (int num_tasks = 0; num_tasks < 32; ++num_tasks) {
+ std::vector<int> mementos(num_tasks);
+ for (int begin = 0; begin < 32; ++begin) {
+ std::fill(mementos.begin(), mementos.end(), 0);
+ EXPECT_TRUE(RunOnPool(
+ &pool, begin, begin + num_tasks, jxl::ThreadPool::NoInit,
+ [begin, num_tasks, &mementos](const int task, const int thread) {
+ // Parameter is in the given range
+ EXPECT_GE(task, begin);
+ EXPECT_LT(task, begin + num_tasks);
+
+ // Store mementos to be sure we visited each task.
+ mementos.at(task - begin) = 1000 + task;
+ },
+ "TestPool"));
+ for (int task = begin; task < begin + num_tasks; ++task) {
+ EXPECT_EQ(1000 + task, mementos.at(task - begin));
+ }
+ }
+ }
+ }
+}
+
+// Verify "thread" parameter when processing few tasks.
+TEST(ThreadParallelRunnerTest, TestSmallAssignments) {
+ const int kMaxThreads = 8;
+ for (int num_threads = 1; num_threads <= kMaxThreads; ++num_threads) {
+ ThreadPoolForTests pool(num_threads);
+
+ // (Avoid mutex because it may perturb the worker thread scheduling)
+ std::atomic<uint64_t> id_bits{0};
+ std::atomic<int> num_calls{0};
+
+ EXPECT_TRUE(RunOnPool(
+ &pool, 0, num_threads, jxl::ThreadPool::NoInit,
+ [&num_calls, num_threads, &id_bits](const int task, const int thread) {
+ num_calls.fetch_add(1, std::memory_order_relaxed);
+
+ EXPECT_LT(thread, num_threads);
+ uint64_t bits = id_bits.load(std::memory_order_relaxed);
+ while (
+ !id_bits.compare_exchange_weak(bits, bits | (1ULL << thread))) {
+ }
+ },
+ "TestSmallAssignments"));
+
+ // Correct number of tasks.
+ EXPECT_EQ(num_threads, num_calls.load());
+
+ const int num_participants = PopulationCount(id_bits.load());
+ // Can't expect equality because other workers may have woken up too late.
+ EXPECT_LE(num_participants, num_threads);
+ }
+}
+
+struct Counter {
+ Counter() {
+ // Suppress "unused-field" warning.
+ (void)padding;
+ }
+ void Assimilate(const Counter& victim) { counter += victim.counter; }
+ int counter = 0;
+ int padding[31];
+};
+
+TEST(ThreadParallelRunnerTest, TestCounter) {
+ const int kNumThreads = 12;
+ ThreadPoolForTests pool(kNumThreads);
+ alignas(128) Counter counters[kNumThreads];
+
+ const int kNumTasks = kNumThreads * 19;
+ EXPECT_TRUE(RunOnPool(
+ &pool, 0, kNumTasks, jxl::ThreadPool::NoInit,
+ [&counters](const int task, const int thread) {
+ counters[thread].counter += task;
+ },
+ "TestCounter"));
+
+ int expected = 0;
+ for (int i = 0; i < kNumTasks; ++i) {
+ expected += i;
+ }
+
+ for (int i = 1; i < kNumThreads; ++i) {
+ counters[0].Assimilate(counters[i]);
+ }
+ EXPECT_EQ(expected, counters[0].counter);
+}
+
+} // namespace
+} // namespace jpegxl