diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rocksdb/env/env_posix.cc | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/env/env_posix.cc')
-rw-r--r-- | src/rocksdb/env/env_posix.cc | 527 |
1 files changed, 527 insertions, 0 deletions
diff --git a/src/rocksdb/env/env_posix.cc b/src/rocksdb/env/env_posix.cc new file mode 100644 index 000000000..861fbcf62 --- /dev/null +++ b/src/rocksdb/env/env_posix.cc @@ -0,0 +1,527 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors +#include <dirent.h> +#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION +#include <dlfcn.h> +#endif +#include <errno.h> +#include <fcntl.h> + +#if defined(OS_LINUX) +#include <linux/fs.h> +#endif +#if defined(ROCKSDB_IOURING_PRESENT) +#include <liburing.h> +#endif +#include <pthread.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/ioctl.h> +#include <sys/mman.h> +#include <sys/stat.h> +#if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID) +#include <sys/statfs.h> +#include <sys/syscall.h> +#include <sys/sysmacros.h> +#endif +#include <sys/statvfs.h> +#include <sys/time.h> +#include <sys/types.h> +#if defined(ROCKSDB_IOURING_PRESENT) +#include <sys/uio.h> +#endif +#include <time.h> +#include <algorithm> +// Get nano time includes +#if defined(OS_LINUX) || defined(OS_FREEBSD) +#elif defined(__MACH__) +#include <Availability.h> +#include <mach/clock.h> +#include <mach/mach.h> +#else +#include <chrono> +#endif +#include <deque> +#include <set> +#include <vector> + +#include "env/composite_env_wrapper.h" +#include "env/io_posix.h" +#include "logging/logging.h" +#include "logging/posix_logger.h" +#include "monitoring/iostats_context_imp.h" +#include "monitoring/thread_status_updater.h" +#include "port/port.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "test_util/sync_point.h" +#include "util/coding.h" +#include "util/compression_context_cache.h" +#include "util/random.h" +#include "util/string_util.h" +#include "util/thread_local.h" +#include "util/threadpool_imp.h" + +#if !defined(TMPFS_MAGIC) +#define TMPFS_MAGIC 0x01021994 +#endif +#if !defined(XFS_SUPER_MAGIC) +#define XFS_SUPER_MAGIC 0x58465342 +#endif +#if !defined(EXT4_SUPER_MAGIC) +#define EXT4_SUPER_MAGIC 0xEF53 +#endif + +namespace ROCKSDB_NAMESPACE { +#if defined(OS_WIN) +static const std::string kSharedLibExt = ".dll"; +static const char kPathSeparator = ';'; +#else +static const char kPathSeparator = ':'; +#if defined(OS_MACOSX) +static const std::string kSharedLibExt = ".dylib"; +#else +static const std::string kSharedLibExt = ".so"; +#endif +#endif + +namespace { + +ThreadStatusUpdater* CreateThreadStatusUpdater() { + return new ThreadStatusUpdater(); +} + +#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION +class PosixDynamicLibrary : public DynamicLibrary { + public: + PosixDynamicLibrary(const std::string& name, void* handle) + : name_(name), handle_(handle) {} + ~PosixDynamicLibrary() override { dlclose(handle_); } + + Status LoadSymbol(const std::string& sym_name, void** func) override { + assert(nullptr != func); + dlerror(); // Clear any old error + *func = dlsym(handle_, sym_name.c_str()); + if (*func != nullptr) { + return Status::OK(); + } else { + char* err = dlerror(); + return Status::NotFound("Error finding symbol: " + sym_name, err); + } + } + + const char* Name() const override { return name_.c_str(); } + + private: + std::string name_; + void* handle_; +}; +#endif // !ROCKSDB_NO_DYNAMIC_EXTENSION + +class PosixEnv : public CompositeEnvWrapper { + public: + PosixEnv(); + + ~PosixEnv() override { + for (const auto tid : threads_to_join_) { + pthread_join(tid, nullptr); + } + for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { + thread_pools_[pool_id].JoinAllThreads(); + } + // Delete the thread_status_updater_ only when the current Env is not + // Env::Default(). This is to avoid the free-after-use error when + // Env::Default() is destructed while some other child threads are + // still trying to update thread status. + if (this != Env::Default()) { + delete thread_status_updater_; + } + } + + void SetFD_CLOEXEC(int fd, const EnvOptions* options) { + if ((options == nullptr || options->set_fd_cloexec) && fd > 0) { + fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC); + } + } + +#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION + // Loads the named library into the result. + // If the input name is empty, the current executable is loaded + // On *nix systems, a "lib" prefix is added to the name if one is not supplied + // Comparably, the appropriate shared library extension is added to the name + // if not supplied. If search_path is not specified, the shared library will + // be loaded using the default path (LD_LIBRARY_PATH) If search_path is + // specified, the shared library will be searched for in the directories + // provided by the search path + Status LoadLibrary(const std::string& name, const std::string& path, + std::shared_ptr<DynamicLibrary>* result) override { + Status status; + assert(result != nullptr); + if (name.empty()) { + void* hndl = dlopen(NULL, RTLD_NOW); + if (hndl != nullptr) { + result->reset(new PosixDynamicLibrary(name, hndl)); + return Status::OK(); + } + } else { + std::string library_name = name; + if (library_name.find(kSharedLibExt) == std::string::npos) { + library_name = library_name + kSharedLibExt; + } +#if !defined(OS_WIN) + if (library_name.find('/') == std::string::npos && + library_name.compare(0, 3, "lib") != 0) { + library_name = "lib" + library_name; + } +#endif + if (path.empty()) { + void* hndl = dlopen(library_name.c_str(), RTLD_NOW); + if (hndl != nullptr) { + result->reset(new PosixDynamicLibrary(library_name, hndl)); + return Status::OK(); + } + } else { + std::string local_path; + std::stringstream ss(path); + while (getline(ss, local_path, kPathSeparator)) { + if (!path.empty()) { + std::string full_name = local_path + "/" + library_name; + void* hndl = dlopen(full_name.c_str(), RTLD_NOW); + if (hndl != nullptr) { + result->reset(new PosixDynamicLibrary(full_name, hndl)); + return Status::OK(); + } + } + } + } + } + return Status::IOError( + IOErrorMsg("Failed to open shared library: xs", name), dlerror()); + } +#endif // !ROCKSDB_NO_DYNAMIC_EXTENSION + + void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW, + void* tag = nullptr, + void (*unschedFunction)(void* arg) = nullptr) override; + + int UnSchedule(void* arg, Priority pri) override; + + void StartThread(void (*function)(void* arg), void* arg) override; + + void WaitForJoin() override; + + unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; + + Status GetTestDirectory(std::string* result) override { + const char* env = getenv("TEST_TMPDIR"); + if (env && env[0] != '\0') { + *result = env; + } else { + char buf[100]; + snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%d", int(geteuid())); + *result = buf; + } + // Directory may already exist + CreateDir(*result); + return Status::OK(); + } + + Status GetThreadList(std::vector<ThreadStatus>* thread_list) override { + assert(thread_status_updater_); + return thread_status_updater_->GetThreadList(thread_list); + } + + static uint64_t gettid(pthread_t tid) { + uint64_t thread_id = 0; + memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); + return thread_id; + } + + static uint64_t gettid() { + pthread_t tid = pthread_self(); + return gettid(tid); + } + + uint64_t GetThreadID() const override { return gettid(pthread_self()); } + + Status NewLogger(const std::string& fname, + std::shared_ptr<Logger>* result) override { + FILE* f; + { + IOSTATS_TIMER_GUARD(open_nanos); + f = fopen(fname.c_str(), + "w" +#ifdef __GLIBC_PREREQ +#if __GLIBC_PREREQ(2, 7) + "e" // glibc extension to enable O_CLOEXEC +#endif +#endif + ); + } + if (f == nullptr) { + result->reset(); + return IOError("when fopen a file for new logger", fname, errno); + } else { + int fd = fileno(f); +#ifdef ROCKSDB_FALLOCATE_PRESENT + fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024); +#endif + SetFD_CLOEXEC(fd, nullptr); + result->reset(new PosixLogger(f, &PosixEnv::gettid, this)); + return Status::OK(); + } + } + + uint64_t NowMicros() override { + struct timeval tv; + gettimeofday(&tv, nullptr); + return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec; + } + + uint64_t NowNanos() override { +#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec; +#elif defined(OS_SOLARIS) + return gethrtime(); +#elif defined(__MACH__) + clock_serv_t cclock; + mach_timespec_t ts; + host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock); + clock_get_time(cclock, &ts); + mach_port_deallocate(mach_task_self(), cclock); + return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec; +#else + return std::chrono::duration_cast<std::chrono::nanoseconds>( + std::chrono::steady_clock::now().time_since_epoch()).count(); +#endif + } + + uint64_t NowCPUNanos() override { +#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \ + (defined(__MACH__) && defined(__MAC_10_12)) + struct timespec ts; + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts); + return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec; +#endif + return 0; + } + + void SleepForMicroseconds(int micros) override { usleep(micros); } + + Status GetHostName(char* name, uint64_t len) override { + int ret = gethostname(name, static_cast<size_t>(len)); + if (ret < 0) { + if (errno == EFAULT || errno == EINVAL) { + return Status::InvalidArgument(strerror(errno)); + } else { + return IOError("GetHostName", name, errno); + } + } + return Status::OK(); + } + + Status GetCurrentTime(int64_t* unix_time) override { + time_t ret = time(nullptr); + if (ret == (time_t) -1) { + return IOError("GetCurrentTime", "", errno); + } + *unix_time = (int64_t) ret; + return Status::OK(); + } + + ThreadStatusUpdater* GetThreadStatusUpdater() const override { + return Env::GetThreadStatusUpdater(); + } + + std::string GenerateUniqueId() override { return Env::GenerateUniqueId(); } + + // Allow increasing the number of worker threads. + void SetBackgroundThreads(int num, Priority pri) override { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + thread_pools_[pri].SetBackgroundThreads(num); + } + + int GetBackgroundThreads(Priority pri) override { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + return thread_pools_[pri].GetBackgroundThreads(); + } + + Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override { + allow_non_owner_access_ = allow_non_owner_access; + return Status::OK(); + } + + // Allow increasing the number of worker threads. + void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); + } + + void LowerThreadPoolIOPriority(Priority pool = LOW) override { + assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); +#ifdef OS_LINUX + thread_pools_[pool].LowerIOPriority(); +#else + (void)pool; +#endif + } + + void LowerThreadPoolCPUPriority(Priority pool = LOW) override { + assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); +#ifdef OS_LINUX + thread_pools_[pool].LowerCPUPriority(); +#else + (void)pool; +#endif + } + + std::string TimeToString(uint64_t secondsSince1970) override { + const time_t seconds = (time_t)secondsSince1970; + struct tm t; + int maxsize = 64; + std::string dummy; + dummy.reserve(maxsize); + dummy.resize(maxsize); + char* p = &dummy[0]; + localtime_r(&seconds, &t); + snprintf(p, maxsize, + "%04d/%02d/%02d-%02d:%02d:%02d ", + t.tm_year + 1900, + t.tm_mon + 1, + t.tm_mday, + t.tm_hour, + t.tm_min, + t.tm_sec); + return dummy; + } + + private: + std::vector<ThreadPoolImpl> thread_pools_; + pthread_mutex_t mu_; + std::vector<pthread_t> threads_to_join_; + // If true, allow non owner read access for db files. Otherwise, non-owner + // has no access to db files. + bool allow_non_owner_access_; +}; + +PosixEnv::PosixEnv() + : CompositeEnvWrapper(this, FileSystem::Default().get()), + thread_pools_(Priority::TOTAL), + allow_non_owner_access_(true) { + ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); + for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { + thread_pools_[pool_id].SetThreadPriority( + static_cast<Env::Priority>(pool_id)); + // This allows later initializing the thread-local-env of each thread. + thread_pools_[pool_id].SetHostEnv(this); + } + thread_status_updater_ = CreateThreadStatusUpdater(); +} + +void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri, + void* tag, void (*unschedFunction)(void* arg)) { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); +} + +int PosixEnv::UnSchedule(void* arg, Priority pri) { + return thread_pools_[pri].UnSchedule(arg); +} + +unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + return thread_pools_[pri].GetQueueLen(); +} + +struct StartThreadState { + void (*user_function)(void*); + void* arg; +}; + +static void* StartThreadWrapper(void* arg) { + StartThreadState* state = reinterpret_cast<StartThreadState*>(arg); + state->user_function(state->arg); + delete state; + return nullptr; +} + +void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { + pthread_t t; + StartThreadState* state = new StartThreadState; + state->user_function = function; + state->arg = arg; + ThreadPoolImpl::PthreadCall( + "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); + ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_)); + threads_to_join_.push_back(t); + ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_)); +} + +void PosixEnv::WaitForJoin() { + for (const auto tid : threads_to_join_) { + pthread_join(tid, nullptr); + } + threads_to_join_.clear(); +} + +} // namespace + +std::string Env::GenerateUniqueId() { + std::string uuid_file = "/proc/sys/kernel/random/uuid"; + + Status s = FileExists(uuid_file); + if (s.ok()) { + std::string uuid; + s = ReadFileToString(this, uuid_file, &uuid); + if (s.ok()) { + return uuid; + } + } + // Could not read uuid_file - generate uuid using "nanos-random" + Random64 r(time(nullptr)); + uint64_t random_uuid_portion = + r.Uniform(std::numeric_limits<uint64_t>::max()); + uint64_t nanos_uuid_portion = NowNanos(); + char uuid2[200]; + snprintf(uuid2, + 200, + "%lx-%lx", + (unsigned long)nanos_uuid_portion, + (unsigned long)random_uuid_portion); + return uuid2; +} + +// +// Default Posix Env +// +Env* Env::Default() { + // The following function call initializes the singletons of ThreadLocalPtr + // right before the static default_env. This guarantees default_env will + // always being destructed before the ThreadLocalPtr singletons get + // destructed as C++ guarantees that the destructions of static variables + // is in the reverse order of their constructions. + // + // Since static members are destructed in the reverse order + // of their construction, having this call here guarantees that + // the destructor of static PosixEnv will go first, then the + // the singletons of ThreadLocalPtr. + ThreadLocalPtr::InitSingletons(); + CompressionContextCache::InitSingleton(); + INIT_SYNC_POINT_SINGLETONS(); + static PosixEnv default_env; + static CompositeEnvWrapper composite_env(&default_env, + FileSystem::Default().get()); + return &composite_env; +} + +} // namespace ROCKSDB_NAMESPACE |