summaryrefslogtreecommitdiffstats
path: root/third_party/highway/hwy/contrib/sort/bench_parallel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/highway/hwy/contrib/sort/bench_parallel.cc')
-rw-r--r--third_party/highway/hwy/contrib/sort/bench_parallel.cc238
1 files changed, 238 insertions, 0 deletions
diff --git a/third_party/highway/hwy/contrib/sort/bench_parallel.cc b/third_party/highway/hwy/contrib/sort/bench_parallel.cc
new file mode 100644
index 0000000000..1c8c928e21
--- /dev/null
+++ b/third_party/highway/hwy/contrib/sort/bench_parallel.cc
@@ -0,0 +1,238 @@
+// Copyright 2021 Google LLC
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Concurrent, independent sorts for generating more memory traffic and testing
+// scalability.
+
+#include <stdint.h>
+#include <stdio.h>
+
+#include <condition_variable> //NOLINT
+#include <functional>
+#include <memory>
+#include <mutex> //NOLINT
+#include <thread> //NOLINT
+#include <utility>
+#include <vector>
+
+// clang-format off
+#undef HWY_TARGET_INCLUDE
+#define HWY_TARGET_INCLUDE "hwy/contrib/sort/bench_parallel.cc" //NOLINT
+#include "hwy/foreach_target.h" // IWYU pragma: keep
+
+// After foreach_target
+#include "hwy/contrib/sort/algo-inl.h"
+#include "hwy/contrib/sort/result-inl.h"
+#include "hwy/aligned_allocator.h"
+// Last
+#include "hwy/tests/test_util-inl.h"
+// clang-format on
+
+HWY_BEFORE_NAMESPACE();
+namespace hwy {
+namespace HWY_NAMESPACE {
+namespace {
+
+class ThreadPool {
+ public:
+ // Starts the given number of worker threads and blocks until they are ready.
+ explicit ThreadPool(
+ const size_t num_threads = std::thread::hardware_concurrency())
+ : num_threads_(num_threads) {
+ HWY_ASSERT(num_threads_ > 0);
+ threads_.reserve(num_threads_);
+ for (size_t i = 0; i < num_threads_; ++i) {
+ threads_.emplace_back(ThreadFunc, this, i);
+ }
+
+ WorkersReadyBarrier();
+ }
+
+ ThreadPool(const ThreadPool&) = delete;
+ ThreadPool& operator&(const ThreadPool&) = delete;
+
+ // Waits for all threads to exit.
+ ~ThreadPool() {
+ StartWorkers(kWorkerExit);
+
+ for (std::thread& thread : threads_) {
+ thread.join();
+ }
+ }
+
+ size_t NumThreads() const { return threads_.size(); }
+
+ template <class Func>
+ void RunOnThreads(size_t max_threads, const Func& func) {
+ task_ = &CallClosure<Func>;
+ data_ = &func;
+ StartWorkers(max_threads);
+ WorkersReadyBarrier();
+ }
+
+ private:
+ // After construction and between calls to Run, workers are "ready", i.e.
+ // waiting on worker_start_cv_. They are "started" by sending a "command"
+ // and notifying all worker_start_cv_ waiters. (That is why all workers
+ // must be ready/waiting - otherwise, the notification will not reach all of
+ // them and the main thread waits in vain for them to report readiness.)
+ using WorkerCommand = uint64_t;
+
+ static constexpr WorkerCommand kWorkerWait = ~1ULL;
+ static constexpr WorkerCommand kWorkerExit = ~2ULL;
+
+ // Calls a closure (lambda with captures).
+ template <class Closure>
+ static void CallClosure(const void* f, size_t thread) {
+ (*reinterpret_cast<const Closure*>(f))(thread);
+ }
+
+ void WorkersReadyBarrier() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ // Typically only a single iteration.
+ while (workers_ready_ != threads_.size()) {
+ workers_ready_cv_.wait(lock);
+ }
+ workers_ready_ = 0;
+
+ // Safely handle spurious worker wakeups.
+ worker_start_command_ = kWorkerWait;
+ }
+
+ // Precondition: all workers are ready.
+ void StartWorkers(const WorkerCommand worker_command) {
+ std::unique_lock<std::mutex> lock(mutex_);
+ worker_start_command_ = worker_command;
+ // Workers will need this lock, so release it before they wake up.
+ lock.unlock();
+ worker_start_cv_.notify_all();
+ }
+
+ static void ThreadFunc(ThreadPool* self, size_t thread) {
+ // Until kWorkerExit command received:
+ for (;;) {
+ std::unique_lock<std::mutex> lock(self->mutex_);
+ // Notify main thread that this thread is ready.
+ if (++self->workers_ready_ == self->num_threads_) {
+ self->workers_ready_cv_.notify_one();
+ }
+ RESUME_WAIT:
+ // Wait for a command.
+ self->worker_start_cv_.wait(lock);
+ const WorkerCommand command = self->worker_start_command_;
+ switch (command) {
+ case kWorkerWait: // spurious wakeup:
+ goto RESUME_WAIT; // lock still held, avoid incrementing ready.
+ case kWorkerExit:
+ return; // exits thread
+ default:
+ break;
+ }
+
+ lock.unlock();
+ // Command is the maximum number of threads that should run the task.
+ HWY_ASSERT(command < self->NumThreads());
+ if (thread < command) {
+ self->task_(self->data_, thread);
+ }
+ }
+ }
+
+ const size_t num_threads_;
+
+ // Unmodified after ctor, but cannot be const because we call thread::join().
+ std::vector<std::thread> threads_;
+
+ std::mutex mutex_; // guards both cv and their variables.
+ std::condition_variable workers_ready_cv_;
+ size_t workers_ready_ = 0;
+ std::condition_variable worker_start_cv_;
+ WorkerCommand worker_start_command_;
+
+ // Written by main thread, read by workers (after mutex lock/unlock).
+ std::function<void(const void*, size_t)> task_; // points to CallClosure
+ const void* data_; // points to caller's Func
+};
+
+template <class Traits>
+void RunWithoutVerify(Traits st, const Dist dist, const size_t num_keys,
+ const Algo algo, SharedState& shared, size_t thread) {
+ using LaneType = typename Traits::LaneType;
+ using KeyType = typename Traits::KeyType;
+ using Order = typename Traits::Order;
+ const size_t num_lanes = num_keys * st.LanesPerKey();
+ auto aligned = hwy::AllocateAligned<LaneType>(num_lanes);
+
+ (void)GenerateInput(dist, aligned.get(), num_lanes);
+
+ const Timestamp t0;
+ Run<Order>(algo, reinterpret_cast<KeyType*>(aligned.get()), num_keys, shared,
+ thread);
+ HWY_ASSERT(aligned[0] < aligned[num_lanes - 1]);
+}
+
+void BenchParallel() {
+ // Not interested in benchmark results for other targets on x86
+ if (HWY_ARCH_X86 && (HWY_TARGET != HWY_AVX2 && HWY_TARGET != HWY_AVX3)) {
+ return;
+ }
+
+ ThreadPool pool;
+ const size_t NT = pool.NumThreads();
+
+ detail::SharedTraits<detail::TraitsLane<detail::OrderAscending<int64_t>>> st;
+ using KeyType = typename decltype(st)::KeyType;
+ const size_t num_keys = size_t{100} * 1000 * 1000;
+
+#if HAVE_IPS4O
+ const Algo algo = Algo::kIPS4O;
+#else
+ const Algo algo = Algo::kVQSort;
+#endif
+ const Dist dist = Dist::kUniform32;
+
+ SharedState shared;
+ shared.tls.resize(NT);
+
+ std::vector<Result> results;
+ for (size_t nt = 1; nt < NT; nt += HWY_MAX(1, NT / 16)) {
+ Timestamp t0;
+ // Default capture because MSVC wants algo/dist but clang does not.
+ pool.RunOnThreads(nt, [=, &shared](size_t thread) {
+ RunWithoutVerify(st, dist, num_keys, algo, shared, thread);
+ });
+ const double sec = SecondsSince(t0);
+ results.emplace_back(algo, dist, num_keys, nt, sec, sizeof(KeyType),
+ st.KeyString());
+ results.back().Print();
+ }
+}
+
+} // namespace
+// NOLINTNEXTLINE(google-readability-namespace-comments)
+} // namespace HWY_NAMESPACE
+} // namespace hwy
+HWY_AFTER_NAMESPACE();
+
+#if HWY_ONCE
+
+namespace hwy {
+namespace {
+HWY_BEFORE_TEST(BenchParallel);
+HWY_EXPORT_AND_TEST_P(BenchParallel, BenchParallel);
+} // namespace
+} // namespace hwy
+
+#endif // HWY_ONCE