diff options
Diffstat (limited to 'src/rocksdb/env/env_posix.cc')
-rw-r--r-- | src/rocksdb/env/env_posix.cc | 1128 |
1 files changed, 1128 insertions, 0 deletions
diff --git a/src/rocksdb/env/env_posix.cc b/src/rocksdb/env/env_posix.cc new file mode 100644 index 00000000..387c0279 --- /dev/null +++ b/src/rocksdb/env/env_posix.cc @@ -0,0 +1,1128 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors +#include <dirent.h> +#include <errno.h> +#include <fcntl.h> +#if defined(OS_LINUX) +#include <linux/fs.h> +#endif +#include <pthread.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/ioctl.h> +#include <sys/mman.h> +#include <sys/stat.h> +#if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID) +#include <sys/statfs.h> +#include <sys/syscall.h> +#include <sys/sysmacros.h> +#endif +#include <sys/statvfs.h> +#include <sys/time.h> +#include <sys/types.h> +#include <time.h> +#include <algorithm> +// Get nano time includes +#if defined(OS_LINUX) || defined(OS_FREEBSD) +#elif defined(__MACH__) +#include <mach/clock.h> +#include <mach/mach.h> +#else +#include <chrono> +#endif +#include <deque> +#include <set> +#include <vector> + +#include "env/io_posix.h" +#include "env/posix_logger.h" +#include "monitoring/iostats_context_imp.h" +#include "monitoring/thread_status_updater.h" +#include "port/port.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "util/coding.h" +#include "util/compression_context_cache.h" +#include "util/logging.h" +#include "util/random.h" +#include "util/string_util.h" +#include "util/sync_point.h" +#include "util/thread_local.h" +#include "util/threadpool_imp.h" + +#if !defined(TMPFS_MAGIC) +#define TMPFS_MAGIC 0x01021994 +#endif +#if !defined(XFS_SUPER_MAGIC) +#define XFS_SUPER_MAGIC 0x58465342 +#endif +#if !defined(EXT4_SUPER_MAGIC) +#define EXT4_SUPER_MAGIC 0xEF53 +#endif + +namespace rocksdb { + +namespace { + +ThreadStatusUpdater* CreateThreadStatusUpdater() { + return new ThreadStatusUpdater(); +} + +inline mode_t GetDBFileMode(bool allow_non_owner_access) { + return allow_non_owner_access ? 0644 : 0600; +} + +// list of pathnames that are locked +static std::set<std::string> lockedFiles; +static port::Mutex mutex_lockedFiles; + +static int LockOrUnlock(int fd, bool lock) { + errno = 0; + struct flock f; + memset(&f, 0, sizeof(f)); + f.l_type = (lock ? F_WRLCK : F_UNLCK); + f.l_whence = SEEK_SET; + f.l_start = 0; + f.l_len = 0; // Lock/unlock entire file + int value = fcntl(fd, F_SETLK, &f); + + return value; +} + +class PosixFileLock : public FileLock { + public: + int fd_; + std::string filename; +}; + +int cloexec_flags(int flags, const EnvOptions* options) { + // If the system supports opening the file with cloexec enabled, + // do so, as this avoids a race condition if a db is opened around + // the same time that a child process is forked +#ifdef O_CLOEXEC + if (options == nullptr || options->set_fd_cloexec) { + flags |= O_CLOEXEC; + } +#endif + return flags; +} + +class PosixEnv : public Env { + public: + PosixEnv(); + + ~PosixEnv() override { + for (const auto tid : threads_to_join_) { + pthread_join(tid, nullptr); + } + for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { + thread_pools_[pool_id].JoinAllThreads(); + } + // Delete the thread_status_updater_ only when the current Env is not + // Env::Default(). This is to avoid the free-after-use error when + // Env::Default() is destructed while some other child threads are + // still trying to update thread status. + if (this != Env::Default()) { + delete thread_status_updater_; + } + } + + void SetFD_CLOEXEC(int fd, const EnvOptions* options) { + if ((options == nullptr || options->set_fd_cloexec) && fd > 0) { + fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC); + } + } + + Status NewSequentialFile(const std::string& fname, + std::unique_ptr<SequentialFile>* result, + const EnvOptions& options) override { + result->reset(); + int fd = -1; + int flags = cloexec_flags(O_RDONLY, &options); + FILE* file = nullptr; + + if (options.use_direct_reads && !options.use_mmap_reads) { +#ifdef ROCKSDB_LITE + return Status::IOError(fname, "Direct I/O not supported in RocksDB lite"); +#endif // !ROCKSDB_LITE +#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS) + flags |= O_DIRECT; +#endif + } + + do { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_)); + } while (fd < 0 && errno == EINTR); + if (fd < 0) { + return IOError("While opening a file for sequentially reading", fname, + errno); + } + + SetFD_CLOEXEC(fd, &options); + + if (options.use_direct_reads && !options.use_mmap_reads) { +#ifdef OS_MACOSX + if (fcntl(fd, F_NOCACHE, 1) == -1) { + close(fd); + return IOError("While fcntl NoCache", fname, errno); + } +#endif + } else { + do { + IOSTATS_TIMER_GUARD(open_nanos); + file = fdopen(fd, "r"); + } while (file == nullptr && errno == EINTR); + if (file == nullptr) { + close(fd); + return IOError("While opening file for sequentially read", fname, + errno); + } + } + result->reset(new PosixSequentialFile(fname, file, fd, options)); + return Status::OK(); + } + + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr<RandomAccessFile>* result, + const EnvOptions& options) override { + result->reset(); + Status s; + int fd; + int flags = cloexec_flags(O_RDONLY, &options); + + if (options.use_direct_reads && !options.use_mmap_reads) { +#ifdef ROCKSDB_LITE + return Status::IOError(fname, "Direct I/O not supported in RocksDB lite"); +#endif // !ROCKSDB_LITE +#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS) + flags |= O_DIRECT; + TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags); +#endif + } + + do { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_)); + } while (fd < 0 && errno == EINTR); + if (fd < 0) { + return IOError("While open a file for random read", fname, errno); + } + SetFD_CLOEXEC(fd, &options); + + if (options.use_mmap_reads && sizeof(void*) >= 8) { + // Use of mmap for random reads has been removed because it + // kills performance when storage is fast. + // Use mmap when virtual address-space is plentiful. + uint64_t size; + s = GetFileSize(fname, &size); + if (s.ok()) { + void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0); + if (base != MAP_FAILED) { + result->reset(new PosixMmapReadableFile(fd, fname, base, + size, options)); + } else { + s = IOError("while mmap file for read", fname, errno); + close(fd); + } + } + } else { + if (options.use_direct_reads && !options.use_mmap_reads) { +#ifdef OS_MACOSX + if (fcntl(fd, F_NOCACHE, 1) == -1) { + close(fd); + return IOError("while fcntl NoCache", fname, errno); + } +#endif + } + result->reset(new PosixRandomAccessFile(fname, fd, options)); + } + return s; + } + + virtual Status OpenWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options, + bool reopen = false) { + result->reset(); + Status s; + int fd = -1; + int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC); + // Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX) + if (options.use_direct_writes && !options.use_mmap_writes) { + // Note: we should avoid O_APPEND here due to ta the following bug: + // POSIX requires that opening a file with the O_APPEND flag should + // have no affect on the location at which pwrite() writes data. + // However, on Linux, if a file is opened with O_APPEND, pwrite() + // appends data to the end of the file, regardless of the value of + // offset. + // More info here: https://linux.die.net/man/2/pwrite +#ifdef ROCKSDB_LITE + return Status::IOError(fname, "Direct I/O not supported in RocksDB lite"); +#endif // ROCKSDB_LITE + flags |= O_WRONLY; +#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS) + flags |= O_DIRECT; +#endif + TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags); + } else if (options.use_mmap_writes) { + // non-direct I/O + flags |= O_RDWR; + } else { + flags |= O_WRONLY; + } + + flags = cloexec_flags(flags, &options); + + do { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_)); + } while (fd < 0 && errno == EINTR); + + if (fd < 0) { + s = IOError("While open a file for appending", fname, errno); + return s; + } + SetFD_CLOEXEC(fd, &options); + + if (options.use_mmap_writes) { + if (!checkedDiskForMmap_) { + // this will be executed once in the program's lifetime. + // do not use mmapWrite on non ext-3/xfs/tmpfs systems. + if (!SupportsFastAllocate(fname)) { + forceMmapOff_ = true; + } + checkedDiskForMmap_ = true; + } + } + if (options.use_mmap_writes && !forceMmapOff_) { + result->reset(new PosixMmapFile(fname, fd, page_size_, options)); + } else if (options.use_direct_writes && !options.use_mmap_writes) { +#ifdef OS_MACOSX + if (fcntl(fd, F_NOCACHE, 1) == -1) { + close(fd); + s = IOError("While fcntl NoCache an opened file for appending", fname, + errno); + return s; + } +#elif defined(OS_SOLARIS) + if (directio(fd, DIRECTIO_ON) == -1) { + if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON + close(fd); + s = IOError("While calling directio()", fname, errno); + return s; + } + } +#endif + result->reset(new PosixWritableFile(fname, fd, options)); + } else { + // disable mmap writes + EnvOptions no_mmap_writes_options = options; + no_mmap_writes_options.use_mmap_writes = false; + result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options)); + } + return s; + } + + Status NewWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options) override { + return OpenWritableFile(fname, result, options, false); + } + + Status ReopenWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options) override { + return OpenWritableFile(fname, result, options, true); + } + + Status ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options) override { + result->reset(); + Status s; + int fd = -1; + + int flags = 0; + // Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX) + if (options.use_direct_writes && !options.use_mmap_writes) { +#ifdef ROCKSDB_LITE + return Status::IOError(fname, "Direct I/O not supported in RocksDB lite"); +#endif // !ROCKSDB_LITE + flags |= O_WRONLY; +#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS) + flags |= O_DIRECT; +#endif + TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags); + } else if (options.use_mmap_writes) { + // mmap needs O_RDWR mode + flags |= O_RDWR; + } else { + flags |= O_WRONLY; + } + + flags = cloexec_flags(flags, &options); + + do { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(old_fname.c_str(), flags, + GetDBFileMode(allow_non_owner_access_)); + } while (fd < 0 && errno == EINTR); + if (fd < 0) { + s = IOError("while reopen file for write", fname, errno); + return s; + } + + SetFD_CLOEXEC(fd, &options); + // rename into place + if (rename(old_fname.c_str(), fname.c_str()) != 0) { + s = IOError("while rename file to " + fname, old_fname, errno); + close(fd); + return s; + } + + if (options.use_mmap_writes) { + if (!checkedDiskForMmap_) { + // this will be executed once in the program's lifetime. + // do not use mmapWrite on non ext-3/xfs/tmpfs systems. + if (!SupportsFastAllocate(fname)) { + forceMmapOff_ = true; + } + checkedDiskForMmap_ = true; + } + } + if (options.use_mmap_writes && !forceMmapOff_) { + result->reset(new PosixMmapFile(fname, fd, page_size_, options)); + } else if (options.use_direct_writes && !options.use_mmap_writes) { +#ifdef OS_MACOSX + if (fcntl(fd, F_NOCACHE, 1) == -1) { + close(fd); + s = IOError("while fcntl NoCache for reopened file for append", fname, + errno); + return s; + } +#elif defined(OS_SOLARIS) + if (directio(fd, DIRECTIO_ON) == -1) { + if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON + close(fd); + s = IOError("while calling directio()", fname, errno); + return s; + } + } +#endif + result->reset(new PosixWritableFile(fname, fd, options)); + } else { + // disable mmap writes + EnvOptions no_mmap_writes_options = options; + no_mmap_writes_options.use_mmap_writes = false; + result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options)); + } + return s; + } + + Status NewRandomRWFile(const std::string& fname, + std::unique_ptr<RandomRWFile>* result, + const EnvOptions& options) override { + int fd = -1; + int flags = cloexec_flags(O_RDWR, &options); + + while (fd < 0) { + IOSTATS_TIMER_GUARD(open_nanos); + + fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_)); + if (fd < 0) { + // Error while opening the file + if (errno == EINTR) { + continue; + } + return IOError("While open file for random read/write", fname, errno); + } + } + + SetFD_CLOEXEC(fd, &options); + result->reset(new PosixRandomRWFile(fname, fd, options)); + return Status::OK(); + } + + Status NewMemoryMappedFileBuffer( + const std::string& fname, + std::unique_ptr<MemoryMappedFileBuffer>* result) override { + int fd = -1; + Status status; + int flags = cloexec_flags(O_RDWR, nullptr); + + while (fd < 0) { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), flags, 0644); + if (fd < 0) { + // Error while opening the file + if (errno == EINTR) { + continue; + } + status = + IOError("While open file for raw mmap buffer access", fname, errno); + break; + } + } + uint64_t size; + if (status.ok()) { + status = GetFileSize(fname, &size); + } + void* base = nullptr; + if (status.ok()) { + base = mmap(nullptr, static_cast<size_t>(size), PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { + status = IOError("while mmap file for read", fname, errno); + } + } + if (status.ok()) { + result->reset( + new PosixMemoryMappedFileBuffer(base, static_cast<size_t>(size))); + } + if (fd >= 0) { + // don't need to keep it open after mmap has been called + close(fd); + } + return status; + } + + Status NewDirectory(const std::string& name, + std::unique_ptr<Directory>* result) override { + result->reset(); + int fd; + int flags = cloexec_flags(0, nullptr); + { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(name.c_str(), flags); + } + if (fd < 0) { + return IOError("While open directory", name, errno); + } else { + result->reset(new PosixDirectory(fd)); + } + return Status::OK(); + } + + Status FileExists(const std::string& fname) override { + int result = access(fname.c_str(), F_OK); + + if (result == 0) { + return Status::OK(); + } + + int err = errno; + switch (err) { + case EACCES: + case ELOOP: + case ENAMETOOLONG: + case ENOENT: + case ENOTDIR: + return Status::NotFound(); + default: + assert(err == EIO || err == ENOMEM); + return Status::IOError("Unexpected error(" + ToString(err) + + ") accessing file `" + fname + "' "); + } + } + + Status GetChildren(const std::string& dir, + std::vector<std::string>* result) override { + result->clear(); + DIR* d = opendir(dir.c_str()); + if (d == nullptr) { + switch (errno) { + case EACCES: + case ENOENT: + case ENOTDIR: + return Status::NotFound(); + default: + return IOError("While opendir", dir, errno); + } + } + struct dirent* entry; + while ((entry = readdir(d)) != nullptr) { + result->push_back(entry->d_name); + } + closedir(d); + return Status::OK(); + } + + Status DeleteFile(const std::string& fname) override { + Status result; + if (unlink(fname.c_str()) != 0) { + result = IOError("while unlink() file", fname, errno); + } + return result; + }; + + Status CreateDir(const std::string& name) override { + Status result; + if (mkdir(name.c_str(), 0755) != 0) { + result = IOError("While mkdir", name, errno); + } + return result; + }; + + Status CreateDirIfMissing(const std::string& name) override { + Status result; + if (mkdir(name.c_str(), 0755) != 0) { + if (errno != EEXIST) { + result = IOError("While mkdir if missing", name, errno); + } else if (!DirExists(name)) { // Check that name is actually a + // directory. + // Message is taken from mkdir + result = Status::IOError("`"+name+"' exists but is not a directory"); + } + } + return result; + }; + + Status DeleteDir(const std::string& name) override { + Status result; + if (rmdir(name.c_str()) != 0) { + result = IOError("file rmdir", name, errno); + } + return result; + }; + + Status GetFileSize(const std::string& fname, uint64_t* size) override { + Status s; + struct stat sbuf; + if (stat(fname.c_str(), &sbuf) != 0) { + *size = 0; + s = IOError("while stat a file for size", fname, errno); + } else { + *size = sbuf.st_size; + } + return s; + } + + Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) override { + struct stat s; + if (stat(fname.c_str(), &s) !=0) { + return IOError("while stat a file for modification time", fname, errno); + } + *file_mtime = static_cast<uint64_t>(s.st_mtime); + return Status::OK(); + } + Status RenameFile(const std::string& src, + const std::string& target) override { + Status result; + if (rename(src.c_str(), target.c_str()) != 0) { + result = IOError("While renaming a file to " + target, src, errno); + } + return result; + } + + Status LinkFile(const std::string& src, const std::string& target) override { + Status result; + if (link(src.c_str(), target.c_str()) != 0) { + if (errno == EXDEV) { + return Status::NotSupported("No cross FS links allowed"); + } + result = IOError("while link file to " + target, src, errno); + } + return result; + } + + Status NumFileLinks(const std::string& fname, uint64_t* count) override { + struct stat s; + if (stat(fname.c_str(), &s) != 0) { + return IOError("while stat a file for num file links", fname, errno); + } + *count = static_cast<uint64_t>(s.st_nlink); + return Status::OK(); + } + + Status AreFilesSame(const std::string& first, const std::string& second, + bool* res) override { + struct stat statbuf[2]; + if (stat(first.c_str(), &statbuf[0]) != 0) { + return IOError("stat file", first, errno); + } + if (stat(second.c_str(), &statbuf[1]) != 0) { + return IOError("stat file", second, errno); + } + + if (major(statbuf[0].st_dev) != major(statbuf[1].st_dev) || + minor(statbuf[0].st_dev) != minor(statbuf[1].st_dev) || + statbuf[0].st_ino != statbuf[1].st_ino) { + *res = false; + } else { + *res = true; + } + return Status::OK(); + } + + Status LockFile(const std::string& fname, FileLock** lock) override { + *lock = nullptr; + Status result; + + mutex_lockedFiles.Lock(); + // If it already exists in the lockedFiles set, then it is already locked, + // and fail this lock attempt. Otherwise, insert it into lockedFiles. + // This check is needed because fcntl() does not detect lock conflict + // if the fcntl is issued by the same thread that earlier acquired + // this lock. + // We must do this check *before* opening the file: + // Otherwise, we will open a new file descriptor. Locks are associated with + // a process, not a file descriptor and when *any* file descriptor is closed, + // all locks the process holds for that *file* are released + if (lockedFiles.insert(fname).second == false) { + mutex_lockedFiles.Unlock(); + errno = ENOLCK; + return IOError("lock ", fname, errno); + } + + int fd; + int flags = cloexec_flags(O_RDWR | O_CREAT, nullptr); + + { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), flags, 0644); + } + if (fd < 0) { + result = IOError("while open a file for lock", fname, errno); + } else if (LockOrUnlock(fd, true) == -1) { + // if there is an error in locking, then remove the pathname from lockedfiles + lockedFiles.erase(fname); + result = IOError("While lock file", fname, errno); + close(fd); + } else { + SetFD_CLOEXEC(fd, nullptr); + PosixFileLock* my_lock = new PosixFileLock; + my_lock->fd_ = fd; + my_lock->filename = fname; + *lock = my_lock; + } + + mutex_lockedFiles.Unlock(); + return result; + } + + Status UnlockFile(FileLock* lock) override { + PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock); + Status result; + mutex_lockedFiles.Lock(); + // If we are unlocking, then verify that we had locked it earlier, + // it should already exist in lockedFiles. Remove it from lockedFiles. + if (lockedFiles.erase(my_lock->filename) != 1) { + errno = ENOLCK; + result = IOError("unlock", my_lock->filename, errno); + } else if (LockOrUnlock(my_lock->fd_, false) == -1) { + result = IOError("unlock", my_lock->filename, errno); + } + close(my_lock->fd_); + delete my_lock; + mutex_lockedFiles.Unlock(); + return result; + } + + void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW, + void* tag = nullptr, + void (*unschedFunction)(void* arg) = nullptr) override; + + int UnSchedule(void* arg, Priority pri) override; + + void StartThread(void (*function)(void* arg), void* arg) override; + + void WaitForJoin() override; + + unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; + + Status GetTestDirectory(std::string* result) override { + const char* env = getenv("TEST_TMPDIR"); + if (env && env[0] != '\0') { + *result = env; + } else { + char buf[100]; + snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%d", int(geteuid())); + *result = buf; + } + // Directory may already exist + CreateDir(*result); + return Status::OK(); + } + + Status GetThreadList(std::vector<ThreadStatus>* thread_list) override { + assert(thread_status_updater_); + return thread_status_updater_->GetThreadList(thread_list); + } + + static uint64_t gettid(pthread_t tid) { + uint64_t thread_id = 0; + memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); + return thread_id; + } + + static uint64_t gettid() { + pthread_t tid = pthread_self(); + return gettid(tid); + } + + uint64_t GetThreadID() const override { return gettid(pthread_self()); } + + Status GetFreeSpace(const std::string& fname, uint64_t* free_space) override { + struct statvfs sbuf; + + if (statvfs(fname.c_str(), &sbuf) < 0) { + return IOError("While doing statvfs", fname, errno); + } + + *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree); + return Status::OK(); + } + + Status NewLogger(const std::string& fname, + std::shared_ptr<Logger>* result) override { + FILE* f; + { + IOSTATS_TIMER_GUARD(open_nanos); + f = fopen(fname.c_str(), "w" +#ifdef __GLIBC_PREREQ +#if __GLIBC_PREREQ(2, 7) + "e" // glibc extension to enable O_CLOEXEC +#endif +#endif + ); + } + if (f == nullptr) { + result->reset(); + return IOError("when fopen a file for new logger", fname, errno); + } else { + int fd = fileno(f); +#ifdef ROCKSDB_FALLOCATE_PRESENT + fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024); +#endif + SetFD_CLOEXEC(fd, nullptr); + result->reset(new PosixLogger(f, &PosixEnv::gettid, this)); + return Status::OK(); + } + } + + uint64_t NowMicros() override { + struct timeval tv; + gettimeofday(&tv, nullptr); + return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec; + } + + uint64_t NowNanos() override { +#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec; +#elif defined(OS_SOLARIS) + return gethrtime(); +#elif defined(__MACH__) + clock_serv_t cclock; + mach_timespec_t ts; + host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock); + clock_get_time(cclock, &ts); + mach_port_deallocate(mach_task_self(), cclock); + return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec; +#else + return std::chrono::duration_cast<std::chrono::nanoseconds>( + std::chrono::steady_clock::now().time_since_epoch()).count(); +#endif + } + + uint64_t NowCPUNanos() override { +#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \ + defined(__MACH__) + struct timespec ts; + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts); + return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec; +#endif + return 0; + } + + void SleepForMicroseconds(int micros) override { usleep(micros); } + + Status GetHostName(char* name, uint64_t len) override { + int ret = gethostname(name, static_cast<size_t>(len)); + if (ret < 0) { + if (errno == EFAULT || errno == EINVAL) + return Status::InvalidArgument(strerror(errno)); + else + return IOError("GetHostName", name, errno); + } + return Status::OK(); + } + + Status GetCurrentTime(int64_t* unix_time) override { + time_t ret = time(nullptr); + if (ret == (time_t) -1) { + return IOError("GetCurrentTime", "", errno); + } + *unix_time = (int64_t) ret; + return Status::OK(); + } + + Status GetAbsolutePath(const std::string& db_path, + std::string* output_path) override { + if (!db_path.empty() && db_path[0] == '/') { + *output_path = db_path; + return Status::OK(); + } + + char the_path[256]; + char* ret = getcwd(the_path, 256); + if (ret == nullptr) { + return Status::IOError(strerror(errno)); + } + + *output_path = ret; + return Status::OK(); + } + + // Allow increasing the number of worker threads. + void SetBackgroundThreads(int num, Priority pri) override { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + thread_pools_[pri].SetBackgroundThreads(num); + } + + int GetBackgroundThreads(Priority pri) override { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + return thread_pools_[pri].GetBackgroundThreads(); + } + + Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override { + allow_non_owner_access_ = allow_non_owner_access; + return Status::OK(); + } + + // Allow increasing the number of worker threads. + void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); + } + + void LowerThreadPoolIOPriority(Priority pool = LOW) override { + assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); +#ifdef OS_LINUX + thread_pools_[pool].LowerIOPriority(); +#else + (void)pool; +#endif + } + + void LowerThreadPoolCPUPriority(Priority pool = LOW) override { + assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); +#ifdef OS_LINUX + thread_pools_[pool].LowerCPUPriority(); +#else + (void)pool; +#endif + } + + std::string TimeToString(uint64_t secondsSince1970) override { + const time_t seconds = (time_t)secondsSince1970; + struct tm t; + int maxsize = 64; + std::string dummy; + dummy.reserve(maxsize); + dummy.resize(maxsize); + char* p = &dummy[0]; + localtime_r(&seconds, &t); + snprintf(p, maxsize, + "%04d/%02d/%02d-%02d:%02d:%02d ", + t.tm_year + 1900, + t.tm_mon + 1, + t.tm_mday, + t.tm_hour, + t.tm_min, + t.tm_sec); + return dummy; + } + + EnvOptions OptimizeForLogWrite(const EnvOptions& env_options, + const DBOptions& db_options) const override { + EnvOptions optimized = env_options; + optimized.use_mmap_writes = false; + optimized.use_direct_writes = false; + optimized.bytes_per_sync = db_options.wal_bytes_per_sync; + // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it + // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit + // test and make this false + optimized.fallocate_with_keep_size = true; + optimized.writable_file_max_buffer_size = + db_options.writable_file_max_buffer_size; + return optimized; + } + + EnvOptions OptimizeForManifestWrite( + const EnvOptions& env_options) const override { + EnvOptions optimized = env_options; + optimized.use_mmap_writes = false; + optimized.use_direct_writes = false; + optimized.fallocate_with_keep_size = true; + return optimized; + } + + private: + bool checkedDiskForMmap_; + bool forceMmapOff_; // do we override Env options? + + // Returns true iff the named directory exists and is a directory. + virtual bool DirExists(const std::string& dname) { + struct stat statbuf; + if (stat(dname.c_str(), &statbuf) == 0) { + return S_ISDIR(statbuf.st_mode); + } + return false; // stat() failed return false + } + + bool SupportsFastAllocate(const std::string& path) { +#ifdef ROCKSDB_FALLOCATE_PRESENT + struct statfs s; + if (statfs(path.c_str(), &s)){ + return false; + } + switch (s.f_type) { + case EXT4_SUPER_MAGIC: + return true; + case XFS_SUPER_MAGIC: + return true; + case TMPFS_MAGIC: + return true; + default: + return false; + } +#else + (void)path; + return false; +#endif + } + + size_t page_size_; + + std::vector<ThreadPoolImpl> thread_pools_; + pthread_mutex_t mu_; + std::vector<pthread_t> threads_to_join_; + // If true, allow non owner read access for db files. Otherwise, non-owner + // has no access to db files. + bool allow_non_owner_access_; +}; + +PosixEnv::PosixEnv() + : checkedDiskForMmap_(false), + forceMmapOff_(false), + page_size_(getpagesize()), + thread_pools_(Priority::TOTAL), + allow_non_owner_access_(true) { + ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); + for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { + thread_pools_[pool_id].SetThreadPriority( + static_cast<Env::Priority>(pool_id)); + // This allows later initializing the thread-local-env of each thread. + thread_pools_[pool_id].SetHostEnv(this); + } + thread_status_updater_ = CreateThreadStatusUpdater(); +} + +void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri, + void* tag, void (*unschedFunction)(void* arg)) { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); +} + +int PosixEnv::UnSchedule(void* arg, Priority pri) { + return thread_pools_[pri].UnSchedule(arg); +} + +unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + return thread_pools_[pri].GetQueueLen(); +} + +struct StartThreadState { + void (*user_function)(void*); + void* arg; +}; + +static void* StartThreadWrapper(void* arg) { + StartThreadState* state = reinterpret_cast<StartThreadState*>(arg); + state->user_function(state->arg); + delete state; + return nullptr; +} + +void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { + pthread_t t; + StartThreadState* state = new StartThreadState; + state->user_function = function; + state->arg = arg; + ThreadPoolImpl::PthreadCall( + "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); + ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_)); + threads_to_join_.push_back(t); + ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_)); +} + +void PosixEnv::WaitForJoin() { + for (const auto tid : threads_to_join_) { + pthread_join(tid, nullptr); + } + threads_to_join_.clear(); +} + +} // namespace + +std::string Env::GenerateUniqueId() { + std::string uuid_file = "/proc/sys/kernel/random/uuid"; + + Status s = FileExists(uuid_file); + if (s.ok()) { + std::string uuid; + s = ReadFileToString(this, uuid_file, &uuid); + if (s.ok()) { + return uuid; + } + } + // Could not read uuid_file - generate uuid using "nanos-random" + Random64 r(time(nullptr)); + uint64_t random_uuid_portion = + r.Uniform(std::numeric_limits<uint64_t>::max()); + uint64_t nanos_uuid_portion = NowNanos(); + char uuid2[200]; + snprintf(uuid2, + 200, + "%lx-%lx", + (unsigned long)nanos_uuid_portion, + (unsigned long)random_uuid_portion); + return uuid2; +} + +// +// Default Posix Env +// +Env* Env::Default() { + // The following function call initializes the singletons of ThreadLocalPtr + // right before the static default_env. This guarantees default_env will + // always being destructed before the ThreadLocalPtr singletons get + // destructed as C++ guarantees that the destructions of static variables + // is in the reverse order of their constructions. + // + // Since static members are destructed in the reverse order + // of their construction, having this call here guarantees that + // the destructor of static PosixEnv will go first, then the + // the singletons of ThreadLocalPtr. + ThreadLocalPtr::InitSingletons(); + CompressionContextCache::InitSingleton(); + INIT_SYNC_POINT_SINGLETONS(); + static PosixEnv default_env; + return &default_env; +} + +} // namespace rocksdb |