summaryrefslogtreecommitdiffstats
path: root/third_party/jpeg-xl/lib/threads/thread_parallel_runner_internal.cc
blob: 5f73d9489706a0e510d244588854f9b0c9ec3d1a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// Copyright (c) the JPEG XL Project Authors. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

#include "lib/threads/thread_parallel_runner_internal.h"

#include <jxl/parallel_runner.h>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <thread>

#if defined(ADDRESS_SANITIZER) || defined(MEMORY_SANITIZER) || \
    defined(THREAD_SANITIZER)
#include "sanitizer/common_interface_defs.h"  // __sanitizer_print_stack_trace
#endif                                        // defined(*_SANITIZER)

namespace {

// Important: JXL_ASSERT does not guarantee running the `condition` code,
// use only for debug mode checks.

#if JXL_ENABLE_ASSERT
// Exits the program after printing a stack trace when possible.
bool Abort() {
#if defined(ADDRESS_SANITIZER) || defined(MEMORY_SANITIZER) || \
    defined(THREAD_SANITIZER)
  // If compiled with any sanitizer print a stack trace. This call doesn't crash
  // the program, instead the trap below will crash it also allowing gdb to
  // break there.
  __sanitizer_print_stack_trace();
#endif  // defined(*_SANITIZER)

#ifdef _MSC_VER
  __debugbreak();
  abort();
#else
  __builtin_trap();
#endif
}
#define JXL_ASSERT(condition) \
  do {                        \
    if (!(condition)) {       \
      Abort();                \
    }                         \
  } while (0)
#else
#define JXL_ASSERT(condition) \
  do {                        \
  } while (0)
#endif
}  // namespace

namespace jpegxl {

// static
JxlParallelRetCode ThreadParallelRunner::Runner(
    void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init,
    JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) {
  ThreadParallelRunner* self =
      static_cast<ThreadParallelRunner*>(runner_opaque);
  if (start_range > end_range) return -1;
  if (start_range == end_range) return 0;

  int ret = init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1));
  if (ret != 0) return ret;

  // Use a sequential run when num_worker_threads_ is zero since we have no
  // worker threads.
  if (self->num_worker_threads_ == 0) {
    const size_t thread = 0;
    for (uint32_t task = start_range; task < end_range; ++task) {
      func(jpegxl_opaque, task, thread);
    }
    return 0;
  }

  if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) {
    return -1;  // Must not re-enter.
  }

  const WorkerCommand worker_command =
      (static_cast<WorkerCommand>(start_range) << 32) + end_range;
  // Ensure the inputs do not result in a reserved command.
  JXL_ASSERT(worker_command != kWorkerWait);
  JXL_ASSERT(worker_command != kWorkerOnce);
  JXL_ASSERT(worker_command != kWorkerExit);

  self->data_func_ = func;
  self->jpegxl_opaque_ = jpegxl_opaque;
  self->num_reserved_.store(0, std::memory_order_relaxed);

  self->StartWorkers(worker_command);
  self->WorkersReadyBarrier();

  if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) {
    return -1;
  }
  return 0;
}

// static
void ThreadParallelRunner::RunRange(ThreadParallelRunner* self,
                                    const WorkerCommand command,
                                    const int thread) {
  const uint32_t begin = command >> 32;
  const uint32_t end = command & 0xFFFFFFFF;
  const uint32_t num_tasks = end - begin;
  const uint32_t num_worker_threads = self->num_worker_threads_;

  // OpenMP introduced several "schedule" strategies:
  // "single" (static assignment of exactly one chunk per thread): slower.
  // "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
  // "guided" (allocates k tasks, decreases k): computing k = remaining/n
  //   is faster than halving k each iteration. We prefer this strategy
  //   because it avoids user-specified parameters.

  for (;;) {
#if 0
      // dynamic
      const uint32_t my_size = std::max(num_tasks / (num_worker_threads * 4), 1);
#else
    // guided
    const uint32_t num_reserved =
        self->num_reserved_.load(std::memory_order_relaxed);
    // It is possible that more tasks are reserved than ready to run.
    const uint32_t num_remaining =
        num_tasks - std::min(num_reserved, num_tasks);
    const uint32_t my_size =
        std::max(num_remaining / (num_worker_threads * 4), 1u);
#endif
    const uint32_t my_begin = begin + self->num_reserved_.fetch_add(
                                          my_size, std::memory_order_relaxed);
    const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks);
    // Another thread already reserved the last task.
    if (my_begin >= my_end) {
      break;
    }
    for (uint32_t task = my_begin; task < my_end; ++task) {
      self->data_func_(self->jpegxl_opaque_, task, thread);
    }
  }
}

// static
void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self,
                                      const int thread) {
  // Until kWorkerExit command received:
  for (;;) {
    std::unique_lock<std::mutex> lock(self->mutex_);
    // Notify main thread that this thread is ready.
    if (++self->workers_ready_ == self->num_threads_) {
      self->workers_ready_cv_.notify_one();
    }
  RESUME_WAIT:
    // Wait for a command.
    self->worker_start_cv_.wait(lock);
    const WorkerCommand command = self->worker_start_command_;
    switch (command) {
      case kWorkerWait:    // spurious wakeup:
        goto RESUME_WAIT;  // lock still held, avoid incrementing ready.
      case kWorkerOnce:
        lock.unlock();
        self->data_func_(self->jpegxl_opaque_, thread, thread);
        break;
      case kWorkerExit:
        return;  // exits thread
      default:
        lock.unlock();
        RunRange(self, command, thread);
        break;
    }
  }
}

ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads)
    : num_worker_threads_(num_worker_threads),
      num_threads_(std::max(num_worker_threads, 1)) {
  threads_.reserve(num_worker_threads_);

  // Suppress "unused-private-field" warning.
  (void)padding1;
  (void)padding2;

  // Safely handle spurious worker wakeups.
  worker_start_command_ = kWorkerWait;

  for (uint32_t i = 0; i < num_worker_threads_; ++i) {
    threads_.emplace_back(ThreadFunc, this, i);
  }

  if (num_worker_threads_ != 0) {
    WorkersReadyBarrier();
  }
}

ThreadParallelRunner::~ThreadParallelRunner() {
  if (num_worker_threads_ != 0) {
    StartWorkers(kWorkerExit);
  }

  for (std::thread& thread : threads_) {
    JXL_ASSERT(thread.joinable());
    thread.join();
  }
}
}  // namespace jpegxl