summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/util/concurrent_arena.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rocksdb/util/concurrent_arena.h
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/rocksdb/util/concurrent_arena.h215
1 files changed, 215 insertions, 0 deletions
diff --git a/src/rocksdb/util/concurrent_arena.h b/src/rocksdb/util/concurrent_arena.h
new file mode 100644
index 00000000..a6191100
--- /dev/null
+++ b/src/rocksdb/util/concurrent_arena.h
@@ -0,0 +1,215 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <atomic>
+#include <memory>
+#include <utility>
+#include "port/likely.h"
+#include "util/allocator.h"
+#include "util/arena.h"
+#include "util/core_local.h"
+#include "util/mutexlock.h"
+#include "util/thread_local.h"
+
+// Only generate field unused warning for padding array, or build under
+// GCC 4.8.1 will fail.
+#ifdef __clang__
+#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
+#else
+#define ROCKSDB_FIELD_UNUSED
+#endif // __clang__
+
+namespace rocksdb {
+
+class Logger;
+
+// ConcurrentArena wraps an Arena. It makes it thread safe using a fast
+// inlined spinlock, and adds small per-core allocation caches to avoid
+// contention for small allocations. To avoid any memory waste from the
+// per-core shards, they are kept small, they are lazily instantiated
+// only if ConcurrentArena actually notices concurrent use, and they
+// adjust their size so that there is no fragmentation waste when the
+// shard blocks are allocated from the underlying main arena.
+class ConcurrentArena : public Allocator {
+ public:
+ // block_size and huge_page_size are the same as for Arena (and are
+ // in fact just passed to the constructor of arena_. The core-local
+ // shards compute their shard_block_size as a fraction of block_size
+ // that varies according to the hardware concurrency level.
+ explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
+ AllocTracker* tracker = nullptr,
+ size_t huge_page_size = 0);
+
+ char* Allocate(size_t bytes) override {
+ return AllocateImpl(bytes, false /*force_arena*/,
+ [=]() { return arena_.Allocate(bytes); });
+ }
+
+ char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
+ Logger* logger = nullptr) override {
+ size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
+ assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
+ (rounded_up % sizeof(void*)) == 0);
+
+ return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() {
+ return arena_.AllocateAligned(rounded_up, huge_page_size, logger);
+ });
+ }
+
+ size_t ApproximateMemoryUsage() const {
+ std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
+ lock.lock();
+ return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
+ }
+
+ size_t MemoryAllocatedBytes() const {
+ return memory_allocated_bytes_.load(std::memory_order_relaxed);
+ }
+
+ size_t AllocatedAndUnused() const {
+ return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
+ ShardAllocatedAndUnused();
+ }
+
+ size_t IrregularBlockNum() const {
+ return irregular_block_num_.load(std::memory_order_relaxed);
+ }
+
+ size_t BlockSize() const override { return arena_.BlockSize(); }
+
+ private:
+ struct Shard {
+ char padding[40] ROCKSDB_FIELD_UNUSED;
+ mutable SpinMutex mutex;
+ char* free_begin_;
+ std::atomic<size_t> allocated_and_unused_;
+
+ Shard() : free_begin_(nullptr), allocated_and_unused_(0) {}
+ };
+
+#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
+ static __thread size_t tls_cpuid;
+#else
+ enum ZeroFirstEnum : size_t { tls_cpuid = 0 };
+#endif
+
+ char padding0[56] ROCKSDB_FIELD_UNUSED;
+
+ size_t shard_block_size_;
+
+ CoreLocalArray<Shard> shards_;
+
+ Arena arena_;
+ mutable SpinMutex arena_mutex_;
+ std::atomic<size_t> arena_allocated_and_unused_;
+ std::atomic<size_t> memory_allocated_bytes_;
+ std::atomic<size_t> irregular_block_num_;
+
+ char padding1[56] ROCKSDB_FIELD_UNUSED;
+
+ Shard* Repick();
+
+ size_t ShardAllocatedAndUnused() const {
+ size_t total = 0;
+ for (size_t i = 0; i < shards_.Size(); ++i) {
+ total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
+ std::memory_order_relaxed);
+ }
+ return total;
+ }
+
+ template <typename Func>
+ char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
+ size_t cpu;
+
+ // Go directly to the arena if the allocation is too large, or if
+ // we've never needed to Repick() and the arena mutex is available
+ // with no waiting. This keeps the fragmentation penalty of
+ // concurrency zero unless it might actually confer an advantage.
+ std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
+ if (bytes > shard_block_size_ / 4 || force_arena ||
+ ((cpu = tls_cpuid) == 0 &&
+ !shards_.AccessAtCore(0)->allocated_and_unused_.load(
+ std::memory_order_relaxed) &&
+ arena_lock.try_lock())) {
+ if (!arena_lock.owns_lock()) {
+ arena_lock.lock();
+ }
+ auto rv = func();
+ Fixup();
+ return rv;
+ }
+
+ // pick a shard from which to allocate
+ Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
+ if (!s->mutex.try_lock()) {
+ s = Repick();
+ s->mutex.lock();
+ }
+ std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);
+
+ size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
+ if (avail < bytes) {
+ // reload
+ std::lock_guard<SpinMutex> reload_lock(arena_mutex_);
+
+ // If the arena's current block is within a factor of 2 of the right
+ // size, we adjust our request to avoid arena waste.
+ auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
+ assert(exact == arena_.AllocatedAndUnused());
+
+ if (exact >= bytes && arena_.IsInInlineBlock()) {
+ // If we haven't exhausted arena's inline block yet, allocate from arena
+ // directly. This ensures that we'll do the first few small allocations
+ // without allocating any blocks.
+ // In particular this prevents empty memtables from using
+ // disproportionately large amount of memory: a memtable allocates on
+ // the order of 1 KB of memory when created; we wouldn't want to
+ // allocate a full arena block (typically a few megabytes) for that,
+ // especially if there are thousands of empty memtables.
+ auto rv = func();
+ Fixup();
+ return rv;
+ }
+
+ avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
+ ? exact
+ : shard_block_size_;
+ s->free_begin_ = arena_.AllocateAligned(avail);
+ Fixup();
+ }
+ s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);
+
+ char* rv;
+ if ((bytes % sizeof(void*)) == 0) {
+ // aligned allocation from the beginning
+ rv = s->free_begin_;
+ s->free_begin_ += bytes;
+ } else {
+ // unaligned from the end
+ rv = s->free_begin_ + avail - bytes;
+ }
+ return rv;
+ }
+
+ void Fixup() {
+ arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
+ std::memory_order_relaxed);
+ memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
+ std::memory_order_relaxed);
+ irregular_block_num_.store(arena_.IrregularBlockNum(),
+ std::memory_order_relaxed);
+ }
+
+ ConcurrentArena(const ConcurrentArena&) = delete;
+ ConcurrentArena& operator=(const ConcurrentArena&) = delete;
+};
+
+} // namespace rocksdb