summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/env/env_posix.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rocksdb/env/env_posix.cc
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/env/env_posix.cc')
-rw-r--r--src/rocksdb/env/env_posix.cc527
1 files changed, 527 insertions, 0 deletions
diff --git a/src/rocksdb/env/env_posix.cc b/src/rocksdb/env/env_posix.cc
new file mode 100644
index 000000000..861fbcf62
--- /dev/null
+++ b/src/rocksdb/env/env_posix.cc
@@ -0,0 +1,527 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors
+#include <dirent.h>
+#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
+#include <dlfcn.h>
+#endif
+#include <errno.h>
+#include <fcntl.h>
+
+#if defined(OS_LINUX)
+#include <linux/fs.h>
+#endif
+#if defined(ROCKSDB_IOURING_PRESENT)
+#include <liburing.h>
+#endif
+#include <pthread.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
+#include <sys/statfs.h>
+#include <sys/syscall.h>
+#include <sys/sysmacros.h>
+#endif
+#include <sys/statvfs.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#if defined(ROCKSDB_IOURING_PRESENT)
+#include <sys/uio.h>
+#endif
+#include <time.h>
+#include <algorithm>
+// Get nano time includes
+#if defined(OS_LINUX) || defined(OS_FREEBSD)
+#elif defined(__MACH__)
+#include <Availability.h>
+#include <mach/clock.h>
+#include <mach/mach.h>
+#else
+#include <chrono>
+#endif
+#include <deque>
+#include <set>
+#include <vector>
+
+#include "env/composite_env_wrapper.h"
+#include "env/io_posix.h"
+#include "logging/logging.h"
+#include "logging/posix_logger.h"
+#include "monitoring/iostats_context_imp.h"
+#include "monitoring/thread_status_updater.h"
+#include "port/port.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "test_util/sync_point.h"
+#include "util/coding.h"
+#include "util/compression_context_cache.h"
+#include "util/random.h"
+#include "util/string_util.h"
+#include "util/thread_local.h"
+#include "util/threadpool_imp.h"
+
+#if !defined(TMPFS_MAGIC)
+#define TMPFS_MAGIC 0x01021994
+#endif
+#if !defined(XFS_SUPER_MAGIC)
+#define XFS_SUPER_MAGIC 0x58465342
+#endif
+#if !defined(EXT4_SUPER_MAGIC)
+#define EXT4_SUPER_MAGIC 0xEF53
+#endif
+
+namespace ROCKSDB_NAMESPACE {
+#if defined(OS_WIN)
+static const std::string kSharedLibExt = ".dll";
+static const char kPathSeparator = ';';
+#else
+static const char kPathSeparator = ':';
+#if defined(OS_MACOSX)
+static const std::string kSharedLibExt = ".dylib";
+#else
+static const std::string kSharedLibExt = ".so";
+#endif
+#endif
+
+namespace {
+
+ThreadStatusUpdater* CreateThreadStatusUpdater() {
+ return new ThreadStatusUpdater();
+}
+
+#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
+class PosixDynamicLibrary : public DynamicLibrary {
+ public:
+ PosixDynamicLibrary(const std::string& name, void* handle)
+ : name_(name), handle_(handle) {}
+ ~PosixDynamicLibrary() override { dlclose(handle_); }
+
+ Status LoadSymbol(const std::string& sym_name, void** func) override {
+ assert(nullptr != func);
+ dlerror(); // Clear any old error
+ *func = dlsym(handle_, sym_name.c_str());
+ if (*func != nullptr) {
+ return Status::OK();
+ } else {
+ char* err = dlerror();
+ return Status::NotFound("Error finding symbol: " + sym_name, err);
+ }
+ }
+
+ const char* Name() const override { return name_.c_str(); }
+
+ private:
+ std::string name_;
+ void* handle_;
+};
+#endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
+
+class PosixEnv : public CompositeEnvWrapper {
+ public:
+ PosixEnv();
+
+ ~PosixEnv() override {
+ for (const auto tid : threads_to_join_) {
+ pthread_join(tid, nullptr);
+ }
+ for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
+ thread_pools_[pool_id].JoinAllThreads();
+ }
+ // Delete the thread_status_updater_ only when the current Env is not
+ // Env::Default(). This is to avoid the free-after-use error when
+ // Env::Default() is destructed while some other child threads are
+ // still trying to update thread status.
+ if (this != Env::Default()) {
+ delete thread_status_updater_;
+ }
+ }
+
+ void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
+ if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
+ fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
+ }
+ }
+
+#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
+ // Loads the named library into the result.
+ // If the input name is empty, the current executable is loaded
+ // On *nix systems, a "lib" prefix is added to the name if one is not supplied
+ // Comparably, the appropriate shared library extension is added to the name
+ // if not supplied. If search_path is not specified, the shared library will
+ // be loaded using the default path (LD_LIBRARY_PATH) If search_path is
+ // specified, the shared library will be searched for in the directories
+ // provided by the search path
+ Status LoadLibrary(const std::string& name, const std::string& path,
+ std::shared_ptr<DynamicLibrary>* result) override {
+ Status status;
+ assert(result != nullptr);
+ if (name.empty()) {
+ void* hndl = dlopen(NULL, RTLD_NOW);
+ if (hndl != nullptr) {
+ result->reset(new PosixDynamicLibrary(name, hndl));
+ return Status::OK();
+ }
+ } else {
+ std::string library_name = name;
+ if (library_name.find(kSharedLibExt) == std::string::npos) {
+ library_name = library_name + kSharedLibExt;
+ }
+#if !defined(OS_WIN)
+ if (library_name.find('/') == std::string::npos &&
+ library_name.compare(0, 3, "lib") != 0) {
+ library_name = "lib" + library_name;
+ }
+#endif
+ if (path.empty()) {
+ void* hndl = dlopen(library_name.c_str(), RTLD_NOW);
+ if (hndl != nullptr) {
+ result->reset(new PosixDynamicLibrary(library_name, hndl));
+ return Status::OK();
+ }
+ } else {
+ std::string local_path;
+ std::stringstream ss(path);
+ while (getline(ss, local_path, kPathSeparator)) {
+ if (!path.empty()) {
+ std::string full_name = local_path + "/" + library_name;
+ void* hndl = dlopen(full_name.c_str(), RTLD_NOW);
+ if (hndl != nullptr) {
+ result->reset(new PosixDynamicLibrary(full_name, hndl));
+ return Status::OK();
+ }
+ }
+ }
+ }
+ }
+ return Status::IOError(
+ IOErrorMsg("Failed to open shared library: xs", name), dlerror());
+ }
+#endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
+
+ void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
+ void* tag = nullptr,
+ void (*unschedFunction)(void* arg) = nullptr) override;
+
+ int UnSchedule(void* arg, Priority pri) override;
+
+ void StartThread(void (*function)(void* arg), void* arg) override;
+
+ void WaitForJoin() override;
+
+ unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
+
+ Status GetTestDirectory(std::string* result) override {
+ const char* env = getenv("TEST_TMPDIR");
+ if (env && env[0] != '\0') {
+ *result = env;
+ } else {
+ char buf[100];
+ snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%d", int(geteuid()));
+ *result = buf;
+ }
+ // Directory may already exist
+ CreateDir(*result);
+ return Status::OK();
+ }
+
+ Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
+ assert(thread_status_updater_);
+ return thread_status_updater_->GetThreadList(thread_list);
+ }
+
+ static uint64_t gettid(pthread_t tid) {
+ uint64_t thread_id = 0;
+ memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
+ return thread_id;
+ }
+
+ static uint64_t gettid() {
+ pthread_t tid = pthread_self();
+ return gettid(tid);
+ }
+
+ uint64_t GetThreadID() const override { return gettid(pthread_self()); }
+
+ Status NewLogger(const std::string& fname,
+ std::shared_ptr<Logger>* result) override {
+ FILE* f;
+ {
+ IOSTATS_TIMER_GUARD(open_nanos);
+ f = fopen(fname.c_str(),
+ "w"
+#ifdef __GLIBC_PREREQ
+#if __GLIBC_PREREQ(2, 7)
+ "e" // glibc extension to enable O_CLOEXEC
+#endif
+#endif
+ );
+ }
+ if (f == nullptr) {
+ result->reset();
+ return IOError("when fopen a file for new logger", fname, errno);
+ } else {
+ int fd = fileno(f);
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+ fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
+#endif
+ SetFD_CLOEXEC(fd, nullptr);
+ result->reset(new PosixLogger(f, &PosixEnv::gettid, this));
+ return Status::OK();
+ }
+ }
+
+ uint64_t NowMicros() override {
+ struct timeval tv;
+ gettimeofday(&tv, nullptr);
+ return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
+ }
+
+ uint64_t NowNanos() override {
+#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX)
+ struct timespec ts;
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
+#elif defined(OS_SOLARIS)
+ return gethrtime();
+#elif defined(__MACH__)
+ clock_serv_t cclock;
+ mach_timespec_t ts;
+ host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
+ clock_get_time(cclock, &ts);
+ mach_port_deallocate(mach_task_self(), cclock);
+ return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
+#else
+ return std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count();
+#endif
+ }
+
+ uint64_t NowCPUNanos() override {
+#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \
+ (defined(__MACH__) && defined(__MAC_10_12))
+ struct timespec ts;
+ clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
+ return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
+#endif
+ return 0;
+ }
+
+ void SleepForMicroseconds(int micros) override { usleep(micros); }
+
+ Status GetHostName(char* name, uint64_t len) override {
+ int ret = gethostname(name, static_cast<size_t>(len));
+ if (ret < 0) {
+ if (errno == EFAULT || errno == EINVAL) {
+ return Status::InvalidArgument(strerror(errno));
+ } else {
+ return IOError("GetHostName", name, errno);
+ }
+ }
+ return Status::OK();
+ }
+
+ Status GetCurrentTime(int64_t* unix_time) override {
+ time_t ret = time(nullptr);
+ if (ret == (time_t) -1) {
+ return IOError("GetCurrentTime", "", errno);
+ }
+ *unix_time = (int64_t) ret;
+ return Status::OK();
+ }
+
+ ThreadStatusUpdater* GetThreadStatusUpdater() const override {
+ return Env::GetThreadStatusUpdater();
+ }
+
+ std::string GenerateUniqueId() override { return Env::GenerateUniqueId(); }
+
+ // Allow increasing the number of worker threads.
+ void SetBackgroundThreads(int num, Priority pri) override {
+ assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
+ thread_pools_[pri].SetBackgroundThreads(num);
+ }
+
+ int GetBackgroundThreads(Priority pri) override {
+ assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
+ return thread_pools_[pri].GetBackgroundThreads();
+ }
+
+ Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
+ allow_non_owner_access_ = allow_non_owner_access;
+ return Status::OK();
+ }
+
+ // Allow increasing the number of worker threads.
+ void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
+ assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
+ thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
+ }
+
+ void LowerThreadPoolIOPriority(Priority pool = LOW) override {
+ assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
+#ifdef OS_LINUX
+ thread_pools_[pool].LowerIOPriority();
+#else
+ (void)pool;
+#endif
+ }
+
+ void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
+ assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
+#ifdef OS_LINUX
+ thread_pools_[pool].LowerCPUPriority();
+#else
+ (void)pool;
+#endif
+ }
+
+ std::string TimeToString(uint64_t secondsSince1970) override {
+ const time_t seconds = (time_t)secondsSince1970;
+ struct tm t;
+ int maxsize = 64;
+ std::string dummy;
+ dummy.reserve(maxsize);
+ dummy.resize(maxsize);
+ char* p = &dummy[0];
+ localtime_r(&seconds, &t);
+ snprintf(p, maxsize,
+ "%04d/%02d/%02d-%02d:%02d:%02d ",
+ t.tm_year + 1900,
+ t.tm_mon + 1,
+ t.tm_mday,
+ t.tm_hour,
+ t.tm_min,
+ t.tm_sec);
+ return dummy;
+ }
+
+ private:
+ std::vector<ThreadPoolImpl> thread_pools_;
+ pthread_mutex_t mu_;
+ std::vector<pthread_t> threads_to_join_;
+ // If true, allow non owner read access for db files. Otherwise, non-owner
+ // has no access to db files.
+ bool allow_non_owner_access_;
+};
+
+PosixEnv::PosixEnv()
+ : CompositeEnvWrapper(this, FileSystem::Default().get()),
+ thread_pools_(Priority::TOTAL),
+ allow_non_owner_access_(true) {
+ ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
+ for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
+ thread_pools_[pool_id].SetThreadPriority(
+ static_cast<Env::Priority>(pool_id));
+ // This allows later initializing the thread-local-env of each thread.
+ thread_pools_[pool_id].SetHostEnv(this);
+ }
+ thread_status_updater_ = CreateThreadStatusUpdater();
+}
+
+void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
+ void* tag, void (*unschedFunction)(void* arg)) {
+ assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
+ thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
+}
+
+int PosixEnv::UnSchedule(void* arg, Priority pri) {
+ return thread_pools_[pri].UnSchedule(arg);
+}
+
+unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
+ assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
+ return thread_pools_[pri].GetQueueLen();
+}
+
+struct StartThreadState {
+ void (*user_function)(void*);
+ void* arg;
+};
+
+static void* StartThreadWrapper(void* arg) {
+ StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
+ state->user_function(state->arg);
+ delete state;
+ return nullptr;
+}
+
+void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
+ pthread_t t;
+ StartThreadState* state = new StartThreadState;
+ state->user_function = function;
+ state->arg = arg;
+ ThreadPoolImpl::PthreadCall(
+ "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
+ ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
+ threads_to_join_.push_back(t);
+ ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+}
+
+void PosixEnv::WaitForJoin() {
+ for (const auto tid : threads_to_join_) {
+ pthread_join(tid, nullptr);
+ }
+ threads_to_join_.clear();
+}
+
+} // namespace
+
+std::string Env::GenerateUniqueId() {
+ std::string uuid_file = "/proc/sys/kernel/random/uuid";
+
+ Status s = FileExists(uuid_file);
+ if (s.ok()) {
+ std::string uuid;
+ s = ReadFileToString(this, uuid_file, &uuid);
+ if (s.ok()) {
+ return uuid;
+ }
+ }
+ // Could not read uuid_file - generate uuid using "nanos-random"
+ Random64 r(time(nullptr));
+ uint64_t random_uuid_portion =
+ r.Uniform(std::numeric_limits<uint64_t>::max());
+ uint64_t nanos_uuid_portion = NowNanos();
+ char uuid2[200];
+ snprintf(uuid2,
+ 200,
+ "%lx-%lx",
+ (unsigned long)nanos_uuid_portion,
+ (unsigned long)random_uuid_portion);
+ return uuid2;
+}
+
+//
+// Default Posix Env
+//
+Env* Env::Default() {
+ // The following function call initializes the singletons of ThreadLocalPtr
+ // right before the static default_env. This guarantees default_env will
+ // always being destructed before the ThreadLocalPtr singletons get
+ // destructed as C++ guarantees that the destructions of static variables
+ // is in the reverse order of their constructions.
+ //
+ // Since static members are destructed in the reverse order
+ // of their construction, having this call here guarantees that
+ // the destructor of static PosixEnv will go first, then the
+ // the singletons of ThreadLocalPtr.
+ ThreadLocalPtr::InitSingletons();
+ CompressionContextCache::InitSingleton();
+ INIT_SYNC_POINT_SINGLETONS();
+ static PosixEnv default_env;
+ static CompositeEnvWrapper composite_env(&default_env,
+ FileSystem::Default().get());
+ return &composite_env;
+}
+
+} // namespace ROCKSDB_NAMESPACE