summaryrefslogtreecommitdiffstats
path: root/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h')
-rw-r--r--third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h166
1 files changed, 166 insertions, 0 deletions
diff --git a/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h
new file mode 100644
index 0000000000..199a5f2a8b
--- /dev/null
+++ b/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.h
@@ -0,0 +1,166 @@
+// Copyright (c) the JPEG XL Project Authors. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+//
+
+// C++ implementation using std::thread of a ::JxlParallelRunner.
+
+// The main class in this module, ThreadParallelRunner, implements a static
+// method ThreadParallelRunner::Runner than can be passed as a
+// JxlParallelRunner when using the JPEG XL library. This uses std::thread
+// internally and related synchronization functions. The number of threads
+// created is fixed at construction time and the threads are re-used for every
+// ThreadParallelRunner::Runner call. Only one concurrent Runner() call per
+// instance is allowed at a time.
+//
+// This is a scalable, lower-overhead thread pool runner, especially suitable
+// for data-parallel computations in the fork-join model, where clients need to
+// know when all tasks have completed.
+//
+// This thread pool can efficiently load-balance millions of tasks using an
+// atomic counter, thus avoiding per-task virtual or system calls. With 48
+// hyperthreads and 1M tasks that add to an atomic counter, overall runtime is
+// 10-20x higher when using std::async, and ~200x for a queue-based thread
+// pool.
+//
+// Usage:
+// ThreadParallelRunner runner;
+// JxlDecode(
+// ... , &ThreadParallelRunner::Runner, static_cast<void*>(&runner));
+
+#ifndef LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_
+#define LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_
+
+#include <jxl/memory_manager.h>
+#include <jxl/parallel_runner.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include <atomic>
+#include <condition_variable> //NOLINT
+#include <mutex> //NOLINT
+#include <thread> //NOLINT
+#include <vector>
+
+namespace jpegxl {
+
+// Main helper class implementing the ::JxlParallelRunner interface.
+class ThreadParallelRunner {
+ public:
+ // ::JxlParallelRunner interface.
+ static JxlParallelRetCode Runner(void* runner_opaque, void* jpegxl_opaque,
+ JxlParallelRunInit init,
+ JxlParallelRunFunction func,
+ uint32_t start_range, uint32_t end_range);
+
+ // Starts the given number of worker threads and blocks until they are ready.
+ // "num_worker_threads" defaults to one per hyperthread. If zero, all tasks
+ // run on the main thread.
+ explicit ThreadParallelRunner(
+ int num_worker_threads = std::thread::hardware_concurrency());
+
+ // Waits for all threads to exit.
+ ~ThreadParallelRunner();
+
+ // Returns maximum number of main/worker threads that may call Func. Useful
+ // for allocating per-thread storage.
+ size_t NumThreads() const { return num_threads_; }
+
+ // Runs func(thread, thread) on all thread(s) that may participate in Run.
+ // If NumThreads() == 0, runs on the main thread with thread == 0, otherwise
+ // concurrently called by each worker thread in [0, NumThreads()).
+ template <class Func>
+ void RunOnEachThread(const Func& func) {
+ if (num_worker_threads_ == 0) {
+ const int thread = 0;
+ func(thread, thread);
+ return;
+ }
+
+ data_func_ = reinterpret_cast<JxlParallelRunFunction>(&CallClosure<Func>);
+ jpegxl_opaque_ = const_cast<void*>(static_cast<const void*>(&func));
+ StartWorkers(kWorkerOnce);
+ WorkersReadyBarrier();
+ }
+
+ JxlMemoryManager memory_manager;
+
+ private:
+ // After construction and between calls to Run, workers are "ready", i.e.
+ // waiting on worker_start_cv_. They are "started" by sending a "command"
+ // and notifying all worker_start_cv_ waiters. (That is why all workers
+ // must be ready/waiting - otherwise, the notification will not reach all of
+ // them and the main thread waits in vain for them to report readiness.)
+ using WorkerCommand = uint64_t;
+
+ // Special values; all others encode the begin/end parameters. Note that all
+ // these are no-op ranges (begin >= end) and therefore never used to encode
+ // ranges.
+ static constexpr WorkerCommand kWorkerWait = ~1ULL;
+ static constexpr WorkerCommand kWorkerOnce = ~2ULL;
+ static constexpr WorkerCommand kWorkerExit = ~3ULL;
+
+ // Calls f(task, thread). Used for type erasure of Func arguments. The
+ // signature must match JxlParallelRunFunction, hence a void* argument.
+ template <class Closure>
+ static void CallClosure(void* f, const uint32_t task, const size_t thread) {
+ (*reinterpret_cast<const Closure*>(f))(task, thread);
+ }
+
+ void WorkersReadyBarrier() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ // Typically only a single iteration.
+ while (workers_ready_ != threads_.size()) {
+ workers_ready_cv_.wait(lock);
+ }
+ workers_ready_ = 0;
+
+ // Safely handle spurious worker wakeups.
+ worker_start_command_ = kWorkerWait;
+ }
+
+ // Precondition: all workers are ready.
+ void StartWorkers(const WorkerCommand worker_command) {
+ mutex_.lock();
+ worker_start_command_ = worker_command;
+ // Workers will need this lock, so release it before they wake up.
+ mutex_.unlock();
+ worker_start_cv_.notify_all();
+ }
+
+ // Attempts to reserve and perform some work from the global range of tasks,
+ // which is encoded within "command". Returns after all tasks are reserved.
+ static void RunRange(ThreadParallelRunner* self, const WorkerCommand command,
+ const int thread);
+
+ static void ThreadFunc(ThreadParallelRunner* self, int thread);
+
+ // Unmodified after ctor, but cannot be const because we call thread::join().
+ std::vector<std::thread> threads_;
+
+ const uint32_t num_worker_threads_; // == threads_.size()
+ const uint32_t num_threads_;
+
+ std::atomic<int> depth_{0}; // detects if Run is re-entered (not supported).
+
+ std::mutex mutex_; // guards both cv and their variables.
+ std::condition_variable workers_ready_cv_;
+ uint32_t workers_ready_ = 0;
+ std::condition_variable worker_start_cv_;
+ WorkerCommand worker_start_command_;
+
+ // Written by main thread, read by workers (after mutex lock/unlock).
+ JxlParallelRunFunction data_func_;
+ void* jpegxl_opaque_;
+
+ // Updated by workers; padding avoids false sharing.
+ uint8_t padding1[64];
+ std::atomic<uint32_t> num_reserved_{0};
+ uint8_t padding2[64];
+};
+
+} // namespace jpegxl
+
+#endif // LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_