diff options
Diffstat (limited to 'third_party/jpeg-xl/lib/threads')
6 files changed, 819 insertions, 0 deletions
diff --git a/third_party/jpeg-xl/lib/threads/libjxl_threads.pc.in b/third_party/jpeg-xl/lib/threads/libjxl_threads.pc.in new file mode 100644 index 0000000000..dfbaa3ffb5 --- /dev/null +++ b/third_party/jpeg-xl/lib/threads/libjxl_threads.pc.in @@ -0,0 +1,13 @@ +prefix=@CMAKE_INSTALL_PREFIX@ +exec_prefix=${prefix} +libdir=@PKGCONFIG_TARGET_LIBS@ +includedir=@PKGCONFIG_TARGET_INCLUDES@ + +Name: libjxl_threads +Description: JPEG XL multi-thread runner using std::threads. +Version: @JPEGXL_LIBRARY_VERSION@ +@JPEGXL_REQUIRES_TYPE@: @JPEGXL_THREADS_LIBRARY_REQUIRES@ +Libs: -L${libdir} -ljxl_threads +Libs.private: -lm +Cflags: -I${includedir} +Cflags.private: -DJXL_THREADS_STATIC_DEFINE diff --git a/third_party/jpeg-xl/lib/threads/resizable_parallel_runner.cc b/third_party/jpeg-xl/lib/threads/resizable_parallel_runner.cc new file mode 100644 index 0000000000..238ccbf20c --- /dev/null +++ b/third_party/jpeg-xl/lib/threads/resizable_parallel_runner.cc @@ -0,0 +1,200 @@ +// Copyright (c) the JPEG XL Project Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +#include <jxl/jxl_threads_export.h> +#include <jxl/memory_manager.h> +#include <jxl/parallel_runner.h> +#include <jxl/resizable_parallel_runner.h> + +#include <algorithm> +#include <atomic> +#include <condition_variable> +#include <cstddef> +#include <cstdint> +#include <mutex> +#include <thread> +#include <vector> + +namespace jpegxl { +namespace { + +// A thread pool that allows changing the number of threads it runs. It also +// runs tasks on the calling thread, which can work better on schedulers for +// heterogeneous architectures. +struct ResizeableParallelRunner { + void SetNumThreads(size_t num) { + if (num > 0) { + num -= 1; + } + { + std::unique_lock<std::mutex> l(state_mutex_); + num_desired_workers_ = num; + workers_can_proceed_.notify_all(); + } + if (workers_.size() < num) { + for (size_t i = workers_.size(); i < num; i++) { + workers_.emplace_back([this, i]() { WorkerBody(i); }); + } + } + if (workers_.size() > num) { + for (size_t i = num; i < workers_.size(); i++) { + workers_[i].join(); + } + workers_.resize(num); + } + } + + ~ResizeableParallelRunner() { SetNumThreads(0); } + + JxlParallelRetCode Run(void* jxl_opaque, JxlParallelRunInit init, + JxlParallelRunFunction func, uint32_t start, + uint32_t end) { + if (start + 1 == end) { + JxlParallelRetCode ret = init(jxl_opaque, 1); + if (ret != 0) return ret; + + func(jxl_opaque, start, 0); + return ret; + } + + size_t num_workers = std::min<size_t>(workers_.size() + 1, end - start); + JxlParallelRetCode ret = init(jxl_opaque, num_workers); + if (ret != 0) { + return ret; + } + + { + std::unique_lock<std::mutex> l(state_mutex_); + // Avoid waking up more workers than needed. + max_running_workers_ = end - start - 1; + next_task_ = start; + end_task_ = end; + func_ = func; + jxl_opaque_ = jxl_opaque; + work_available_ = true; + num_running_workers_++; + workers_can_proceed_.notify_all(); + } + + DequeueTasks(0); + + while (true) { + std::unique_lock<std::mutex> l(state_mutex_); + if (num_running_workers_ == 0) break; + work_done_.wait(l); + } + + return ret; + } + + private: + void WorkerBody(size_t worker_id) { + while (true) { + { + std::unique_lock<std::mutex> l(state_mutex_); + // Worker pool was reduced, resize down. + if (worker_id >= num_desired_workers_) { + return; + } + // Nothing to do this time. + if (!work_available_ || worker_id >= max_running_workers_) { + workers_can_proceed_.wait(l); + continue; + } + num_running_workers_++; + } + DequeueTasks(worker_id + 1); + } + } + + void DequeueTasks(size_t thread_id) { + while (true) { + uint32_t task = next_task_++; + if (task >= end_task_) { + std::unique_lock<std::mutex> l(state_mutex_); + num_running_workers_--; + work_available_ = false; + if (num_running_workers_ == 0) { + work_done_.notify_all(); + } + break; + } + func_(jxl_opaque_, task, thread_id); + } + } + + // Checks when the worker has something to do, which can be one of: + // - quitting (when worker_id >= num_desired_workers_) + // - having work available for them (work_available_ is true and worker_id >= + // max_running_workers_) + std::condition_variable workers_can_proceed_; + + // Workers are done, and the main thread can proceed (num_running_workers_ == + // 0) + std::condition_variable work_done_; + + std::vector<std::thread> workers_; + + // Protects all the remaining variables, except for func_, jxl_opaque_ and + // end_task_ (for which only the write by the main thread is protected, and + // subsequent uses by workers happen-after it) and next_task_ (which is + // atomic). + std::mutex state_mutex_; + + // Range of tasks still need to be done. + std::atomic<uint32_t> next_task_; + uint32_t end_task_; + + // Function to run and its argument. + JxlParallelRunFunction func_; + void* jxl_opaque_; // not owned + + // Variables that control the workers: + // - work_available_ is set to true after a call to Run() and to false at the + // end of it. + // - num_desired_workers_ represents the number of workers that should be + // present. + // - max_running_workers_ represents the number of workers that should be + // executing tasks. + // - num_running_workers_ represents the number of workers that are executing + // tasks. + size_t num_desired_workers_ = 0; + size_t max_running_workers_ = 0; + size_t num_running_workers_ = 0; + bool work_available_ = false; +}; +} // namespace +} // namespace jpegxl + +extern "C" { +JXL_THREADS_EXPORT JxlParallelRetCode JxlResizableParallelRunner( + void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init, + JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) { + return static_cast<jpegxl::ResizeableParallelRunner*>(runner_opaque) + ->Run(jpegxl_opaque, init, func, start_range, end_range); +} + +JXL_THREADS_EXPORT void* JxlResizableParallelRunnerCreate( + const JxlMemoryManager* memory_manager) { + return new jpegxl::ResizeableParallelRunner(); +} + +JXL_THREADS_EXPORT void JxlResizableParallelRunnerSetThreads( + void* runner_opaque, size_t num_threads) { + static_cast<jpegxl::ResizeableParallelRunner*>(runner_opaque) + ->SetNumThreads(num_threads); +} + +JXL_THREADS_EXPORT void JxlResizableParallelRunnerDestroy(void* runner_opaque) { + delete static_cast<jpegxl::ResizeableParallelRunner*>(runner_opaque); +} + +JXL_THREADS_EXPORT uint32_t +JxlResizableParallelRunnerSuggestThreads(uint64_t xsize, uint64_t ysize) { + // ~one thread per group. + return std::min<uint64_t>(std::thread::hardware_concurrency(), + xsize * ysize / (256 * 256)); +} +} diff --git a/third_party/jpeg-xl/lib/threads/thread_parallel_runner.cc b/third_party/jpeg-xl/lib/threads/thread_parallel_runner.cc new file mode 100644 index 0000000000..d12947ce55 --- /dev/null +++ b/third_party/jpeg-xl/lib/threads/thread_parallel_runner.cc @@ -0,0 +1,107 @@ +// Copyright (c) the JPEG XL Project Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +#include <jxl/memory_manager.h> +#include <jxl/parallel_runner.h> +#include <jxl/thread_parallel_runner.h> +#include <string.h> + +#include <cstdint> +#include <cstdlib> +#include <thread> + +#include "lib/threads/thread_parallel_runner_internal.h" + +namespace { + +// Default JxlMemoryManager using malloc and free for the jpegxl_threads +// library. Same as the default JxlMemoryManager for the jpegxl library +// itself. + +// Default alloc and free functions. +void* ThreadMemoryManagerDefaultAlloc(void* opaque, size_t size) { + return malloc(size); +} + +void ThreadMemoryManagerDefaultFree(void* opaque, void* address) { + free(address); +} + +// Initializes the memory manager instance with the passed one. The +// MemoryManager passed in |memory_manager| may be NULL or contain NULL +// functions which will be initialized with the default ones. If either alloc +// or free are NULL, then both must be NULL, otherwise this function returns an +// error. +bool ThreadMemoryManagerInit(JxlMemoryManager* self, + const JxlMemoryManager* memory_manager) { + if (memory_manager) { + *self = *memory_manager; + } else { + memset(self, 0, sizeof(*self)); + } + if (!self->alloc != !self->free) { + return false; + } + if (!self->alloc) self->alloc = ThreadMemoryManagerDefaultAlloc; + if (!self->free) self->free = ThreadMemoryManagerDefaultFree; + + return true; +} + +void* ThreadMemoryManagerAlloc(const JxlMemoryManager* memory_manager, + size_t size) { + return memory_manager->alloc(memory_manager->opaque, size); +} + +void ThreadMemoryManagerFree(const JxlMemoryManager* memory_manager, + void* address) { + return memory_manager->free(memory_manager->opaque, address); +} + +} // namespace + +JxlParallelRetCode JxlThreadParallelRunner( + void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init, + JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) { + return jpegxl::ThreadParallelRunner::Runner( + runner_opaque, jpegxl_opaque, init, func, start_range, end_range); +} + +/// Starts the given number of worker threads and blocks until they are ready. +/// "num_worker_threads" defaults to one per hyperthread. If zero, all tasks +/// run on the main thread. +void* JxlThreadParallelRunnerCreate(const JxlMemoryManager* memory_manager, + size_t num_worker_threads) { + JxlMemoryManager local_memory_manager; + if (!ThreadMemoryManagerInit(&local_memory_manager, memory_manager)) + return nullptr; + + void* alloc = ThreadMemoryManagerAlloc(&local_memory_manager, + sizeof(jpegxl::ThreadParallelRunner)); + if (!alloc) return nullptr; + // Placement new constructor on allocated memory + jpegxl::ThreadParallelRunner* runner = + new (alloc) jpegxl::ThreadParallelRunner(num_worker_threads); + runner->memory_manager = local_memory_manager; + + return runner; +} + +void JxlThreadParallelRunnerDestroy(void* runner_opaque) { + jpegxl::ThreadParallelRunner* runner = + reinterpret_cast<jpegxl::ThreadParallelRunner*>(runner_opaque); + if (runner) { + JxlMemoryManager local_memory_manager = runner->memory_manager; + // Call destructor directly since custom free function is used. + runner->~ThreadParallelRunner(); + ThreadMemoryManagerFree(&local_memory_manager, runner); + } +} + +// Get default value for num_worker_threads parameter of +// InitJxlThreadParallelRunner. +size_t JxlThreadParallelRunnerDefaultNumWorkerThreads() { + return std::thread::hardware_concurrency(); +} diff --git a/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc new file mode 100644 index 0000000000..5f73d94897 --- /dev/null +++ b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc @@ -0,0 +1,211 @@ +// Copyright (c) the JPEG XL Project Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +#include "lib/threads/thread_parallel_runner_internal.h" + +#include <jxl/parallel_runner.h> + +#include <algorithm> +#include <atomic> +#include <cstddef> +#include <cstdint> +#include <mutex> +#include <thread> + +#if defined(ADDRESS_SANITIZER) || defined(MEMORY_SANITIZER) || \ + defined(THREAD_SANITIZER) +#include "sanitizer/common_interface_defs.h" // __sanitizer_print_stack_trace +#endif // defined(*_SANITIZER) + +namespace { + +// Important: JXL_ASSERT does not guarantee running the `condition` code, +// use only for debug mode checks. + +#if JXL_ENABLE_ASSERT +// Exits the program after printing a stack trace when possible. +bool Abort() { +#if defined(ADDRESS_SANITIZER) || defined(MEMORY_SANITIZER) || \ + defined(THREAD_SANITIZER) + // If compiled with any sanitizer print a stack trace. This call doesn't crash + // the program, instead the trap below will crash it also allowing gdb to + // break there. + __sanitizer_print_stack_trace(); +#endif // defined(*_SANITIZER) + +#ifdef _MSC_VER + __debugbreak(); + abort(); +#else + __builtin_trap(); +#endif +} +#define JXL_ASSERT(condition) \ + do { \ + if (!(condition)) { \ + Abort(); \ + } \ + } while (0) +#else +#define JXL_ASSERT(condition) \ + do { \ + } while (0) +#endif +} // namespace + +namespace jpegxl { + +// static +JxlParallelRetCode ThreadParallelRunner::Runner( + void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init, + JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) { + ThreadParallelRunner* self = + static_cast<ThreadParallelRunner*>(runner_opaque); + if (start_range > end_range) return -1; + if (start_range == end_range) return 0; + + int ret = init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1)); + if (ret != 0) return ret; + + // Use a sequential run when num_worker_threads_ is zero since we have no + // worker threads. + if (self->num_worker_threads_ == 0) { + const size_t thread = 0; + for (uint32_t task = start_range; task < end_range; ++task) { + func(jpegxl_opaque, task, thread); + } + return 0; + } + + if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) { + return -1; // Must not re-enter. + } + + const WorkerCommand worker_command = + (static_cast<WorkerCommand>(start_range) << 32) + end_range; + // Ensure the inputs do not result in a reserved command. + JXL_ASSERT(worker_command != kWorkerWait); + JXL_ASSERT(worker_command != kWorkerOnce); + JXL_ASSERT(worker_command != kWorkerExit); + + self->data_func_ = func; + self->jpegxl_opaque_ = jpegxl_opaque; + self->num_reserved_.store(0, std::memory_order_relaxed); + + self->StartWorkers(worker_command); + self->WorkersReadyBarrier(); + + if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) { + return -1; + } + return 0; +} + +// static +void ThreadParallelRunner::RunRange(ThreadParallelRunner* self, + const WorkerCommand command, + const int thread) { + const uint32_t begin = command >> 32; + const uint32_t end = command & 0xFFFFFFFF; + const uint32_t num_tasks = end - begin; + const uint32_t num_worker_threads = self->num_worker_threads_; + + // OpenMP introduced several "schedule" strategies: + // "single" (static assignment of exactly one chunk per thread): slower. + // "dynamic" (allocates k tasks at a time): competitive for well-chosen k. + // "guided" (allocates k tasks, decreases k): computing k = remaining/n + // is faster than halving k each iteration. We prefer this strategy + // because it avoids user-specified parameters. + + for (;;) { +#if 0 + // dynamic + const uint32_t my_size = std::max(num_tasks / (num_worker_threads * 4), 1); +#else + // guided + const uint32_t num_reserved = + self->num_reserved_.load(std::memory_order_relaxed); + // It is possible that more tasks are reserved than ready to run. + const uint32_t num_remaining = + num_tasks - std::min(num_reserved, num_tasks); + const uint32_t my_size = + std::max(num_remaining / (num_worker_threads * 4), 1u); +#endif + const uint32_t my_begin = begin + self->num_reserved_.fetch_add( + my_size, std::memory_order_relaxed); + const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks); + // Another thread already reserved the last task. + if (my_begin >= my_end) { + break; + } + for (uint32_t task = my_begin; task < my_end; ++task) { + self->data_func_(self->jpegxl_opaque_, task, thread); + } + } +} + +// static +void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self, + const int thread) { + // Until kWorkerExit command received: + for (;;) { + std::unique_lock<std::mutex> lock(self->mutex_); + // Notify main thread that this thread is ready. + if (++self->workers_ready_ == self->num_threads_) { + self->workers_ready_cv_.notify_one(); + } + RESUME_WAIT: + // Wait for a command. + self->worker_start_cv_.wait(lock); + const WorkerCommand command = self->worker_start_command_; + switch (command) { + case kWorkerWait: // spurious wakeup: + goto RESUME_WAIT; // lock still held, avoid incrementing ready. + case kWorkerOnce: + lock.unlock(); + self->data_func_(self->jpegxl_opaque_, thread, thread); + break; + case kWorkerExit: + return; // exits thread + default: + lock.unlock(); + RunRange(self, command, thread); + break; + } + } +} + +ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads) + : num_worker_threads_(num_worker_threads), + num_threads_(std::max(num_worker_threads, 1)) { + threads_.reserve(num_worker_threads_); + + // Suppress "unused-private-field" warning. + (void)padding1; + (void)padding2; + + // Safely handle spurious worker wakeups. + worker_start_command_ = kWorkerWait; + + for (uint32_t i = 0; i < num_worker_threads_; ++i) { + threads_.emplace_back(ThreadFunc, this, i); + } + + if (num_worker_threads_ != 0) { + WorkersReadyBarrier(); + } +} + +ThreadParallelRunner::~ThreadParallelRunner() { + if (num_worker_threads_ != 0) { + StartWorkers(kWorkerExit); + } + + for (std::thread& thread : threads_) { + JXL_ASSERT(thread.joinable()); + thread.join(); + } +} +} // namespace jpegxl diff --git a/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h new file mode 100644 index 0000000000..199a5f2a8b --- /dev/null +++ b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h @@ -0,0 +1,166 @@ +// Copyright (c) the JPEG XL Project Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// + +// C++ implementation using std::thread of a ::JxlParallelRunner. + +// The main class in this module, ThreadParallelRunner, implements a static +// method ThreadParallelRunner::Runner than can be passed as a +// JxlParallelRunner when using the JPEG XL library. This uses std::thread +// internally and related synchronization functions. The number of threads +// created is fixed at construction time and the threads are re-used for every +// ThreadParallelRunner::Runner call. Only one concurrent Runner() call per +// instance is allowed at a time. +// +// This is a scalable, lower-overhead thread pool runner, especially suitable +// for data-parallel computations in the fork-join model, where clients need to +// know when all tasks have completed. +// +// This thread pool can efficiently load-balance millions of tasks using an +// atomic counter, thus avoiding per-task virtual or system calls. With 48 +// hyperthreads and 1M tasks that add to an atomic counter, overall runtime is +// 10-20x higher when using std::async, and ~200x for a queue-based thread +// pool. +// +// Usage: +// ThreadParallelRunner runner; +// JxlDecode( +// ... , &ThreadParallelRunner::Runner, static_cast<void*>(&runner)); + +#ifndef LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_ +#define LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_ + +#include <jxl/memory_manager.h> +#include <jxl/parallel_runner.h> +#include <stddef.h> +#include <stdint.h> +#include <stdlib.h> + +#include <atomic> +#include <condition_variable> //NOLINT +#include <mutex> //NOLINT +#include <thread> //NOLINT +#include <vector> + +namespace jpegxl { + +// Main helper class implementing the ::JxlParallelRunner interface. +class ThreadParallelRunner { + public: + // ::JxlParallelRunner interface. + static JxlParallelRetCode Runner(void* runner_opaque, void* jpegxl_opaque, + JxlParallelRunInit init, + JxlParallelRunFunction func, + uint32_t start_range, uint32_t end_range); + + // Starts the given number of worker threads and blocks until they are ready. + // "num_worker_threads" defaults to one per hyperthread. If zero, all tasks + // run on the main thread. + explicit ThreadParallelRunner( + int num_worker_threads = std::thread::hardware_concurrency()); + + // Waits for all threads to exit. + ~ThreadParallelRunner(); + + // Returns maximum number of main/worker threads that may call Func. Useful + // for allocating per-thread storage. + size_t NumThreads() const { return num_threads_; } + + // Runs func(thread, thread) on all thread(s) that may participate in Run. + // If NumThreads() == 0, runs on the main thread with thread == 0, otherwise + // concurrently called by each worker thread in [0, NumThreads()). + template <class Func> + void RunOnEachThread(const Func& func) { + if (num_worker_threads_ == 0) { + const int thread = 0; + func(thread, thread); + return; + } + + data_func_ = reinterpret_cast<JxlParallelRunFunction>(&CallClosure<Func>); + jpegxl_opaque_ = const_cast<void*>(static_cast<const void*>(&func)); + StartWorkers(kWorkerOnce); + WorkersReadyBarrier(); + } + + JxlMemoryManager memory_manager; + + private: + // After construction and between calls to Run, workers are "ready", i.e. + // waiting on worker_start_cv_. They are "started" by sending a "command" + // and notifying all worker_start_cv_ waiters. (That is why all workers + // must be ready/waiting - otherwise, the notification will not reach all of + // them and the main thread waits in vain for them to report readiness.) + using WorkerCommand = uint64_t; + + // Special values; all others encode the begin/end parameters. Note that all + // these are no-op ranges (begin >= end) and therefore never used to encode + // ranges. + static constexpr WorkerCommand kWorkerWait = ~1ULL; + static constexpr WorkerCommand kWorkerOnce = ~2ULL; + static constexpr WorkerCommand kWorkerExit = ~3ULL; + + // Calls f(task, thread). Used for type erasure of Func arguments. The + // signature must match JxlParallelRunFunction, hence a void* argument. + template <class Closure> + static void CallClosure(void* f, const uint32_t task, const size_t thread) { + (*reinterpret_cast<const Closure*>(f))(task, thread); + } + + void WorkersReadyBarrier() { + std::unique_lock<std::mutex> lock(mutex_); + // Typically only a single iteration. + while (workers_ready_ != threads_.size()) { + workers_ready_cv_.wait(lock); + } + workers_ready_ = 0; + + // Safely handle spurious worker wakeups. + worker_start_command_ = kWorkerWait; + } + + // Precondition: all workers are ready. + void StartWorkers(const WorkerCommand worker_command) { + mutex_.lock(); + worker_start_command_ = worker_command; + // Workers will need this lock, so release it before they wake up. + mutex_.unlock(); + worker_start_cv_.notify_all(); + } + + // Attempts to reserve and perform some work from the global range of tasks, + // which is encoded within "command". Returns after all tasks are reserved. + static void RunRange(ThreadParallelRunner* self, const WorkerCommand command, + const int thread); + + static void ThreadFunc(ThreadParallelRunner* self, int thread); + + // Unmodified after ctor, but cannot be const because we call thread::join(). + std::vector<std::thread> threads_; + + const uint32_t num_worker_threads_; // == threads_.size() + const uint32_t num_threads_; + + std::atomic<int> depth_{0}; // detects if Run is re-entered (not supported). + + std::mutex mutex_; // guards both cv and their variables. + std::condition_variable workers_ready_cv_; + uint32_t workers_ready_ = 0; + std::condition_variable worker_start_cv_; + WorkerCommand worker_start_command_; + + // Written by main thread, read by workers (after mutex lock/unlock). + JxlParallelRunFunction data_func_; + void* jpegxl_opaque_; + + // Updated by workers; padding avoids false sharing. + uint8_t padding1[64]; + std::atomic<uint32_t> num_reserved_{0}; + uint8_t padding2[64]; +}; + +} // namespace jpegxl + +#endif // LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_ diff --git a/third_party/jpeg-xl/lib/threads/thread_parallel_runner_test.cc b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_test.cc new file mode 100644 index 0000000000..7c8e602764 --- /dev/null +++ b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_test.cc @@ -0,0 +1,122 @@ +// Copyright (c) the JPEG XL Project Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +#include <atomic> + +#include "lib/jxl/base/data_parallel.h" +#include "lib/jxl/test_utils.h" +#include "lib/jxl/testing.h" + +using jxl::test::ThreadPoolForTests; + +namespace jpegxl { +namespace { + +int PopulationCount(uint64_t bits) { + int num_set = 0; + while (bits != 0) { + num_set += bits & 1; + bits >>= 1; + } + return num_set; +} + +// Ensures task parameter is in bounds, every parameter is reached, +// pool can be reused (multiple consecutive Run calls), pool can be destroyed +// (joining with its threads), num_threads=0 works (runs on current thread). +TEST(ThreadParallelRunnerTest, TestPool) { + for (int num_threads = 0; num_threads <= 18; ++num_threads) { + ThreadPoolForTests pool(num_threads); + for (int num_tasks = 0; num_tasks < 32; ++num_tasks) { + std::vector<int> mementos(num_tasks); + for (int begin = 0; begin < 32; ++begin) { + std::fill(mementos.begin(), mementos.end(), 0); + EXPECT_TRUE(RunOnPool( + &pool, begin, begin + num_tasks, jxl::ThreadPool::NoInit, + [begin, num_tasks, &mementos](const int task, const int thread) { + // Parameter is in the given range + EXPECT_GE(task, begin); + EXPECT_LT(task, begin + num_tasks); + + // Store mementos to be sure we visited each task. + mementos.at(task - begin) = 1000 + task; + }, + "TestPool")); + for (int task = begin; task < begin + num_tasks; ++task) { + EXPECT_EQ(1000 + task, mementos.at(task - begin)); + } + } + } + } +} + +// Verify "thread" parameter when processing few tasks. +TEST(ThreadParallelRunnerTest, TestSmallAssignments) { + const int kMaxThreads = 8; + for (int num_threads = 1; num_threads <= kMaxThreads; ++num_threads) { + ThreadPoolForTests pool(num_threads); + + // (Avoid mutex because it may perturb the worker thread scheduling) + std::atomic<uint64_t> id_bits{0}; + std::atomic<int> num_calls{0}; + + EXPECT_TRUE(RunOnPool( + &pool, 0, num_threads, jxl::ThreadPool::NoInit, + [&num_calls, num_threads, &id_bits](const int task, const int thread) { + num_calls.fetch_add(1, std::memory_order_relaxed); + + EXPECT_LT(thread, num_threads); + uint64_t bits = id_bits.load(std::memory_order_relaxed); + while ( + !id_bits.compare_exchange_weak(bits, bits | (1ULL << thread))) { + } + }, + "TestSmallAssignments")); + + // Correct number of tasks. + EXPECT_EQ(num_threads, num_calls.load()); + + const int num_participants = PopulationCount(id_bits.load()); + // Can't expect equality because other workers may have woken up too late. + EXPECT_LE(num_participants, num_threads); + } +} + +struct Counter { + Counter() { + // Suppress "unused-field" warning. + (void)padding; + } + void Assimilate(const Counter& victim) { counter += victim.counter; } + int counter = 0; + int padding[31]; +}; + +TEST(ThreadParallelRunnerTest, TestCounter) { + const int kNumThreads = 12; + ThreadPoolForTests pool(kNumThreads); + alignas(128) Counter counters[kNumThreads]; + + const int kNumTasks = kNumThreads * 19; + EXPECT_TRUE(RunOnPool( + &pool, 0, kNumTasks, jxl::ThreadPool::NoInit, + [&counters](const int task, const int thread) { + counters[thread].counter += task; + }, + "TestCounter")); + + int expected = 0; + for (int i = 0; i < kNumTasks; ++i) { + expected += i; + } + + for (int i = 1; i < kNumThreads; ++i) { + counters[0].Assimilate(counters[i]); + } + EXPECT_EQ(expected, counters[0].counter); +} + +} // namespace +} // namespace jpegxl |