diff options
Diffstat (limited to 'src/rocksdb/env/io_posix.cc')
-rw-r--r-- | src/rocksdb/env/io_posix.cc | 1733 |
1 files changed, 1733 insertions, 0 deletions
diff --git a/src/rocksdb/env/io_posix.cc b/src/rocksdb/env/io_posix.cc new file mode 100644 index 000000000..0ec0e9c83 --- /dev/null +++ b/src/rocksdb/env/io_posix.cc @@ -0,0 +1,1733 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifdef ROCKSDB_LIB_IO_POSIX +#include "env/io_posix.h" + +#include <errno.h> +#include <fcntl.h> + +#include <algorithm> +#if defined(OS_LINUX) +#include <linux/fs.h> +#ifndef FALLOC_FL_KEEP_SIZE +#include <linux/falloc.h> +#endif +#endif +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/ioctl.h> +#include <sys/mman.h> +#include <sys/stat.h> +#include <sys/types.h> +#ifdef OS_LINUX +#include <sys/statfs.h> +#include <sys/sysmacros.h> +#endif +#include "monitoring/iostats_context_imp.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/slice.h" +#include "test_util/sync_point.h" +#include "util/autovector.h" +#include "util/coding.h" +#include "util/string_util.h" + +#if defined(OS_LINUX) && !defined(F_SET_RW_HINT) +#define F_LINUX_SPECIFIC_BASE 1024 +#define F_SET_RW_HINT (F_LINUX_SPECIFIC_BASE + 12) +#endif + +namespace ROCKSDB_NAMESPACE { + +// Builds the text of an I/O error message: returns "context: file_name", +// or just `context` when `file_name` is empty. +std::string IOErrorMsg(const std::string& context, + const std::string& file_name) { + if (file_name.empty()) { + return context; + } + return context + ": " + file_name; +} + +// file_name can be left empty if it is unknown.
+IOStatus IOError(const std::string& context, const std::string& file_name, + int err_number) { + switch (err_number) { + case ENOSPC: { + IOStatus s = IOStatus::NoSpace(IOErrorMsg(context, file_name), + errnoStr(err_number).c_str()); + s.SetRetryable(true); + return s; + } + case ESTALE: + return IOStatus::IOError(IOStatus::kStaleFile); + case ENOENT: + return IOStatus::PathNotFound(IOErrorMsg(context, file_name), + errnoStr(err_number).c_str()); + default: + return IOStatus::IOError(IOErrorMsg(context, file_name), + errnoStr(err_number).c_str()); + } +} + +// A wrapper for fadvise, if the platform doesn't support fadvise, +// it will simply return 0. +int Fadvise(int fd, off_t offset, size_t len, int advice) { +#ifdef OS_LINUX + return posix_fadvise(fd, offset, len, advice); +#else + (void)fd; + (void)offset; + (void)len; + (void)advice; + return 0; // simply do nothing. +#endif +} + +// A wrapper for fadvise, if the platform doesn't support fadvise, +// it will simply return 0. +int Madvise(void* addr, size_t len, int advice) { +#ifdef OS_LINUX + return posix_madvise(addr, len, advice); +#else + (void)addr; + (void)len; + (void)advice; + return 0; // simply do nothing. +#endif +} + +namespace { + +// On MacOS (and probably *BSD), the posix write and pwrite calls do not support +// buffers larger than 2^31-1 bytes. These two wrappers fix this issue by +// cutting the buffer in 1GB chunks. We use this chunk size to be sure to keep +// the writes aligned. 
+ +bool PosixWrite(int fd, const char* buf, size_t nbyte) { + const size_t kLimit1Gb = 1UL << 30; + + const char* src = buf; + size_t left = nbyte; + + while (left != 0) { + size_t bytes_to_write = std::min(left, kLimit1Gb); + + ssize_t done = write(fd, src, bytes_to_write); + if (done < 0) { + if (errno == EINTR) { + continue; + } + return false; + } + left -= done; + src += done; + } + return true; +} + +bool PosixPositionedWrite(int fd, const char* buf, size_t nbyte, off_t offset) { + const size_t kLimit1Gb = 1UL << 30; + + const char* src = buf; + size_t left = nbyte; + + while (left != 0) { + size_t bytes_to_write = std::min(left, kLimit1Gb); + + ssize_t done = pwrite(fd, src, bytes_to_write, offset); + if (done < 0) { + if (errno == EINTR) { + continue; + } + return false; + } + left -= done; + offset += done; + src += done; + } + + return true; +} + +#ifdef ROCKSDB_RANGESYNC_PRESENT + +#if !defined(ZFS_SUPER_MAGIC) +// The magic number for ZFS was not exposed until recently. It should be fixed +// forever so we can just copy the magic number here. +#define ZFS_SUPER_MAGIC 0x2fc12fc1 +#endif + +bool IsSyncFileRangeSupported(int fd) { + // This function tracks and checks for cases where we know `sync_file_range` + // definitely will not work properly despite passing the compile-time check + // (`ROCKSDB_RANGESYNC_PRESENT`). If we are unsure, or if any of the checks + // fail in unexpected ways, we allow `sync_file_range` to be used. This way + // should minimize risk of impacting existing use cases. + struct statfs buf; + int ret = fstatfs(fd, &buf); + assert(ret == 0); + if (ret == 0 && buf.f_type == ZFS_SUPER_MAGIC) { + // Testing on ZFS showed the writeback did not happen asynchronously when + // `sync_file_range` was called, even though it returned success. Avoid it + // and use `fdatasync` instead to preserve the contract of `bytes_per_sync`, + // even though this'll incur extra I/O for metadata. 
+ return false; + } + + ret = sync_file_range(fd, 0 /* offset */, 0 /* nbytes */, 0 /* flags */); + assert(!(ret == -1 && errno != ENOSYS)); + if (ret == -1 && errno == ENOSYS) { + // `sync_file_range` is not implemented on all platforms even if + // compile-time checks pass and a supported filesystem is in-use. For + // example, using ext4 on WSL (Windows Subsystem for Linux), + // `sync_file_range()` returns `ENOSYS` + // ("Function not implemented"). + return false; + } + // None of the known cases matched, so allow `sync_file_range` use. + return true; +} + +#undef ZFS_SUPER_MAGIC + +#endif // ROCKSDB_RANGESYNC_PRESENT + +} // anonymous namespace + +/* + * PosixSequentialFile + */ +PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file, + int fd, size_t logical_block_size, + const EnvOptions& options) + : filename_(fname), + file_(file), + fd_(fd), + use_direct_io_(options.use_direct_reads), + logical_sector_size_(logical_block_size) { + assert(!options.use_direct_reads || !options.use_mmap_reads); +} + +PosixSequentialFile::~PosixSequentialFile() { + if (!use_direct_io()) { + assert(file_); + fclose(file_); + } else { + assert(fd_); + close(fd_); + } +} + +IOStatus PosixSequentialFile::Read(size_t n, const IOOptions& /*opts*/, + Slice* result, char* scratch, + IODebugContext* /*dbg*/) { + assert(result != nullptr && !use_direct_io()); + IOStatus s; + size_t r = 0; + do { + clearerr(file_); + r = fread_unlocked(scratch, 1, n, file_); + } while (r == 0 && ferror(file_) && errno == EINTR); + *result = Slice(scratch, r); + if (r < n) { + if (feof(file_)) { + // We leave status as ok if we hit the end of the file + // We also clear the error so that the reads can continue + // if a new data is written to the file + clearerr(file_); + } else { + // A partial read with an error: return a non-ok status + s = IOError("While reading file sequentially", filename_, errno); + } + } + return s; +} + +IOStatus 
PosixSequentialFile::PositionedRead(uint64_t offset, size_t n, + const IOOptions& /*opts*/, + Slice* result, char* scratch, + IODebugContext* /*dbg*/) { + assert(use_direct_io()); + assert(IsSectorAligned(offset, GetRequiredBufferAlignment())); + assert(IsSectorAligned(n, GetRequiredBufferAlignment())); + assert(IsSectorAligned(scratch, GetRequiredBufferAlignment())); + + IOStatus s; + ssize_t r = -1; + size_t left = n; + char* ptr = scratch; + while (left > 0) { + r = pread(fd_, ptr, left, static_cast<off_t>(offset)); + if (r <= 0) { + if (r == -1 && errno == EINTR) { + continue; + } + break; + } + ptr += r; + offset += r; + left -= r; + if (!IsSectorAligned(r, GetRequiredBufferAlignment())) { + // Bytes reads don't fill sectors. Should only happen at the end + // of the file. + break; + } + } + if (r < 0) { + // An error: return a non-ok status + s = IOError("While pread " + std::to_string(n) + " bytes from offset " + + std::to_string(offset), + filename_, errno); + } + *result = Slice(scratch, (r < 0) ? 
0 : n - left); + return s; +} + +IOStatus PosixSequentialFile::Skip(uint64_t n) { + if (fseek(file_, static_cast<long int>(n), SEEK_CUR)) { + return IOError("While fseek to skip " + std::to_string(n) + " bytes", + filename_, errno); + } + return IOStatus::OK(); +} + +IOStatus PosixSequentialFile::InvalidateCache(size_t offset, size_t length) { +#ifndef OS_LINUX + (void)offset; + (void)length; + return IOStatus::OK(); +#else + if (!use_direct_io()) { + // free OS pages + int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); + if (ret != 0) { + return IOError("While fadvise NotNeeded offset " + + std::to_string(offset) + " len " + + std::to_string(length), + filename_, errno); + } + } + return IOStatus::OK(); +#endif +} + +/* + * PosixRandomAccessFile + */ +#if defined(OS_LINUX) +size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) { + if (max_size < kMaxVarint64Length * 3) { + return 0; + } + + struct stat buf; + int result = fstat(fd, &buf); + if (result == -1) { + return 0; + } + + long version = 0; + result = ioctl(fd, FS_IOC_GETVERSION, &version); + TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result); + if (result == -1) { + return 0; + } + uint64_t uversion = (uint64_t)version; + + char* rid = id; + rid = EncodeVarint64(rid, buf.st_dev); + rid = EncodeVarint64(rid, buf.st_ino); + rid = EncodeVarint64(rid, uversion); + assert(rid >= id); + return static_cast<size_t>(rid - id); +} +#endif + +#if defined(OS_MACOSX) || defined(OS_AIX) +size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) { + if (max_size < kMaxVarint64Length * 3) { + return 0; + } + + struct stat buf; + int result = fstat(fd, &buf); + if (result == -1) { + return 0; + } + + char* rid = id; + rid = EncodeVarint64(rid, buf.st_dev); + rid = EncodeVarint64(rid, buf.st_ino); + rid = EncodeVarint64(rid, buf.st_gen); + assert(rid >= id); + return static_cast<size_t>(rid - id); +} +#endif + +#ifdef OS_LINUX +std::string 
RemoveTrailingSlash(const std::string& path) { + std::string p = path; + if (p.size() > 1 && p.back() == '/') { + p.pop_back(); + } + return p; +} + +Status LogicalBlockSizeCache::RefAndCacheLogicalBlockSize( + const std::vector<std::string>& directories) { + std::vector<std::string> dirs; + dirs.reserve(directories.size()); + for (auto& d : directories) { + dirs.emplace_back(RemoveTrailingSlash(d)); + } + + std::map<std::string, size_t> dir_sizes; + { + ReadLock lock(&cache_mutex_); + for (const auto& dir : dirs) { + if (cache_.find(dir) == cache_.end()) { + dir_sizes.emplace(dir, 0); + } + } + } + + Status s; + for (auto& dir_size : dir_sizes) { + s = get_logical_block_size_of_directory_(dir_size.first, &dir_size.second); + if (!s.ok()) { + return s; + } + } + + WriteLock lock(&cache_mutex_); + for (const auto& dir : dirs) { + auto& v = cache_[dir]; + v.ref++; + auto dir_size = dir_sizes.find(dir); + if (dir_size != dir_sizes.end()) { + v.size = dir_size->second; + } + } + return s; +} + +void LogicalBlockSizeCache::UnrefAndTryRemoveCachedLogicalBlockSize( + const std::vector<std::string>& directories) { + std::vector<std::string> dirs; + dirs.reserve(directories.size()); + for (auto& dir : directories) { + dirs.emplace_back(RemoveTrailingSlash(dir)); + } + + WriteLock lock(&cache_mutex_); + for (const auto& dir : dirs) { + auto it = cache_.find(dir); + if (it != cache_.end() && !(--(it->second.ref))) { + cache_.erase(it); + } + } +} + +size_t LogicalBlockSizeCache::GetLogicalBlockSize(const std::string& fname, + int fd) { + std::string dir = fname.substr(0, fname.find_last_of("/")); + if (dir.empty()) { + dir = "/"; + } + { + ReadLock lock(&cache_mutex_); + auto it = cache_.find(dir); + if (it != cache_.end()) { + return it->second.size; + } + } + return get_logical_block_size_of_fd_(fd); +} +#endif + +Status PosixHelper::GetLogicalBlockSizeOfDirectory(const std::string& directory, + size_t* size) { + int fd = open(directory.c_str(), O_DIRECTORY | O_RDONLY); + 
if (fd == -1) { + close(fd); + return Status::IOError("Cannot open directory " + directory); + } + *size = PosixHelper::GetLogicalBlockSizeOfFd(fd); + close(fd); + return Status::OK(); +} + +size_t PosixHelper::GetLogicalBlockSizeOfFd(int fd) { +#ifdef OS_LINUX + struct stat buf; + int result = fstat(fd, &buf); + if (result == -1) { + return kDefaultPageSize; + } + if (major(buf.st_dev) == 0) { + // Unnamed devices (e.g. non-device mounts), reserved as null device number. + // These don't have an entry in /sys/dev/block/. Return a sensible default. + return kDefaultPageSize; + } + + // Reading queue/logical_block_size does not require special permissions. + const int kBufferSize = 100; + char path[kBufferSize]; + char real_path[PATH_MAX + 1]; + snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev), + minor(buf.st_dev)); + if (realpath(path, real_path) == nullptr) { + return kDefaultPageSize; + } + std::string device_dir(real_path); + if (!device_dir.empty() && device_dir.back() == '/') { + device_dir.pop_back(); + } + // NOTE: sda3 and nvme0n1p1 do not have a `queue/` subdir, only the parent sda + // and nvme0n1 have it. + // $ ls -al '/sys/dev/block/8:3' + // lrwxrwxrwx. 
1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 -> + // ../../block/sda/sda3 + // $ ls -al '/sys/dev/block/259:4' + // lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 -> + // ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1 + size_t parent_end = device_dir.rfind('/', device_dir.length() - 1); + if (parent_end == std::string::npos) { + return kDefaultPageSize; + } + size_t parent_begin = device_dir.rfind('/', parent_end - 1); + if (parent_begin == std::string::npos) { + return kDefaultPageSize; + } + std::string parent = + device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1); + std::string child = device_dir.substr(parent_end + 1, std::string::npos); + if (parent != "block" && + (child.compare(0, 4, "nvme") || child.find('p') != std::string::npos)) { + device_dir = device_dir.substr(0, parent_end); + } + std::string fname = device_dir + "/queue/logical_block_size"; + FILE* fp; + size_t size = 0; + fp = fopen(fname.c_str(), "r"); + if (fp != nullptr) { + char* line = nullptr; + size_t len = 0; + if (getline(&line, &len, fp) != -1) { + sscanf(line, "%zu", &size); + } + free(line); + fclose(fp); + } + if (size != 0 && (size & (size - 1)) == 0) { + return size; + } +#endif + (void)fd; + return kDefaultPageSize; +} + +/* + * PosixRandomAccessFile + * + * pread() based random-access + */ +PosixRandomAccessFile::PosixRandomAccessFile( + const std::string& fname, int fd, size_t logical_block_size, + const EnvOptions& options +#if defined(ROCKSDB_IOURING_PRESENT) + , + ThreadLocalPtr* thread_local_io_urings +#endif + ) + : filename_(fname), + fd_(fd), + use_direct_io_(options.use_direct_reads), + logical_sector_size_(logical_block_size) +#if defined(ROCKSDB_IOURING_PRESENT) + , + thread_local_io_urings_(thread_local_io_urings) +#endif +{ + assert(!options.use_direct_reads || !options.use_mmap_reads); + assert(!options.use_mmap_reads); +} + +PosixRandomAccessFile::~PosixRandomAccessFile() { close(fd_); } + +IOStatus 
PosixRandomAccessFile::Read(uint64_t offset, size_t n, + const IOOptions& /*opts*/, Slice* result, + char* scratch, + IODebugContext* /*dbg*/) const { + if (use_direct_io()) { + assert(IsSectorAligned(offset, GetRequiredBufferAlignment())); + assert(IsSectorAligned(n, GetRequiredBufferAlignment())); + assert(IsSectorAligned(scratch, GetRequiredBufferAlignment())); + } + IOStatus s; + ssize_t r = -1; + size_t left = n; + char* ptr = scratch; + while (left > 0) { + r = pread(fd_, ptr, left, static_cast<off_t>(offset)); + if (r <= 0) { + if (r == -1 && errno == EINTR) { + continue; + } + break; + } + ptr += r; + offset += r; + left -= r; + if (use_direct_io() && + r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) { + // Bytes reads don't fill sectors. Should only happen at the end + // of the file. + break; + } + } + if (r < 0) { + // An error: return a non-ok status + s = IOError("While pread offset " + std::to_string(offset) + " len " + + std::to_string(n), + filename_, errno); + } + *result = Slice(scratch, (r < 0) ? 0 : n - left); + return s; +} + +IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs, + const IOOptions& options, + IODebugContext* dbg) { + if (use_direct_io()) { + for (size_t i = 0; i < num_reqs; i++) { + assert(IsSectorAligned(reqs[i].offset, GetRequiredBufferAlignment())); + assert(IsSectorAligned(reqs[i].len, GetRequiredBufferAlignment())); + assert(IsSectorAligned(reqs[i].scratch, GetRequiredBufferAlignment())); + } + } + +#if defined(ROCKSDB_IOURING_PRESENT) + struct io_uring* iu = nullptr; + if (thread_local_io_urings_) { + iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get()); + if (iu == nullptr) { + iu = CreateIOUring(); + if (iu != nullptr) { + thread_local_io_urings_->Reset(iu); + } + } + } + + // Init failed, platform doesn't support io_uring. 
Fall back to + // serialized reads + if (iu == nullptr) { + return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg); + } + + IOStatus ios = IOStatus::OK(); + + struct WrappedReadRequest { + FSReadRequest* req; + struct iovec iov; + size_t finished_len; + explicit WrappedReadRequest(FSReadRequest* r) : req(r), finished_len(0) {} + }; + + autovector<WrappedReadRequest, 32> req_wraps; + autovector<WrappedReadRequest*, 4> incomplete_rq_list; + std::unordered_set<WrappedReadRequest*> wrap_cache; + + for (size_t i = 0; i < num_reqs; i++) { + req_wraps.emplace_back(&reqs[i]); + } + + size_t reqs_off = 0; + while (num_reqs > reqs_off || !incomplete_rq_list.empty()) { + size_t this_reqs = (num_reqs - reqs_off) + incomplete_rq_list.size(); + + // If requests exceed depth, split it into batches + if (this_reqs > kIoUringDepth) this_reqs = kIoUringDepth; + + assert(incomplete_rq_list.size() <= this_reqs); + for (size_t i = 0; i < this_reqs; i++) { + WrappedReadRequest* rep_to_submit; + if (i < incomplete_rq_list.size()) { + rep_to_submit = incomplete_rq_list[i]; + } else { + rep_to_submit = &req_wraps[reqs_off++]; + } + assert(rep_to_submit->req->len > rep_to_submit->finished_len); + rep_to_submit->iov.iov_base = + rep_to_submit->req->scratch + rep_to_submit->finished_len; + rep_to_submit->iov.iov_len = + rep_to_submit->req->len - rep_to_submit->finished_len; + + struct io_uring_sqe* sqe; + sqe = io_uring_get_sqe(iu); + io_uring_prep_readv( + sqe, fd_, &rep_to_submit->iov, 1, + rep_to_submit->req->offset + rep_to_submit->finished_len); + io_uring_sqe_set_data(sqe, rep_to_submit); + wrap_cache.emplace(rep_to_submit); + } + incomplete_rq_list.clear(); + + ssize_t ret = + io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs)); + TEST_SYNC_POINT_CALLBACK( + "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1", + &ret); + TEST_SYNC_POINT_CALLBACK( + "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2", + iu); + + if 
(static_cast<size_t>(ret) != this_reqs) { + fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs); + // If error happens and we submitted fewer than expected, it is an + // exception case and we don't retry here. We should still consume + // what is is submitted in the ring. + for (ssize_t i = 0; i < ret; i++) { + struct io_uring_cqe* cqe = nullptr; + io_uring_wait_cqe(iu, &cqe); + if (cqe != nullptr) { + io_uring_cqe_seen(iu, cqe); + } + } + return IOStatus::IOError("io_uring_submit_and_wait() requested " + + std::to_string(this_reqs) + " but returned " + + std::to_string(ret)); + } + + for (size_t i = 0; i < this_reqs; i++) { + struct io_uring_cqe* cqe = nullptr; + WrappedReadRequest* req_wrap; + + // We could use the peek variant here, but this seems safer in terms + // of our initial wait not reaping all completions + ret = io_uring_wait_cqe(iu, &cqe); + TEST_SYNC_POINT_CALLBACK( + "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", &ret); + if (ret) { + ios = IOStatus::IOError("io_uring_wait_cqe() returns " + + std::to_string(ret)); + + if (cqe != nullptr) { + io_uring_cqe_seen(iu, cqe); + } + continue; + } + + req_wrap = static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe)); + // Reset cqe data to catch any stray reuse of it + static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5; + // Check that we got a valid unique cqe data + auto wrap_check = wrap_cache.find(req_wrap); + if (wrap_check == wrap_cache.end()) { + fprintf(stderr, + "PosixRandomAccessFile::MultiRead: " + "Bad cqe data from IO uring - %p\n", + req_wrap); + port::PrintStack(); + ios = IOStatus::IOError("io_uring_cqe_get_data() returned " + + std::to_string((uint64_t)req_wrap)); + continue; + } + wrap_cache.erase(wrap_check); + + FSReadRequest* req = req_wrap->req; + size_t bytes_read = 0; + bool read_again = false; + UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len, + false /*async_read*/, use_direct_io(), + 
GetRequiredBufferAlignment(), req_wrap->finished_len, req, + bytes_read, read_again); + int32_t res = cqe->res; + if (res >= 0) { + if (bytes_read == 0) { + if (read_again) { + Slice tmp_slice; + req->status = + Read(req->offset + req_wrap->finished_len, + req->len - req_wrap->finished_len, options, &tmp_slice, + req->scratch + req_wrap->finished_len, dbg); + req->result = + Slice(req->scratch, req_wrap->finished_len + tmp_slice.size()); + } + // else It means EOF so no need to do anything. + } else if (bytes_read < req_wrap->iov.iov_len) { + incomplete_rq_list.push_back(req_wrap); + } + } + io_uring_cqe_seen(iu, cqe); + } + wrap_cache.clear(); + } + return ios; +#else + return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg); +#endif +} + +IOStatus PosixRandomAccessFile::Prefetch(uint64_t offset, size_t n, + const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + IOStatus s; + if (!use_direct_io()) { + ssize_t r = 0; +#ifdef OS_LINUX + r = readahead(fd_, offset, n); +#endif +#ifdef OS_MACOSX + radvisory advice; + advice.ra_offset = static_cast<off_t>(offset); + advice.ra_count = static_cast<int>(n); + r = fcntl(fd_, F_RDADVISE, &advice); +#endif + if (r == -1) { + s = IOError("While prefetching offset " + std::to_string(offset) + + " len " + std::to_string(n), + filename_, errno); + } + } + return s; +} + +#if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX) +size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const { + return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size); +} +#endif + +void PosixRandomAccessFile::Hint(AccessPattern pattern) { + if (use_direct_io()) { + return; + } + switch (pattern) { + case kNormal: + Fadvise(fd_, 0, 0, POSIX_FADV_NORMAL); + break; + case kRandom: + Fadvise(fd_, 0, 0, POSIX_FADV_RANDOM); + break; + case kSequential: + Fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL); + break; + case kWillNeed: + Fadvise(fd_, 0, 0, POSIX_FADV_WILLNEED); + break; + case kWontNeed: + Fadvise(fd_, 0, 0, 
POSIX_FADV_DONTNEED); + break; + default: + assert(false); + break; + } +} + +IOStatus PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) { + if (use_direct_io()) { + return IOStatus::OK(); + } +#ifndef OS_LINUX + (void)offset; + (void)length; + return IOStatus::OK(); +#else + // free OS pages + int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); + if (ret == 0) { + return IOStatus::OK(); + } + return IOError("While fadvise NotNeeded offset " + std::to_string(offset) + + " len " + std::to_string(length), + filename_, errno); +#endif +} + +IOStatus PosixRandomAccessFile::ReadAsync( + FSReadRequest& req, const IOOptions& /*opts*/, + std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg, + void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) { + if (use_direct_io()) { + assert(IsSectorAligned(req.offset, GetRequiredBufferAlignment())); + assert(IsSectorAligned(req.len, GetRequiredBufferAlignment())); + assert(IsSectorAligned(req.scratch, GetRequiredBufferAlignment())); + } + +#if defined(ROCKSDB_IOURING_PRESENT) + // io_uring_queue_init. + struct io_uring* iu = nullptr; + if (thread_local_io_urings_) { + iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get()); + if (iu == nullptr) { + iu = CreateIOUring(); + if (iu != nullptr) { + thread_local_io_urings_->Reset(iu); + } + } + } + + // Init failed, platform doesn't support io_uring. + if (iu == nullptr) { + return IOStatus::NotSupported("ReadAsync"); + } + + // Allocate io_handle. + IOHandleDeleter deletefn = [](void* args) -> void { + delete (static_cast<Posix_IOHandle*>(args)); + args = nullptr; + }; + + // Initialize Posix_IOHandle. 
+ Posix_IOHandle* posix_handle = + new Posix_IOHandle(iu, cb, cb_arg, req.offset, req.len, req.scratch, + use_direct_io(), GetRequiredBufferAlignment()); + posix_handle->iov.iov_base = req.scratch; + posix_handle->iov.iov_len = req.len; + + *io_handle = static_cast<void*>(posix_handle); + *del_fn = deletefn; + + // Step 3: io_uring_sqe_set_data + struct io_uring_sqe* sqe; + sqe = io_uring_get_sqe(iu); + + io_uring_prep_readv(sqe, fd_, /*sqe->addr=*/&posix_handle->iov, + /*sqe->len=*/1, /*sqe->offset=*/posix_handle->offset); + + // Sets sqe->user_data to posix_handle. + io_uring_sqe_set_data(sqe, posix_handle); + + // Step 4: io_uring_submit + ssize_t ret = io_uring_submit(iu); + if (ret < 0) { + fprintf(stderr, "io_uring_submit error: %ld\n", long(ret)); + return IOStatus::IOError("io_uring_submit() requested but returned " + + std::to_string(ret)); + } + return IOStatus::OK(); +#else + (void)req; + (void)cb; + (void)cb_arg; + (void)io_handle; + (void)del_fn; + return IOStatus::NotSupported("ReadAsync"); +#endif +} + +/* + * PosixMmapReadableFile + * + * mmap() based random-access + */ +// base[0,length-1] contains the mmapped contents of the file. 
+PosixMmapReadableFile::PosixMmapReadableFile(const int fd, + const std::string& fname, + void* base, size_t length, + const EnvOptions& options) + : fd_(fd), filename_(fname), mmapped_region_(base), length_(length) { +#ifdef NDEBUG + (void)options; +#endif + fd_ = fd_ + 0; // suppress the warning for used variables + assert(options.use_mmap_reads); + assert(!options.use_direct_reads); +} + +PosixMmapReadableFile::~PosixMmapReadableFile() { + int ret = munmap(mmapped_region_, length_); + if (ret != 0) { + fprintf(stdout, "failed to munmap %p length %" ROCKSDB_PRIszt " \n", + mmapped_region_, length_); + } + close(fd_); +} + +IOStatus PosixMmapReadableFile::Read(uint64_t offset, size_t n, + const IOOptions& /*opts*/, Slice* result, + char* /*scratch*/, + IODebugContext* /*dbg*/) const { + IOStatus s; + if (offset > length_) { + *result = Slice(); + return IOError("While mmap read offset " + std::to_string(offset) + + " larger than file length " + std::to_string(length_), + filename_, EINVAL); + } else if (offset + n > length_) { + n = static_cast<size_t>(length_ - offset); + } + *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n); + return s; +} + +void PosixMmapReadableFile::Hint(AccessPattern pattern) { + switch (pattern) { + case kNormal: + Madvise(mmapped_region_, length_, POSIX_MADV_NORMAL); + break; + case kRandom: + Madvise(mmapped_region_, length_, POSIX_MADV_RANDOM); + break; + case kSequential: + Madvise(mmapped_region_, length_, POSIX_MADV_SEQUENTIAL); + break; + case kWillNeed: + Madvise(mmapped_region_, length_, POSIX_MADV_WILLNEED); + break; + case kWontNeed: + Madvise(mmapped_region_, length_, POSIX_MADV_DONTNEED); + break; + default: + assert(false); + break; + } +} + +IOStatus PosixMmapReadableFile::InvalidateCache(size_t offset, size_t length) { +#ifndef OS_LINUX + (void)offset; + (void)length; + return IOStatus::OK(); +#else + // free OS pages + int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); + if (ret == 0) { + 
return IOStatus::OK(); + } + return IOError("While fadvise not needed. Offset " + std::to_string(offset) + + " len" + std::to_string(length), + filename_, errno); +#endif +} + +/* + * PosixMmapFile + * + * We preallocate up to an extra megabyte and use memcpy to append new + * data to the file. This is safe since we either properly close the + * file before reading from it, or for log files, the reading code + * knows enough to skip zero suffixes. + */ +IOStatus PosixMmapFile::UnmapCurrentRegion() { + TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0"); + if (base_ != nullptr) { + int munmap_status = munmap(base_, limit_ - base_); + if (munmap_status != 0) { + return IOError("While munmap", filename_, munmap_status); + } + file_offset_ += limit_ - base_; + base_ = nullptr; + limit_ = nullptr; + last_sync_ = nullptr; + dst_ = nullptr; + + // Increase the amount we map the next time, but capped at 1MB + if (map_size_ < (1 << 20)) { + map_size_ *= 2; + } + } + return IOStatus::OK(); +} + +IOStatus PosixMmapFile::MapNewRegion() { +#ifdef ROCKSDB_FALLOCATE_PRESENT + assert(base_ == nullptr); + TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0"); + // we can't fallocate with FALLOC_FL_KEEP_SIZE here + if (allow_fallocate_) { + IOSTATS_TIMER_GUARD(allocate_nanos); + int alloc_status = fallocate(fd_, 0, file_offset_, map_size_); + if (alloc_status != 0) { + // fallback to posix_fallocate + alloc_status = posix_fallocate(fd_, file_offset_, map_size_); + } + if (alloc_status != 0) { + return IOStatus::IOError("Error allocating space to file : " + filename_ + + "Error : " + errnoStr(alloc_status).c_str()); + } + } + + TEST_KILL_RANDOM("PosixMmapFile::Append:1"); + void* ptr = mmap(nullptr, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, + file_offset_); + if (ptr == MAP_FAILED) { + return IOStatus::IOError("MMap failed on " + filename_); + } + TEST_KILL_RANDOM("PosixMmapFile::Append:2"); + + base_ = reinterpret_cast<char*>(ptr); + limit_ = base_ + map_size_; + 
dst_ = base_; + last_sync_ = base_; + return IOStatus::OK(); +#else + return IOStatus::NotSupported("This platform doesn't support fallocate()"); +#endif +} + +IOStatus PosixMmapFile::Msync() { + if (dst_ == last_sync_) { + return IOStatus::OK(); + } + // Find the beginnings of the pages that contain the first and last + // bytes to be synced. + size_t p1 = TruncateToPageBoundary(last_sync_ - base_); + size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1); + last_sync_ = dst_; + TEST_KILL_RANDOM("PosixMmapFile::Msync:0"); + if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) { + return IOError("While msync", filename_, errno); + } + return IOStatus::OK(); +} + +PosixMmapFile::PosixMmapFile(const std::string& fname, int fd, size_t page_size, + const EnvOptions& options) + : filename_(fname), + fd_(fd), + page_size_(page_size), + map_size_(Roundup(65536, page_size)), + base_(nullptr), + limit_(nullptr), + dst_(nullptr), + last_sync_(nullptr), + file_offset_(0) { +#ifdef ROCKSDB_FALLOCATE_PRESENT + allow_fallocate_ = options.allow_fallocate; + fallocate_with_keep_size_ = options.fallocate_with_keep_size; +#else + (void)options; +#endif + assert((page_size & (page_size - 1)) == 0); + assert(options.use_mmap_writes); + assert(!options.use_direct_writes); +} + +PosixMmapFile::~PosixMmapFile() { + if (fd_ >= 0) { + IOStatus s = PosixMmapFile::Close(IOOptions(), nullptr); + s.PermitUncheckedError(); + } +} + +IOStatus PosixMmapFile::Append(const Slice& data, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + const char* src = data.data(); + size_t left = data.size(); + while (left > 0) { + assert(base_ <= dst_); + assert(dst_ <= limit_); + size_t avail = limit_ - dst_; + if (avail == 0) { + IOStatus s = UnmapCurrentRegion(); + if (!s.ok()) { + return s; + } + s = MapNewRegion(); + if (!s.ok()) { + return s; + } + TEST_KILL_RANDOM("PosixMmapFile::Append:0"); + } + + size_t n = (left <= avail) ? 
left : avail; + assert(dst_); + memcpy(dst_, src, n); + dst_ += n; + src += n; + left -= n; + } + return IOStatus::OK(); +} + +IOStatus PosixMmapFile::Close(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + IOStatus s; + size_t unused = limit_ - dst_; + + s = UnmapCurrentRegion(); + if (!s.ok()) { + s = IOError("While closing mmapped file", filename_, errno); + } else if (unused > 0) { + // Trim the extra space at the end of the file + if (ftruncate(fd_, file_offset_ - unused) < 0) { + s = IOError("While ftruncating mmaped file", filename_, errno); + } + } + + if (close(fd_) < 0) { + if (s.ok()) { + s = IOError("While closing mmapped file", filename_, errno); + } + } + + fd_ = -1; + base_ = nullptr; + limit_ = nullptr; + return s; +} + +IOStatus PosixMmapFile::Flush(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + return IOStatus::OK(); +} + +IOStatus PosixMmapFile::Sync(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { +#ifdef HAVE_FULLFSYNC + if (::fcntl(fd_, F_FULLFSYNC) < 0) { + return IOError("while fcntl(F_FULLSYNC) mmapped file", filename_, errno); + } +#else // HAVE_FULLFSYNC + if (fdatasync(fd_) < 0) { + return IOError("While fdatasync mmapped file", filename_, errno); + } +#endif // HAVE_FULLFSYNC + + return Msync(); +} + +/** + * Flush data as well as metadata to stable storage. + */ +IOStatus PosixMmapFile::Fsync(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { +#ifdef HAVE_FULLFSYNC + if (::fcntl(fd_, F_FULLFSYNC) < 0) { + return IOError("While fcntl(F_FULLSYNC) on mmaped file", filename_, errno); + } +#else // HAVE_FULLFSYNC + if (fsync(fd_) < 0) { + return IOError("While fsync mmaped file", filename_, errno); + } +#endif // HAVE_FULLFSYNC + + return Msync(); +} + +/** + * Get the size of valid data in the file. This will not match the + * size that is returned from the filesystem because we use mmap + * to extend file by map_size every time. 
+ */ +uint64_t PosixMmapFile::GetFileSize(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + size_t used = dst_ - base_; + return file_offset_ + used; +} + +IOStatus PosixMmapFile::InvalidateCache(size_t offset, size_t length) { +#ifndef OS_LINUX + (void)offset; + (void)length; + return IOStatus::OK(); +#else + // free OS pages + int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); + if (ret == 0) { + return IOStatus::OK(); + } + return IOError("While fadvise NotNeeded mmapped file", filename_, errno); +#endif +} + +#ifdef ROCKSDB_FALLOCATE_PRESENT +IOStatus PosixMmapFile::Allocate(uint64_t offset, uint64_t len, + const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max())); + assert(len <= static_cast<uint64_t>(std::numeric_limits<off_t>::max())); + TEST_KILL_RANDOM("PosixMmapFile::Allocate:0"); + int alloc_status = 0; + if (allow_fallocate_) { + alloc_status = + fallocate(fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, + static_cast<off_t>(offset), static_cast<off_t>(len)); + } + if (alloc_status == 0) { + return IOStatus::OK(); + } else { + return IOError("While fallocate offset " + std::to_string(offset) + + " len " + std::to_string(len), + filename_, errno); + } +} +#endif + +/* + * PosixWritableFile + * + * Use posix write to write data to a file. 
+ */ +PosixWritableFile::PosixWritableFile(const std::string& fname, int fd, + size_t logical_block_size, + const EnvOptions& options) + : FSWritableFile(options), + filename_(fname), + use_direct_io_(options.use_direct_writes), + fd_(fd), + filesize_(0), + logical_sector_size_(logical_block_size) { +#ifdef ROCKSDB_FALLOCATE_PRESENT + allow_fallocate_ = options.allow_fallocate; + fallocate_with_keep_size_ = options.fallocate_with_keep_size; +#endif +#ifdef ROCKSDB_RANGESYNC_PRESENT + sync_file_range_supported_ = IsSyncFileRangeSupported(fd_); +#endif // ROCKSDB_RANGESYNC_PRESENT + assert(!options.use_mmap_writes); +} + +PosixWritableFile::~PosixWritableFile() { + if (fd_ >= 0) { + IOStatus s = PosixWritableFile::Close(IOOptions(), nullptr); + s.PermitUncheckedError(); + } +} + +IOStatus PosixWritableFile::Append(const Slice& data, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + if (use_direct_io()) { + assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment())); + assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment())); + } + const char* src = data.data(); + size_t nbytes = data.size(); + + if (!PosixWrite(fd_, src, nbytes)) { + return IOError("While appending to file", filename_, errno); + } + + filesize_ += nbytes; + return IOStatus::OK(); +} + +IOStatus PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset, + const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + if (use_direct_io()) { + assert(IsSectorAligned(offset, GetRequiredBufferAlignment())); + assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment())); + assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment())); + } + assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max())); + const char* src = data.data(); + size_t nbytes = data.size(); + if (!PosixPositionedWrite(fd_, src, nbytes, static_cast<off_t>(offset))) { + return IOError("While pwrite to file at offset " + std::to_string(offset), + filename_, errno); + } + 
filesize_ = offset + nbytes; + return IOStatus::OK(); +} + +IOStatus PosixWritableFile::Truncate(uint64_t size, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + IOStatus s; + int r = ftruncate(fd_, size); + if (r < 0) { + s = IOError("While ftruncate file to size " + std::to_string(size), + filename_, errno); + } else { + filesize_ = size; + } + return s; +} + +IOStatus PosixWritableFile::Close(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + IOStatus s; + + size_t block_size; + size_t last_allocated_block; + GetPreallocationStatus(&block_size, &last_allocated_block); + TEST_SYNC_POINT_CALLBACK("PosixWritableFile::Close", &last_allocated_block); + if (last_allocated_block > 0) { + // trim the extra space preallocated at the end of the file + // NOTE(ljin): we probably don't want to surface failure as an IOError, + // but it will be nice to log these errors. + int dummy __attribute__((__unused__)); + dummy = ftruncate(fd_, filesize_); +#if defined(ROCKSDB_FALLOCATE_PRESENT) && defined(FALLOC_FL_PUNCH_HOLE) + // in some file systems, ftruncate only trims trailing space if the + // new file size is smaller than the current size. Calling fallocate + // with FALLOC_FL_PUNCH_HOLE flag to explicitly release these unused + // blocks. FALLOC_FL_PUNCH_HOLE is supported on at least the following + // filesystems: + // XFS (since Linux 2.6.38) + // ext4 (since Linux 3.0) + // Btrfs (since Linux 3.7) + // tmpfs (since Linux 3.5) + // We ignore error since failure of this operation does not affect + // correctness. + struct stat file_stats; + int result = fstat(fd_, &file_stats); + // After ftruncate, we check whether ftruncate has the correct behavior. 
+ // If not, we should hack it with FALLOC_FL_PUNCH_HOLE + if (result == 0 && + (file_stats.st_size + file_stats.st_blksize - 1) / + file_stats.st_blksize != + file_stats.st_blocks / (file_stats.st_blksize / 512)) { + IOSTATS_TIMER_GUARD(allocate_nanos); + if (allow_fallocate_) { + fallocate(fd_, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, filesize_, + block_size * last_allocated_block - filesize_); + } + } +#endif + } + + if (close(fd_) < 0) { + s = IOError("While closing file after writing", filename_, errno); + } + fd_ = -1; + return s; +} + +// write out the cached data to the OS cache +IOStatus PosixWritableFile::Flush(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + return IOStatus::OK(); +} + +IOStatus PosixWritableFile::Sync(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { +#ifdef HAVE_FULLFSYNC + if (::fcntl(fd_, F_FULLFSYNC) < 0) { + return IOError("while fcntl(F_FULLFSYNC)", filename_, errno); + } +#else // HAVE_FULLFSYNC + if (fdatasync(fd_) < 0) { + return IOError("While fdatasync", filename_, errno); + } +#endif // HAVE_FULLFSYNC + return IOStatus::OK(); +} + +IOStatus PosixWritableFile::Fsync(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { +#ifdef HAVE_FULLFSYNC + if (::fcntl(fd_, F_FULLFSYNC) < 0) { + return IOError("while fcntl(F_FULLFSYNC)", filename_, errno); + } +#else // HAVE_FULLFSYNC + if (fsync(fd_) < 0) { + return IOError("While fsync", filename_, errno); + } +#endif // HAVE_FULLFSYNC + return IOStatus::OK(); +} + +bool PosixWritableFile::IsSyncThreadSafe() const { return true; } + +uint64_t PosixWritableFile::GetFileSize(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + return filesize_; +} + +void PosixWritableFile::SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) { +#ifdef OS_LINUX +// Suppress Valgrind "Unimplemented functionality" error. 
+#ifndef ROCKSDB_VALGRIND_RUN + if (hint == write_hint_) { + return; + } + if (fcntl(fd_, F_SET_RW_HINT, &hint) == 0) { + write_hint_ = hint; + } +#else + (void)hint; +#endif // ROCKSDB_VALGRIND_RUN +#else + (void)hint; +#endif // OS_LINUX +} + +IOStatus PosixWritableFile::InvalidateCache(size_t offset, size_t length) { + if (use_direct_io()) { + return IOStatus::OK(); + } +#ifndef OS_LINUX + (void)offset; + (void)length; + return IOStatus::OK(); +#else + // free OS pages + int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); + if (ret == 0) { + return IOStatus::OK(); + } + return IOError("While fadvise NotNeeded", filename_, errno); +#endif +} + +#ifdef ROCKSDB_FALLOCATE_PRESENT +IOStatus PosixWritableFile::Allocate(uint64_t offset, uint64_t len, + const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max())); + assert(len <= static_cast<uint64_t>(std::numeric_limits<off_t>::max())); + TEST_KILL_RANDOM("PosixWritableFile::Allocate:0"); + IOSTATS_TIMER_GUARD(allocate_nanos); + int alloc_status = 0; + if (allow_fallocate_) { + alloc_status = + fallocate(fd_, fallocate_with_keep_size_ ? 
FALLOC_FL_KEEP_SIZE : 0, + static_cast<off_t>(offset), static_cast<off_t>(len)); + } + if (alloc_status == 0) { + return IOStatus::OK(); + } else { + return IOError("While fallocate offset " + std::to_string(offset) + + " len " + std::to_string(len), + filename_, errno); + } +} +#endif + +IOStatus PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes, + const IOOptions& opts, + IODebugContext* dbg) { +#ifdef ROCKSDB_RANGESYNC_PRESENT + assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max())); + assert(nbytes <= static_cast<uint64_t>(std::numeric_limits<off_t>::max())); + if (sync_file_range_supported_) { + int ret; + if (strict_bytes_per_sync_) { + // Specifying `SYNC_FILE_RANGE_WAIT_BEFORE` together with an offset/length + // that spans all bytes written so far tells `sync_file_range` to wait for + // any outstanding writeback requests to finish before issuing a new one. + ret = + sync_file_range(fd_, 0, static_cast<off_t>(offset + nbytes), + SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE); + } else { + ret = sync_file_range(fd_, static_cast<off_t>(offset), + static_cast<off_t>(nbytes), SYNC_FILE_RANGE_WRITE); + } + if (ret != 0) { + return IOError("While sync_file_range returned " + std::to_string(ret), + filename_, errno); + } + return IOStatus::OK(); + } +#endif // ROCKSDB_RANGESYNC_PRESENT + return FSWritableFile::RangeSync(offset, nbytes, opts, dbg); +} + +#ifdef OS_LINUX +size_t PosixWritableFile::GetUniqueId(char* id, size_t max_size) const { + return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size); +} +#endif + +/* + * PosixRandomRWFile + */ + +PosixRandomRWFile::PosixRandomRWFile(const std::string& fname, int fd, + const EnvOptions& /*options*/) + : filename_(fname), fd_(fd) {} + +PosixRandomRWFile::~PosixRandomRWFile() { + if (fd_ >= 0) { + IOStatus s = Close(IOOptions(), nullptr); + s.PermitUncheckedError(); + } +} + +IOStatus PosixRandomRWFile::Write(uint64_t offset, const Slice& data, + const IOOptions& /*opts*/, + 
IODebugContext* /*dbg*/) { + const char* src = data.data(); + size_t nbytes = data.size(); + if (!PosixPositionedWrite(fd_, src, nbytes, static_cast<off_t>(offset))) { + return IOError("While write random read/write file at offset " + + std::to_string(offset), + filename_, errno); + } + + return IOStatus::OK(); +} + +IOStatus PosixRandomRWFile::Read(uint64_t offset, size_t n, + const IOOptions& /*opts*/, Slice* result, + char* scratch, IODebugContext* /*dbg*/) const { + size_t left = n; + char* ptr = scratch; + while (left > 0) { + ssize_t done = pread(fd_, ptr, left, offset); + if (done < 0) { + // error while reading from file + if (errno == EINTR) { + // read was interrupted, try again. + continue; + } + return IOError("While reading random read/write file offset " + + std::to_string(offset) + " len " + std::to_string(n), + filename_, errno); + } else if (done == 0) { + // Nothing more to read + break; + } + + // Read `done` bytes + ptr += done; + offset += done; + left -= done; + } + + *result = Slice(scratch, n - left); + return IOStatus::OK(); +} + +IOStatus PosixRandomRWFile::Flush(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + return IOStatus::OK(); +} + +IOStatus PosixRandomRWFile::Sync(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { +#ifdef HAVE_FULLFSYNC + if (::fcntl(fd_, F_FULLFSYNC) < 0) { + return IOError("while fcntl(F_FULLFSYNC) random rw file", filename_, errno); + } +#else // HAVE_FULLFSYNC + if (fdatasync(fd_) < 0) { + return IOError("While fdatasync random read/write file", filename_, errno); + } +#endif // HAVE_FULLFSYNC + return IOStatus::OK(); +} + +IOStatus PosixRandomRWFile::Fsync(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { +#ifdef HAVE_FULLFSYNC + if (::fcntl(fd_, F_FULLFSYNC) < 0) { + return IOError("While fcntl(F_FULLSYNC) random rw file", filename_, errno); + } +#else // HAVE_FULLFSYNC + if (fsync(fd_) < 0) { + return IOError("While fsync random read/write file", filename_, errno); + } +#endif // 
HAVE_FULLFSYNC + return IOStatus::OK(); +} + +IOStatus PosixRandomRWFile::Close(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + if (close(fd_) < 0) { + return IOError("While close random read/write file", filename_, errno); + } + fd_ = -1; + return IOStatus::OK(); +} + +PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() { + // TODO should have error handling though not much we can do... + munmap(this->base_, length_); +} + +/* + * PosixDirectory + */ +#if !defined(BTRFS_SUPER_MAGIC) +// The magic number for BTRFS is fixed, if it's not defined, define it here +#define BTRFS_SUPER_MAGIC 0x9123683E +#endif +PosixDirectory::PosixDirectory(int fd, const std::string& directory_name) + : fd_(fd), directory_name_(directory_name) { + is_btrfs_ = false; +#ifdef OS_LINUX + struct statfs buf; + int ret = fstatfs(fd, &buf); + is_btrfs_ = (ret == 0 && buf.f_type == static_cast<decltype(buf.f_type)>( + BTRFS_SUPER_MAGIC)); +#endif +} + +PosixDirectory::~PosixDirectory() { + if (fd_ >= 0) { + IOStatus s = PosixDirectory::Close(IOOptions(), nullptr); + s.PermitUncheckedError(); + } +} + +IOStatus PosixDirectory::Fsync(const IOOptions& opts, IODebugContext* dbg) { + return FsyncWithDirOptions(opts, dbg, DirFsyncOptions()); +} + +// Users who want the file entries synced in Directory project must call a +// Fsync or FsyncWithDirOptions function before Close +IOStatus PosixDirectory::Close(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) { + IOStatus s = IOStatus::OK(); + if (close(fd_) < 0) { + s = IOError("While closing directory ", directory_name_, errno); + } else { + fd_ = -1; + } + return s; +} + +IOStatus PosixDirectory::FsyncWithDirOptions( + const IOOptions& /*opts*/, IODebugContext* /*dbg*/, + const DirFsyncOptions& dir_fsync_options) { + assert(fd_ >= 0); // Check use after close + IOStatus s = IOStatus::OK(); +#ifndef OS_AIX + if (is_btrfs_) { + // skip dir fsync for new file creation, which is not needed for btrfs + if (dir_fsync_options.reason == 
DirFsyncOptions::kNewFileSynced) { + return s; + } + // skip dir fsync for renaming file, only need to sync new file + if (dir_fsync_options.reason == DirFsyncOptions::kFileRenamed) { + std::string new_name = dir_fsync_options.renamed_new_name; + assert(!new_name.empty()); + int fd; + do { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(new_name.c_str(), O_RDONLY); + } while (fd < 0 && errno == EINTR); + if (fd < 0) { + s = IOError("While open renaming file", new_name, errno); + } else if (fsync(fd) < 0) { + s = IOError("While fsync renaming file", new_name, errno); + } + if (close(fd) < 0) { + s = IOError("While closing file after fsync", new_name, errno); + } + return s; + } + // fallback to dir-fsync for kDefault, kDirRenamed and kFileDeleted + } + + // skip fsync/fcntl when fd_ == -1 since this file descriptor has been closed + // in either the de-construction or the close function, data must have been + // fsync-ed before de-construction and close is called +#ifdef HAVE_FULLFSYNC + // btrfs is a Linux file system, while currently F_FULLFSYNC is available on + // Mac OS. + assert(!is_btrfs_); + if (fd_ != -1 && ::fcntl(fd_, F_FULLFSYNC) < 0) { + return IOError("while fcntl(F_FULLFSYNC)", "a directory", errno); + } +#else // HAVE_FULLFSYNC + if (fd_ != -1 && fsync(fd_) == -1) { + s = IOError("While fsync", "a directory", errno); + } +#endif // HAVE_FULLFSYNC +#endif // OS_AIX + return s; +} +} // namespace ROCKSDB_NAMESPACE +#endif |