summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/env/fs_posix.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/env/fs_posix.cc
parentInitial commit. (diff)
downloadceph-upstream/18.2.2.tar.xz
ceph-upstream/18.2.2.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/env/fs_posix.cc')
-rw-r--r--src/rocksdb/env/fs_posix.cc1294
1 files changed, 1294 insertions, 0 deletions
diff --git a/src/rocksdb/env/fs_posix.cc b/src/rocksdb/env/fs_posix.cc
new file mode 100644
index 000000000..e179a421d
--- /dev/null
+++ b/src/rocksdb/env/fs_posix.cc
@@ -0,0 +1,1294 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors
+
+#if !defined(OS_WIN)
+
+#include <dirent.h>
+#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
+#include <dlfcn.h>
+#endif
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
+#include <sys/statfs.h>
+#include <sys/sysmacros.h>
+#endif
+#include <sys/statvfs.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <time.h>
+
+#include <algorithm>
+// Get nano time includes
+#if defined(OS_LINUX) || defined(OS_FREEBSD)
+#elif defined(__MACH__)
+#include <Availability.h>
+#include <mach/clock.h>
+#include <mach/mach.h>
+#else
+#include <chrono>
+#endif
+#include <deque>
+#include <set>
+#include <vector>
+
+#include "env/composite_env_wrapper.h"
+#include "env/io_posix.h"
+#include "monitoring/iostats_context_imp.h"
+#include "monitoring/thread_status_updater.h"
+#include "port/lang.h"
+#include "port/port.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "test_util/sync_point.h"
+#include "util/coding.h"
+#include "util/compression_context_cache.h"
+#include "util/random.h"
+#include "util/string_util.h"
+#include "util/thread_local.h"
+#include "util/threadpool_imp.h"
+
+#if !defined(TMPFS_MAGIC)
+#define TMPFS_MAGIC 0x01021994
+#endif
+#if !defined(XFS_SUPER_MAGIC)
+#define XFS_SUPER_MAGIC 0x58465342
+#endif
+#if !defined(EXT4_SUPER_MAGIC)
+#define EXT4_SUPER_MAGIC 0xEF53
+#endif
+
+extern "C" bool RocksDbIOUringEnable() __attribute__((__weak__));
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+
+inline mode_t GetDBFileMode(bool allow_non_owner_access) {
+ return allow_non_owner_access ? 0644 : 0600;
+}
+
+// list of pathnames that are locked
+// Only used for error message.
+struct LockHoldingInfo {
+ int64_t acquire_time;
+ uint64_t acquiring_thread;
+};
+static std::map<std::string, LockHoldingInfo> locked_files;
+static port::Mutex mutex_locked_files;
+
+static int LockOrUnlock(int fd, bool lock) {
+ errno = 0;
+ struct flock f;
+ memset(&f, 0, sizeof(f));
+ f.l_type = (lock ? F_WRLCK : F_UNLCK);
+ f.l_whence = SEEK_SET;
+ f.l_start = 0;
+ f.l_len = 0; // Lock/unlock entire file
+ int value = fcntl(fd, F_SETLK, &f);
+
+ return value;
+}
+
+class PosixFileLock : public FileLock {
+ public:
+ int fd_ = /*invalid*/ -1;
+ std::string filename;
+
+ void Clear() {
+ fd_ = -1;
+ filename.clear();
+ }
+
+ virtual ~PosixFileLock() override {
+ // Check for destruction without UnlockFile
+ assert(fd_ == -1);
+ }
+};
+
+int cloexec_flags(int flags, const EnvOptions* options) {
+ // If the system supports opening the file with cloexec enabled,
+ // do so, as this avoids a race condition if a db is opened around
+ // the same time that a child process is forked
+#ifdef O_CLOEXEC
+ if (options == nullptr || options->set_fd_cloexec) {
+ flags |= O_CLOEXEC;
+ }
+#else
+ (void)options;
+#endif
+ return flags;
+}
+
+class PosixFileSystem : public FileSystem {
+ public:
+ PosixFileSystem();
+
+ static const char* kClassName() { return "PosixFileSystem"; }
+ const char* Name() const override { return kClassName(); }
+ const char* NickName() const override { return kDefaultName(); }
+
+ ~PosixFileSystem() override {}
+ bool IsInstanceOf(const std::string& name) const override {
+ if (name == "posix") {
+ return true;
+ } else {
+ return FileSystem::IsInstanceOf(name);
+ }
+ }
+
+ void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
+ if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
+ fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
+ }
+ }
+
+ IOStatus NewSequentialFile(const std::string& fname,
+ const FileOptions& options,
+ std::unique_ptr<FSSequentialFile>* result,
+ IODebugContext* /*dbg*/) override {
+ result->reset();
+ int fd = -1;
+ int flags = cloexec_flags(O_RDONLY, &options);
+ FILE* file = nullptr;
+
+ if (options.use_direct_reads && !options.use_mmap_reads) {
+#ifdef ROCKSDB_LITE
+ return IOStatus::IOError(fname,
+ "Direct I/O not supported in RocksDB lite");
+#endif // !ROCKSDB_LITE
+#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
+ flags |= O_DIRECT;
+ TEST_SYNC_POINT_CALLBACK("NewSequentialFile:O_DIRECT", &flags);
+#endif
+ }
+
+ do {
+ IOSTATS_TIMER_GUARD(open_nanos);
+ fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
+ } while (fd < 0 && errno == EINTR);
+ if (fd < 0) {
+ return IOError("While opening a file for sequentially reading", fname,
+ errno);
+ }
+
+ SetFD_CLOEXEC(fd, &options);
+
+ if (options.use_direct_reads && !options.use_mmap_reads) {
+#ifdef OS_MACOSX
+ if (fcntl(fd, F_NOCACHE, 1) == -1) {
+ close(fd);
+ return IOError("While fcntl NoCache", fname, errno);
+ }
+#endif
+ } else {
+ do {
+ IOSTATS_TIMER_GUARD(open_nanos);
+ file = fdopen(fd, "r");
+ } while (file == nullptr && errno == EINTR);
+ if (file == nullptr) {
+ close(fd);
+ return IOError("While opening file for sequentially read", fname,
+ errno);
+ }
+ }
+ result->reset(new PosixSequentialFile(
+ fname, file, fd, GetLogicalBlockSizeForReadIfNeeded(options, fname, fd),
+ options));
+ return IOStatus::OK();
+ }
+
+ IOStatus NewRandomAccessFile(const std::string& fname,
+ const FileOptions& options,
+ std::unique_ptr<FSRandomAccessFile>* result,
+ IODebugContext* /*dbg*/) override {
+ result->reset();
+ IOStatus s = IOStatus::OK();
+ int fd;
+ int flags = cloexec_flags(O_RDONLY, &options);
+
+ if (options.use_direct_reads && !options.use_mmap_reads) {
+#ifdef ROCKSDB_LITE
+ return IOStatus::IOError(fname,
+ "Direct I/O not supported in RocksDB lite");
+#endif // !ROCKSDB_LITE
+#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
+ flags |= O_DIRECT;
+ TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags);
+#endif
+ }
+
+ do {
+ IOSTATS_TIMER_GUARD(open_nanos);
+ fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
+ } while (fd < 0 && errno == EINTR);
+ if (fd < 0) {
+ s = IOError("While open a file for random read", fname, errno);
+ return s;
+ }
+ SetFD_CLOEXEC(fd, &options);
+
+ if (options.use_mmap_reads) {
+ // Use of mmap for random reads has been removed because it
+ // kills performance when storage is fast.
+ // Use mmap when virtual address-space is plentiful.
+ uint64_t size;
+ IOOptions opts;
+ s = GetFileSize(fname, opts, &size, nullptr);
+ if (s.ok()) {
+ void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0);
+ if (base != MAP_FAILED) {
+ result->reset(
+ new PosixMmapReadableFile(fd, fname, base, size, options));
+ } else {
+ s = IOError("while mmap file for read", fname, errno);
+ close(fd);
+ }
+ } else {
+ close(fd);
+ }
+ } else {
+ if (options.use_direct_reads && !options.use_mmap_reads) {
+#ifdef OS_MACOSX
+ if (fcntl(fd, F_NOCACHE, 1) == -1) {
+ close(fd);
+ return IOError("while fcntl NoCache", fname, errno);
+ }
+#endif
+ }
+ result->reset(new PosixRandomAccessFile(
+ fname, fd, GetLogicalBlockSizeForReadIfNeeded(options, fname, fd),
+ options
+#if defined(ROCKSDB_IOURING_PRESENT)
+ ,
+ !IsIOUringEnabled() ? nullptr : thread_local_io_urings_.get()
+#endif
+ ));
+ }
+ return s;
+ }
+
+ virtual IOStatus OpenWritableFile(const std::string& fname,
+ const FileOptions& options, bool reopen,
+ std::unique_ptr<FSWritableFile>* result,
+ IODebugContext* /*dbg*/) {
+ result->reset();
+ IOStatus s;
+ int fd = -1;
+ int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC);
+ // Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX)
+ if (options.use_direct_writes && !options.use_mmap_writes) {
+ // Note: we should avoid O_APPEND here due to ta the following bug:
+ // POSIX requires that opening a file with the O_APPEND flag should
+ // have no affect on the location at which pwrite() writes data.
+ // However, on Linux, if a file is opened with O_APPEND, pwrite()
+ // appends data to the end of the file, regardless of the value of
+ // offset.
+ // More info here: https://linux.die.net/man/2/pwrite
+#ifdef ROCKSDB_LITE
+ return IOStatus::IOError(fname,
+ "Direct I/O not supported in RocksDB lite");
+#endif // ROCKSDB_LITE
+ flags |= O_WRONLY;
+#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
+ flags |= O_DIRECT;
+#endif
+ TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags);
+ } else if (options.use_mmap_writes) {
+ // non-direct I/O
+ flags |= O_RDWR;
+ } else {
+ flags |= O_WRONLY;
+ }
+
+ flags = cloexec_flags(flags, &options);
+
+ do {
+ IOSTATS_TIMER_GUARD(open_nanos);
+ fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
+ } while (fd < 0 && errno == EINTR);
+
+ if (fd < 0) {
+ s = IOError("While open a file for appending", fname, errno);
+ return s;
+ }
+ SetFD_CLOEXEC(fd, &options);
+
+ if (options.use_mmap_writes) {
+ MaybeForceDisableMmap(fd);
+ }
+ if (options.use_mmap_writes && !forceMmapOff_) {
+ result->reset(new PosixMmapFile(fname, fd, page_size_, options));
+ } else if (options.use_direct_writes && !options.use_mmap_writes) {
+#ifdef OS_MACOSX
+ if (fcntl(fd, F_NOCACHE, 1) == -1) {
+ close(fd);
+ s = IOError("While fcntl NoCache an opened file for appending", fname,
+ errno);
+ return s;
+ }
+#elif defined(OS_SOLARIS)
+ if (directio(fd, DIRECTIO_ON) == -1) {
+ if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON
+ close(fd);
+ s = IOError("While calling directio()", fname, errno);
+ return s;
+ }
+ }
+#endif
+ result->reset(new PosixWritableFile(
+ fname, fd, GetLogicalBlockSizeForWriteIfNeeded(options, fname, fd),
+ options));
+ } else {
+ // disable mmap writes
+ EnvOptions no_mmap_writes_options = options;
+ no_mmap_writes_options.use_mmap_writes = false;
+ result->reset(
+ new PosixWritableFile(fname, fd,
+ GetLogicalBlockSizeForWriteIfNeeded(
+ no_mmap_writes_options, fname, fd),
+ no_mmap_writes_options));
+ }
+ return s;
+ }
+
+ IOStatus NewWritableFile(const std::string& fname, const FileOptions& options,
+ std::unique_ptr<FSWritableFile>* result,
+ IODebugContext* dbg) override {
+ return OpenWritableFile(fname, options, false, result, dbg);
+ }
+
+ IOStatus ReopenWritableFile(const std::string& fname,
+ const FileOptions& options,
+ std::unique_ptr<FSWritableFile>* result,
+ IODebugContext* dbg) override {
+ return OpenWritableFile(fname, options, true, result, dbg);
+ }
+
+ IOStatus ReuseWritableFile(const std::string& fname,
+ const std::string& old_fname,
+ const FileOptions& options,
+ std::unique_ptr<FSWritableFile>* result,
+ IODebugContext* /*dbg*/) override {
+ result->reset();
+ IOStatus s;
+ int fd = -1;
+
+ int flags = 0;
+ // Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX)
+ if (options.use_direct_writes && !options.use_mmap_writes) {
+#ifdef ROCKSDB_LITE
+ return IOStatus::IOError(fname,
+ "Direct I/O not supported in RocksDB lite");
+#endif // !ROCKSDB_LITE
+ flags |= O_WRONLY;
+#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
+ flags |= O_DIRECT;
+#endif
+ TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags);
+ } else if (options.use_mmap_writes) {
+ // mmap needs O_RDWR mode
+ flags |= O_RDWR;
+ } else {
+ flags |= O_WRONLY;
+ }
+
+ flags = cloexec_flags(flags, &options);
+
+ do {
+ IOSTATS_TIMER_GUARD(open_nanos);
+ fd = open(old_fname.c_str(), flags,
+ GetDBFileMode(allow_non_owner_access_));
+ } while (fd < 0 && errno == EINTR);
+ if (fd < 0) {
+ s = IOError("while reopen file for write", fname, errno);
+ return s;
+ }
+
+ SetFD_CLOEXEC(fd, &options);
+ // rename into place
+ if (rename(old_fname.c_str(), fname.c_str()) != 0) {
+ s = IOError("while rename file to " + fname, old_fname, errno);
+ close(fd);
+ return s;
+ }
+
+ if (options.use_mmap_writes) {
+ MaybeForceDisableMmap(fd);
+ }
+ if (options.use_mmap_writes && !forceMmapOff_) {
+ result->reset(new PosixMmapFile(fname, fd, page_size_, options));
+ } else if (options.use_direct_writes && !options.use_mmap_writes) {
+#ifdef OS_MACOSX
+ if (fcntl(fd, F_NOCACHE, 1) == -1) {
+ close(fd);
+ s = IOError("while fcntl NoCache for reopened file for append", fname,
+ errno);
+ return s;
+ }
+#elif defined(OS_SOLARIS)
+ if (directio(fd, DIRECTIO_ON) == -1) {
+ if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON
+ close(fd);
+ s = IOError("while calling directio()", fname, errno);
+ return s;
+ }
+ }
+#endif
+ result->reset(new PosixWritableFile(
+ fname, fd, GetLogicalBlockSizeForWriteIfNeeded(options, fname, fd),
+ options));
+ } else {
+ // disable mmap writes
+ FileOptions no_mmap_writes_options = options;
+ no_mmap_writes_options.use_mmap_writes = false;
+ result->reset(
+ new PosixWritableFile(fname, fd,
+ GetLogicalBlockSizeForWriteIfNeeded(
+ no_mmap_writes_options, fname, fd),
+ no_mmap_writes_options));
+ }
+ return s;
+ }
+
+ IOStatus NewRandomRWFile(const std::string& fname, const FileOptions& options,
+ std::unique_ptr<FSRandomRWFile>* result,
+ IODebugContext* /*dbg*/) override {
+ int fd = -1;
+ int flags = cloexec_flags(O_RDWR, &options);
+
+ while (fd < 0) {
+ IOSTATS_TIMER_GUARD(open_nanos);
+
+ fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
+ if (fd < 0) {
+ // Error while opening the file
+ if (errno == EINTR) {
+ continue;
+ }
+ return IOError("While open file for random read/write", fname, errno);
+ }
+ }
+
+ SetFD_CLOEXEC(fd, &options);
+ result->reset(new PosixRandomRWFile(fname, fd, options));
+ return IOStatus::OK();
+ }
+
+ IOStatus NewMemoryMappedFileBuffer(
+ const std::string& fname,
+ std::unique_ptr<MemoryMappedFileBuffer>* result) override {
+ int fd = -1;
+ IOStatus status;
+ int flags = cloexec_flags(O_RDWR, nullptr);
+
+ while (fd < 0) {
+ IOSTATS_TIMER_GUARD(open_nanos);
+ fd = open(fname.c_str(), flags, 0644);
+ if (fd < 0) {
+ // Error while opening the file
+ if (errno == EINTR) {
+ continue;
+ }
+ status =
+ IOError("While open file for raw mmap buffer access", fname, errno);
+ break;
+ }
+ }
+ uint64_t size;
+ if (status.ok()) {
+ IOOptions opts;
+ status = GetFileSize(fname, opts, &size, nullptr);
+ }
+ void* base = nullptr;
+ if (status.ok()) {
+ base = mmap(nullptr, static_cast<size_t>(size), PROT_READ | PROT_WRITE,
+ MAP_SHARED, fd, 0);
+ if (base == MAP_FAILED) {
+ status = IOError("while mmap file for read", fname, errno);
+ }
+ }
+ if (status.ok()) {
+ result->reset(
+ new PosixMemoryMappedFileBuffer(base, static_cast<size_t>(size)));
+ }
+ if (fd >= 0) {
+ // don't need to keep it open after mmap has been called
+ close(fd);
+ }
+ return status;
+ }
+
+ IOStatus NewDirectory(const std::string& name, const IOOptions& /*opts*/,
+ std::unique_ptr<FSDirectory>* result,
+ IODebugContext* /*dbg*/) override {
+ result->reset();
+ int fd;
+ int flags = cloexec_flags(0, nullptr);
+ {
+ IOSTATS_TIMER_GUARD(open_nanos);
+ fd = open(name.c_str(), flags);
+ }
+ if (fd < 0) {
+ return IOError("While open directory", name, errno);
+ } else {
+ result->reset(new PosixDirectory(fd, name));
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus FileExists(const std::string& fname, const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ int result = access(fname.c_str(), F_OK);
+
+ if (result == 0) {
+ return IOStatus::OK();
+ }
+
+ int err = errno;
+ switch (err) {
+ case EACCES:
+ case ELOOP:
+ case ENAMETOOLONG:
+ case ENOENT:
+ case ENOTDIR:
+ return IOStatus::NotFound();
+ default:
+ assert(err == EIO || err == ENOMEM);
+ return IOStatus::IOError("Unexpected error(" + std::to_string(err) +
+ ") accessing file `" + fname + "' ");
+ }
+ }
+
+ IOStatus GetChildren(const std::string& dir, const IOOptions& opts,
+ std::vector<std::string>* result,
+ IODebugContext* /*dbg*/) override {
+ result->clear();
+
+ DIR* d = opendir(dir.c_str());
+ if (d == nullptr) {
+ switch (errno) {
+ case EACCES:
+ case ENOENT:
+ case ENOTDIR:
+ return IOStatus::NotFound();
+ default:
+ return IOError("While opendir", dir, errno);
+ }
+ }
+
+ // reset errno before calling readdir()
+ errno = 0;
+ struct dirent* entry;
+
+ while ((entry = readdir(d)) != nullptr) {
+ // filter out '.' and '..' directory entries
+ // which appear only on some platforms
+ const bool ignore =
+ entry->d_type == DT_DIR &&
+ (strcmp(entry->d_name, ".") == 0 ||
+ strcmp(entry->d_name, "..") == 0
+#ifndef ASSERT_STATUS_CHECKED
+ // In case of ASSERT_STATUS_CHECKED, GetChildren support older
+ // version of API for debugging purpose.
+ || opts.do_not_recurse
+#endif
+ );
+ if (!ignore) {
+ result->push_back(entry->d_name);
+ }
+ errno = 0; // reset errno if readdir() success
+ }
+
+ // always attempt to close the dir
+ const auto pre_close_errno = errno; // errno may be modified by closedir
+ const int close_result = closedir(d);
+
+ if (pre_close_errno != 0) {
+ // error occurred during readdir
+ return IOError("While readdir", dir, pre_close_errno);
+ }
+
+ if (close_result != 0) {
+ // error occurred during closedir
+ return IOError("While closedir", dir, errno);
+ }
+
+ return IOStatus::OK();
+ }
+
+ IOStatus DeleteFile(const std::string& fname, const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ IOStatus result;
+ if (unlink(fname.c_str()) != 0) {
+ result = IOError("while unlink() file", fname, errno);
+ }
+ return result;
+ }
+
+ IOStatus CreateDir(const std::string& name, const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ if (mkdir(name.c_str(), 0755) != 0) {
+ return IOError("While mkdir", name, errno);
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus CreateDirIfMissing(const std::string& name,
+ const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ if (mkdir(name.c_str(), 0755) != 0) {
+ if (errno != EEXIST) {
+ return IOError("While mkdir if missing", name, errno);
+ } else if (!DirExists(name)) { // Check that name is actually a
+ // directory.
+ // Message is taken from mkdir
+ return IOStatus::IOError("`" + name +
+ "' exists but is not a directory");
+ }
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus DeleteDir(const std::string& name, const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ if (rmdir(name.c_str()) != 0) {
+ return IOError("file rmdir", name, errno);
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus GetFileSize(const std::string& fname, const IOOptions& /*opts*/,
+ uint64_t* size, IODebugContext* /*dbg*/) override {
+ struct stat sbuf;
+ if (stat(fname.c_str(), &sbuf) != 0) {
+ *size = 0;
+ return IOError("while stat a file for size", fname, errno);
+ } else {
+ *size = sbuf.st_size;
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus GetFileModificationTime(const std::string& fname,
+ const IOOptions& /*opts*/,
+ uint64_t* file_mtime,
+ IODebugContext* /*dbg*/) override {
+ struct stat s;
+ if (stat(fname.c_str(), &s) != 0) {
+ return IOError("while stat a file for modification time", fname, errno);
+ }
+ *file_mtime = static_cast<uint64_t>(s.st_mtime);
+ return IOStatus::OK();
+ }
+
+ IOStatus RenameFile(const std::string& src, const std::string& target,
+ const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ if (rename(src.c_str(), target.c_str()) != 0) {
+ return IOError("While renaming a file to " + target, src, errno);
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus LinkFile(const std::string& src, const std::string& target,
+ const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ if (link(src.c_str(), target.c_str()) != 0) {
+ if (errno == EXDEV || errno == ENOTSUP) {
+ return IOStatus::NotSupported(errno == EXDEV
+ ? "No cross FS links allowed"
+ : "Links not supported by FS");
+ }
+ return IOError("while link file to " + target, src, errno);
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus NumFileLinks(const std::string& fname, const IOOptions& /*opts*/,
+ uint64_t* count, IODebugContext* /*dbg*/) override {
+ struct stat s;
+ if (stat(fname.c_str(), &s) != 0) {
+ return IOError("while stat a file for num file links", fname, errno);
+ }
+ *count = static_cast<uint64_t>(s.st_nlink);
+ return IOStatus::OK();
+ }
+
+ IOStatus AreFilesSame(const std::string& first, const std::string& second,
+ const IOOptions& /*opts*/, bool* res,
+ IODebugContext* /*dbg*/) override {
+ struct stat statbuf[2];
+ if (stat(first.c_str(), &statbuf[0]) != 0) {
+ return IOError("stat file", first, errno);
+ }
+ if (stat(second.c_str(), &statbuf[1]) != 0) {
+ return IOError("stat file", second, errno);
+ }
+
+ if (major(statbuf[0].st_dev) != major(statbuf[1].st_dev) ||
+ minor(statbuf[0].st_dev) != minor(statbuf[1].st_dev) ||
+ statbuf[0].st_ino != statbuf[1].st_ino) {
+ *res = false;
+ } else {
+ *res = true;
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus LockFile(const std::string& fname, const IOOptions& /*opts*/,
+ FileLock** lock, IODebugContext* /*dbg*/) override {
+ *lock = nullptr;
+
+ LockHoldingInfo lhi;
+ int64_t current_time = 0;
+ // Ignore status code as the time is only used for error message.
+ SystemClock::Default()
+ ->GetCurrentTime(&current_time)
+ .PermitUncheckedError();
+ lhi.acquire_time = current_time;
+ lhi.acquiring_thread = Env::Default()->GetThreadID();
+
+ mutex_locked_files.Lock();
+ // If it already exists in the locked_files set, then it is already locked,
+ // and fail this lock attempt. Otherwise, insert it into locked_files.
+ // This check is needed because fcntl() does not detect lock conflict
+ // if the fcntl is issued by the same thread that earlier acquired
+ // this lock.
+ // We must do this check *before* opening the file:
+ // Otherwise, we will open a new file descriptor. Locks are associated with
+ // a process, not a file descriptor and when *any* file descriptor is
+ // closed, all locks the process holds for that *file* are released
+ const auto it_success = locked_files.insert({fname, lhi});
+ if (it_success.second == false) {
+ LockHoldingInfo prev_info = it_success.first->second;
+ mutex_locked_files.Unlock();
+ errno = ENOLCK;
+ // Note that the thread ID printed is the same one as the one in
+ // posix logger, but posix logger prints it hex format.
+ return IOError("lock hold by current process, acquire time " +
+ std::to_string(prev_info.acquire_time) +
+ " acquiring thread " +
+ std::to_string(prev_info.acquiring_thread),
+ fname, errno);
+ }
+
+ IOStatus result = IOStatus::OK();
+ int fd;
+ int flags = cloexec_flags(O_RDWR | O_CREAT, nullptr);
+
+ {
+ IOSTATS_TIMER_GUARD(open_nanos);
+ fd = open(fname.c_str(), flags, 0644);
+ }
+ if (fd < 0) {
+ result = IOError("while open a file for lock", fname, errno);
+ } else if (LockOrUnlock(fd, true) == -1) {
+ result = IOError("While lock file", fname, errno);
+ close(fd);
+ } else {
+ SetFD_CLOEXEC(fd, nullptr);
+ PosixFileLock* my_lock = new PosixFileLock;
+ my_lock->fd_ = fd;
+ my_lock->filename = fname;
+ *lock = my_lock;
+ }
+ if (!result.ok()) {
+ // If there is an error in locking, then remove the pathname from
+ // locked_files. (If we got this far, it did not exist in locked_files
+ // before this call.)
+ locked_files.erase(fname);
+ }
+
+ mutex_locked_files.Unlock();
+ return result;
+ }
+
+ IOStatus UnlockFile(FileLock* lock, const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
+ IOStatus result;
+ mutex_locked_files.Lock();
+ // If we are unlocking, then verify that we had locked it earlier,
+ // it should already exist in locked_files. Remove it from locked_files.
+ if (locked_files.erase(my_lock->filename) != 1) {
+ errno = ENOLCK;
+ result = IOError("unlock", my_lock->filename, errno);
+ } else if (LockOrUnlock(my_lock->fd_, false) == -1) {
+ result = IOError("unlock", my_lock->filename, errno);
+ }
+ close(my_lock->fd_);
+ my_lock->Clear();
+ delete my_lock;
+ mutex_locked_files.Unlock();
+ return result;
+ }
+
+ IOStatus GetAbsolutePath(const std::string& db_path,
+ const IOOptions& /*opts*/, std::string* output_path,
+ IODebugContext* /*dbg*/) override {
+ if (!db_path.empty() && db_path[0] == '/') {
+ *output_path = db_path;
+ return IOStatus::OK();
+ }
+
+ char the_path[4096];
+ char* ret = getcwd(the_path, 4096);
+ if (ret == nullptr) {
+ return IOStatus::IOError(errnoStr(errno).c_str());
+ }
+
+ *output_path = ret;
+ return IOStatus::OK();
+ }
+
+ IOStatus GetTestDirectory(const IOOptions& /*opts*/, std::string* result,
+ IODebugContext* /*dbg*/) override {
+ const char* env = getenv("TEST_TMPDIR");
+ if (env && env[0] != '\0') {
+ *result = env;
+ } else {
+ char buf[100];
+ snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%d", int(geteuid()));
+ *result = buf;
+ }
+ // Directory may already exist
+ {
+ IOOptions opts;
+ return CreateDirIfMissing(*result, opts, nullptr);
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus GetFreeSpace(const std::string& fname, const IOOptions& /*opts*/,
+ uint64_t* free_space,
+ IODebugContext* /*dbg*/) override {
+ struct statvfs sbuf;
+
+ if (statvfs(fname.c_str(), &sbuf) < 0) {
+ return IOError("While doing statvfs", fname, errno);
+ }
+
+ // sbuf.bfree is total free space available to root
+ // sbuf.bavail is total free space available to unprivileged user
+ // sbuf.bavail <= sbuf.bfree ... pick correct based upon effective user id
+ if (geteuid()) {
+ // non-zero user is unprivileged, or -1 if error. take more conservative
+ // size
+ *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bavail);
+ } else {
+ // root user can access all disk space
+ *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree);
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus IsDirectory(const std::string& path, const IOOptions& /*opts*/,
+ bool* is_dir, IODebugContext* /*dbg*/) override {
+ // First open
+ int fd = -1;
+ int flags = cloexec_flags(O_RDONLY, nullptr);
+ {
+ IOSTATS_TIMER_GUARD(open_nanos);
+ fd = open(path.c_str(), flags);
+ }
+ if (fd < 0) {
+ return IOError("While open for IsDirectory()", path, errno);
+ }
+ IOStatus io_s;
+ struct stat sbuf;
+ if (fstat(fd, &sbuf) < 0) {
+ io_s = IOError("While doing stat for IsDirectory()", path, errno);
+ }
+ close(fd);
+ if (io_s.ok() && nullptr != is_dir) {
+ *is_dir = S_ISDIR(sbuf.st_mode);
+ }
+ return io_s;
+ }
+
+ FileOptions OptimizeForLogWrite(const FileOptions& file_options,
+ const DBOptions& db_options) const override {
+ FileOptions optimized = file_options;
+ optimized.use_mmap_writes = false;
+ optimized.use_direct_writes = false;
+ optimized.bytes_per_sync = db_options.wal_bytes_per_sync;
+ // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it
+ // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
+ // test and make this false
+ optimized.fallocate_with_keep_size = true;
+ optimized.writable_file_max_buffer_size =
+ db_options.writable_file_max_buffer_size;
+ return optimized;
+ }
+
+ FileOptions OptimizeForManifestWrite(
+ const FileOptions& file_options) const override {
+ FileOptions optimized = file_options;
+ optimized.use_mmap_writes = false;
+ optimized.use_direct_writes = false;
+ optimized.fallocate_with_keep_size = true;
+ return optimized;
+ }
+#ifdef OS_LINUX
+ Status RegisterDbPaths(const std::vector<std::string>& paths) override {
+ return logical_block_size_cache_.RefAndCacheLogicalBlockSize(paths);
+ }
+ Status UnregisterDbPaths(const std::vector<std::string>& paths) override {
+ logical_block_size_cache_.UnrefAndTryRemoveCachedLogicalBlockSize(paths);
+ return Status::OK();
+ }
+#endif
+ private:
+ bool forceMmapOff_ = false; // do we override Env options?
+
+ // Returns true iff the named directory exists and is a directory.
+ virtual bool DirExists(const std::string& dname) {
+ struct stat statbuf;
+ if (stat(dname.c_str(), &statbuf) == 0) {
+ return S_ISDIR(statbuf.st_mode);
+ }
+ return false; // stat() failed return false
+ }
+
+ bool SupportsFastAllocate(int fd) {
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+ struct statfs s;
+ if (fstatfs(fd, &s)) {
+ return false;
+ }
+ switch (s.f_type) {
+ case EXT4_SUPER_MAGIC:
+ return true;
+ case XFS_SUPER_MAGIC:
+ return true;
+ case TMPFS_MAGIC:
+ return true;
+ default:
+ return false;
+ }
+#else
+ (void)fd;
+ return false;
+#endif
+ }
+
+ void MaybeForceDisableMmap(int fd) {
+ static std::once_flag s_check_disk_for_mmap_once;
+ assert(this == FileSystem::Default().get());
+ std::call_once(
+ s_check_disk_for_mmap_once,
+ [this](int fdesc) {
+ // this will be executed once in the program's lifetime.
+ // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
+ if (!SupportsFastAllocate(fdesc)) {
+ forceMmapOff_ = true;
+ }
+ },
+ fd);
+ }
+
+#ifdef ROCKSDB_IOURING_PRESENT
+ bool IsIOUringEnabled() {
+ if (RocksDbIOUringEnable && RocksDbIOUringEnable()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+#endif // ROCKSDB_IOURING_PRESENT
+
+ // EXPERIMENTAL
+ //
+ // TODO akankshamahajan:
+ // 1. Update Poll API to take into account min_completions
+ // and returns if number of handles in io_handles (any order) completed is
+ // equal to atleast min_completions.
+ // 2. Currently in case of direct_io, Read API is called because of which call
+ // to Poll API fails as it expects IOHandle to be populated.
+ virtual IOStatus Poll(std::vector<void*>& io_handles,
+ size_t /*min_completions*/) override {
+#if defined(ROCKSDB_IOURING_PRESENT)
+ // io_uring_queue_init.
+ struct io_uring* iu = nullptr;
+ if (thread_local_io_urings_) {
+ iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+ }
+
+ // Init failed, platform doesn't support io_uring.
+ if (iu == nullptr) {
+ return IOStatus::NotSupported("Poll");
+ }
+
+ for (size_t i = 0; i < io_handles.size(); i++) {
+ // The request has been completed in earlier runs.
+ if ((static_cast<Posix_IOHandle*>(io_handles[i]))->is_finished) {
+ continue;
+ }
+ // Loop until IO for io_handles[i] is completed.
+ while (true) {
+ // io_uring_wait_cqe.
+ struct io_uring_cqe* cqe = nullptr;
+ ssize_t ret = io_uring_wait_cqe(iu, &cqe);
+ if (ret) {
+ // abort as it shouldn't be in indeterminate state and there is no
+ // good way currently to handle this error.
+ abort();
+ }
+
+ // Step 3: Populate the request.
+ assert(cqe != nullptr);
+ Posix_IOHandle* posix_handle =
+ static_cast<Posix_IOHandle*>(io_uring_cqe_get_data(cqe));
+ assert(posix_handle->iu == iu);
+ if (posix_handle->iu != iu) {
+ return IOStatus::IOError("");
+ }
+ // Reset cqe data to catch any stray reuse of it
+ static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
+
+ FSReadRequest req;
+ req.scratch = posix_handle->scratch;
+ req.offset = posix_handle->offset;
+ req.len = posix_handle->len;
+
+ size_t finished_len = 0;
+ size_t bytes_read = 0;
+ bool read_again = false;
+ UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len,
+ true /*async_read*/, posix_handle->use_direct_io,
+ posix_handle->alignment, finished_len, &req, bytes_read,
+ read_again);
+ posix_handle->is_finished = true;
+ io_uring_cqe_seen(iu, cqe);
+ posix_handle->cb(req, posix_handle->cb_arg);
+
+ (void)finished_len;
+ (void)bytes_read;
+ (void)read_again;
+
+ if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
+ break;
+ }
+ }
+ }
+ return IOStatus::OK();
+#else
+ (void)io_handles;
+ return IOStatus::NotSupported("Poll");
+#endif
+ }
+
+ virtual IOStatus AbortIO(std::vector<void*>& io_handles) override {
+#if defined(ROCKSDB_IOURING_PRESENT)
+ // io_uring_queue_init.
+ struct io_uring* iu = nullptr;
+ if (thread_local_io_urings_) {
+ iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+ }
+
+ // Init failed, platform doesn't support io_uring.
+ // If Poll is not supported then it didn't submit any request and it should
+ // return OK.
+ if (iu == nullptr) {
+ return IOStatus::OK();
+ }
+
+ for (size_t i = 0; i < io_handles.size(); i++) {
+ Posix_IOHandle* posix_handle =
+ static_cast<Posix_IOHandle*>(io_handles[i]);
+ if (posix_handle->is_finished == true) {
+ continue;
+ }
+ assert(posix_handle->iu == iu);
+ if (posix_handle->iu != iu) {
+ return IOStatus::IOError("");
+ }
+
+ // Prepare the cancel request.
+ struct io_uring_sqe* sqe;
+ sqe = io_uring_get_sqe(iu);
+
+ // In order to cancel the request, sqe->addr of cancel request should
+ // match with the read request submitted which is posix_handle->iov.
+ io_uring_prep_cancel(sqe, &posix_handle->iov, 0);
+ // Sets sqe->user_data to posix_handle.
+ io_uring_sqe_set_data(sqe, posix_handle);
+
+ // submit the request.
+ ssize_t ret = io_uring_submit(iu);
+ if (ret < 0) {
+ fprintf(stderr, "io_uring_submit error: %ld\n", long(ret));
+ return IOStatus::IOError("io_uring_submit() requested but returned " +
+ std::to_string(ret));
+ }
+ }
+
+ // After submitting the requests, wait for the requests.
+ for (size_t i = 0; i < io_handles.size(); i++) {
+ if ((static_cast<Posix_IOHandle*>(io_handles[i]))->is_finished) {
+ continue;
+ }
+
+ while (true) {
+ struct io_uring_cqe* cqe = nullptr;
+ ssize_t ret = io_uring_wait_cqe(iu, &cqe);
+ if (ret) {
+ // abort as it shouldn't be in indeterminate state and there is no
+ // good way currently to handle this error.
+ abort();
+ }
+ assert(cqe != nullptr);
+
+ // Returns cqe->user_data.
+ Posix_IOHandle* posix_handle =
+ static_cast<Posix_IOHandle*>(io_uring_cqe_get_data(cqe));
+ assert(posix_handle->iu == iu);
+ if (posix_handle->iu != iu) {
+ return IOStatus::IOError("");
+ }
+ posix_handle->req_count++;
+
+ // Reset cqe data to catch any stray reuse of it
+ static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
+ io_uring_cqe_seen(iu, cqe);
+
+ // - If the request is cancelled successfully, the original request is
+ // completed with -ECANCELED and the cancel request is completed with
+ // a result of 0.
+ // - If the request was already running, the original may or
+ // may not complete in error. The cancel request will complete with
+ // -EALREADY for that case.
+ // - And finally, if the request to cancel wasn't
+ // found, the cancel request is completed with -ENOENT.
+ //
+ // Every handle has to wait for 2 requests completion: original one and
+ // the cancel request which is tracked by PosixHandle::req_count.
+ if (posix_handle->req_count == 2 &&
+ static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
+ posix_handle->is_finished = true;
+ FSReadRequest req;
+ req.status = IOStatus::Aborted();
+ posix_handle->cb(req, posix_handle->cb_arg);
+
+ break;
+ }
+ }
+ }
+ return IOStatus::OK();
+#else
+ // If Poll is not supported then it didn't submit any request and it should
+ // return OK.
+ (void)io_handles;
+ return IOStatus::OK();
+#endif
+ }
+
+#if defined(ROCKSDB_IOURING_PRESENT)
+ // io_uring instance
+ std::unique_ptr<ThreadLocalPtr> thread_local_io_urings_;
+#endif
+
+ size_t page_size_;
+
+ // If true, allow non owner read access for db files. Otherwise, non-owner
+ // has no access to db files.
+ bool allow_non_owner_access_;
+
+#ifdef OS_LINUX
+ static LogicalBlockSizeCache logical_block_size_cache_;
+#endif
+ static size_t GetLogicalBlockSize(const std::string& fname, int fd);
+ // In non-direct IO mode, this directly returns kDefaultPageSize.
+ // Otherwise call GetLogicalBlockSize.
+ static size_t GetLogicalBlockSizeForReadIfNeeded(const EnvOptions& options,
+ const std::string& fname,
+ int fd);
+ static size_t GetLogicalBlockSizeForWriteIfNeeded(const EnvOptions& options,
+ const std::string& fname,
+ int fd);
+};
+
+#ifdef OS_LINUX
+LogicalBlockSizeCache PosixFileSystem::logical_block_size_cache_;
+#endif
+
+size_t PosixFileSystem::GetLogicalBlockSize(const std::string& fname, int fd) {
+#ifdef OS_LINUX
+ return logical_block_size_cache_.GetLogicalBlockSize(fname, fd);
+#else
+ (void)fname;
+ return PosixHelper::GetLogicalBlockSizeOfFd(fd);
+#endif
+}
+
+size_t PosixFileSystem::GetLogicalBlockSizeForReadIfNeeded(
+ const EnvOptions& options, const std::string& fname, int fd) {
+ return options.use_direct_reads
+ ? PosixFileSystem::GetLogicalBlockSize(fname, fd)
+ : kDefaultPageSize;
+}
+
+size_t PosixFileSystem::GetLogicalBlockSizeForWriteIfNeeded(
+ const EnvOptions& options, const std::string& fname, int fd) {
+ return options.use_direct_writes
+ ? PosixFileSystem::GetLogicalBlockSize(fname, fd)
+ : kDefaultPageSize;
+}
+
+PosixFileSystem::PosixFileSystem()
+ : forceMmapOff_(false),
+ page_size_(getpagesize()),
+ allow_non_owner_access_(true) {
+#if defined(ROCKSDB_IOURING_PRESENT)
+ // Test whether IOUring is supported, and if it does, create a managing
+ // object for thread local point so that in the future thread-local
+ // io_uring can be created.
+ struct io_uring* new_io_uring = CreateIOUring();
+ if (new_io_uring != nullptr) {
+ thread_local_io_urings_.reset(new ThreadLocalPtr(DeleteIOUring));
+ delete new_io_uring;
+ }
+#endif
+}
+
+} // namespace
+
+//
+// Default Posix FileSystem
+//
+std::shared_ptr<FileSystem> FileSystem::Default() {
+ STATIC_AVOID_DESTRUCTION(std::shared_ptr<FileSystem>, instance)
+ (std::make_shared<PosixFileSystem>());
+ return instance;
+}
+
+#ifndef ROCKSDB_LITE
+static FactoryFunc<FileSystem> posix_filesystem_reg =
+ ObjectLibrary::Default()->AddFactory<FileSystem>(
+ ObjectLibrary::PatternEntry("posix").AddSeparator("://", false),
+ [](const std::string& /* uri */, std::unique_ptr<FileSystem>* f,
+ std::string* /* errmsg */) {
+ f->reset(new PosixFileSystem());
+ return f->get();
+ });
+#endif
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif