diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rocksdb/port | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/port')
27 files changed, 5916 insertions, 0 deletions
diff --git a/src/rocksdb/port/README b/src/rocksdb/port/README new file mode 100644 index 00000000..422563e2 --- /dev/null +++ b/src/rocksdb/port/README @@ -0,0 +1,10 @@ +This directory contains interfaces and implementations that isolate the +rest of the package from platform details. + +Code in the rest of the package includes "port.h" from this directory. +"port.h" in turn includes a platform specific "port_<platform>.h" file +that provides the platform specific implementation. + +See port_posix.h for an example of what must be provided in a platform +specific header file. + diff --git a/src/rocksdb/port/jemalloc_helper.h b/src/rocksdb/port/jemalloc_helper.h new file mode 100644 index 00000000..0c216fac --- /dev/null +++ b/src/rocksdb/port/jemalloc_helper.h @@ -0,0 +1,53 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifdef ROCKSDB_JEMALLOC +#ifdef __FreeBSD__ +#include <malloc_np.h> +#else +#include <jemalloc/jemalloc.h> +#endif + +#ifndef JEMALLOC_CXX_THROW +#define JEMALLOC_CXX_THROW +#endif + +// Declare non-standard jemalloc APIs as weak symbols. We can null-check these +// symbols to detect whether jemalloc is linked with the binary. +extern "C" void* mallocx(size_t, int) __attribute__((__weak__)); +extern "C" void* rallocx(void*, size_t, int) __attribute__((__weak__)); +extern "C" size_t xallocx(void*, size_t, size_t, int) __attribute__((__weak__)); +extern "C" size_t sallocx(const void*, int) __attribute__((__weak__)); +extern "C" void dallocx(void*, int) __attribute__((__weak__)); +extern "C" void sdallocx(void*, size_t, int) __attribute__((__weak__)); +extern "C" size_t nallocx(size_t, int) __attribute__((__weak__)); +extern "C" int mallctl(const char*, void*, size_t*, void*, size_t) + __attribute__((__weak__)); +extern "C" int mallctlnametomib(const char*, size_t*, size_t*) + __attribute__((__weak__)); +extern "C" int mallctlbymib(const size_t*, size_t, void*, size_t*, void*, + size_t) __attribute__((__weak__)); +extern "C" void malloc_stats_print(void (*)(void*, const char*), void*, + const char*) __attribute__((__weak__)); +extern "C" size_t malloc_usable_size(JEMALLOC_USABLE_SIZE_CONST void*) + JEMALLOC_CXX_THROW __attribute__((__weak__)); + +// Check if Jemalloc is linked with the binary. Note the main program might be +// using a different memory allocator even this method return true. +// It is loosely based on folly::usingJEMalloc(), minus the check that actually +// allocate memory and see if it is through jemalloc, to handle the dlopen() +// case: +// https://github.com/facebook/folly/blob/76cf8b5841fb33137cfbf8b224f0226437c855bc/folly/memory/Malloc.h#L147 +static inline bool HasJemalloc() { + return mallocx != nullptr && rallocx != nullptr && xallocx != nullptr && + sallocx != nullptr && dallocx != nullptr && sdallocx != nullptr && + nallocx != nullptr && mallctl != nullptr && + mallctlnametomib != nullptr && mallctlbymib != nullptr && + malloc_stats_print != nullptr && malloc_usable_size != nullptr; +} + +#endif // ROCKSDB_JEMALLOC diff --git a/src/rocksdb/port/likely.h b/src/rocksdb/port/likely.h new file mode 100644 index 00000000..397d7571 --- /dev/null +++ b/src/rocksdb/port/likely.h @@ -0,0 +1,18 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#if defined(__GNUC__) && __GNUC__ >= 4 +#define LIKELY(x) (__builtin_expect((x), 1)) +#define UNLIKELY(x) (__builtin_expect((x), 0)) +#else +#define LIKELY(x) (x) +#define UNLIKELY(x) (x) +#endif diff --git a/src/rocksdb/port/port.h b/src/rocksdb/port/port.h new file mode 100644 index 00000000..13aa56d4 --- /dev/null +++ b/src/rocksdb/port/port.h @@ -0,0 +1,21 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include <string> + +// Include the appropriate platform specific file below. If you are +// porting to a new platform, see "port_example.h" for documentation +// of what the new port_<platform>.h file must provide. +#if defined(ROCKSDB_PLATFORM_POSIX) +#include "port/port_posix.h" +#elif defined(OS_WIN) +#include "port/win/port_win.h" +#endif diff --git a/src/rocksdb/port/port_dirent.h b/src/rocksdb/port/port_dirent.h new file mode 100644 index 00000000..cb1adbe1 --- /dev/null +++ b/src/rocksdb/port/port_dirent.h @@ -0,0 +1,44 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// See port_example.h for documentation for the following types/functions. + +#pragma once + +#ifdef ROCKSDB_PLATFORM_POSIX +#include <dirent.h> +#include <sys/types.h> +#elif defined(OS_WIN) + +namespace rocksdb { +namespace port { + +struct dirent { + char d_name[_MAX_PATH]; /* filename */ +}; + +struct DIR; + +DIR* opendir(const char* name); + +dirent* readdir(DIR* dirp); + +int closedir(DIR* dirp); + +} // namespace port + +using port::dirent; +using port::DIR; +using port::opendir; +using port::readdir; +using port::closedir; + +} // namespace rocksdb + +#endif // OS_WIN diff --git a/src/rocksdb/port/port_example.h b/src/rocksdb/port/port_example.h new file mode 100644 index 00000000..a94dc93c --- /dev/null +++ b/src/rocksdb/port/port_example.h @@ -0,0 +1,101 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// This file contains the specification, but not the implementations, +// of the types/operations/etc. that should be defined by a platform +// specific port_<platform>.h file. Use this file as a reference for +// how to port this package to a new platform. + +#pragma once + +namespace rocksdb { +namespace port { + +// TODO(jorlow): Many of these belong more in the environment class rather than +// here. We should try moving them and see if it affects perf. + +// The following boolean constant must be true on a little-endian machine +// and false otherwise. +static const bool kLittleEndian = true /* or some other expression */; + +// ------------------ Threading ------------------- + +// A Mutex represents an exclusive lock. +class Mutex { + public: + Mutex(); + ~Mutex(); + + // Lock the mutex. Waits until other lockers have exited. + // Will deadlock if the mutex is already locked by this thread. + void Lock(); + + // Unlock the mutex. + // REQUIRES: This mutex was locked by this thread. + void Unlock(); + + // Optionally crash if this thread does not hold this mutex. + // The implementation must be fast, especially if NDEBUG is + // defined. The implementation is allowed to skip all checks. + void AssertHeld(); +}; + +class CondVar { + public: + explicit CondVar(Mutex* mu); + ~CondVar(); + + // Atomically release *mu and block on this condition variable until + // either a call to SignalAll(), or a call to Signal() that picks + // this thread to wakeup. + // REQUIRES: this thread holds *mu + void Wait(); + + // If there are some threads waiting, wake up at least one of them. + void Signal(); + + // Wake up all waiting threads. + void SignallAll(); +}; + +// Thread-safe initialization. +// Used as follows: +// static port::OnceType init_control = LEVELDB_ONCE_INIT; +// static void Initializer() { ... do something ...; } +// ... +// port::InitOnce(&init_control, &Initializer); +typedef intptr_t OnceType; +#define LEVELDB_ONCE_INIT 0 +extern void InitOnce(port::OnceType*, void (*initializer)()); + +// ------------------ Compression ------------------- + +// Store the snappy compression of "input[0,input_length-1]" in *output. +// Returns false if snappy is not supported by this port. +extern bool Snappy_Compress(const char* input, size_t input_length, + std::string* output); + +// If input[0,input_length-1] looks like a valid snappy compressed +// buffer, store the size of the uncompressed data in *result and +// return true. Else return false. +extern bool Snappy_GetUncompressedLength(const char* input, size_t length, + size_t* result); + +// Attempt to snappy uncompress input[0,input_length-1] into *output. +// Returns true if successful, false if the input is invalid lightweight +// compressed data. +// +// REQUIRES: at least the first "n" bytes of output[] must be writable +// where "n" is the result of a successful call to +// Snappy_GetUncompressedLength. +extern bool Snappy_Uncompress(const char* input_data, size_t input_length, + char* output); + +} // namespace port +} // namespace rocksdb diff --git a/src/rocksdb/port/port_posix.cc b/src/rocksdb/port/port_posix.cc new file mode 100644 index 00000000..80081e48 --- /dev/null +++ b/src/rocksdb/port/port_posix.cc @@ -0,0 +1,221 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "port/port_posix.h" + +#include <assert.h> +#if defined(__i386__) || defined(__x86_64__) +#include <cpuid.h> +#endif +#include <errno.h> +#include <sched.h> +#include <signal.h> +#include <stdio.h> +#include <string.h> +#include <sys/time.h> +#include <sys/resource.h> +#include <unistd.h> +#include <cstdlib> +#include "util/logging.h" + +namespace rocksdb { + +// We want to give users opportunity to default all the mutexes to adaptive if +// not specified otherwise. This enables a quick way to conduct various +// performance related experiements. +// +// NB! Support for adaptive mutexes is turned on by definining +// ROCKSDB_PTHREAD_ADAPTIVE_MUTEX during the compilation. If you use RocksDB +// build environment then this happens automatically; otherwise it's up to the +// consumer to define the identifier. +#ifdef ROCKSDB_DEFAULT_TO_ADAPTIVE_MUTEX +extern const bool kDefaultToAdaptiveMutex = true; +#else +extern const bool kDefaultToAdaptiveMutex = false; +#endif + +namespace port { + +static int PthreadCall(const char* label, int result) { + if (result != 0 && result != ETIMEDOUT) { + fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); + abort(); + } + return result; +} + +Mutex::Mutex(bool adaptive) { + (void) adaptive; +#ifdef ROCKSDB_PTHREAD_ADAPTIVE_MUTEX + if (!adaptive) { + PthreadCall("init mutex", pthread_mutex_init(&mu_, nullptr)); + } else { + pthread_mutexattr_t mutex_attr; + PthreadCall("init mutex attr", pthread_mutexattr_init(&mutex_attr)); + PthreadCall("set mutex attr", + pthread_mutexattr_settype(&mutex_attr, + PTHREAD_MUTEX_ADAPTIVE_NP)); + PthreadCall("init mutex", pthread_mutex_init(&mu_, &mutex_attr)); + PthreadCall("destroy mutex attr", + pthread_mutexattr_destroy(&mutex_attr)); + } +#else + PthreadCall("init mutex", pthread_mutex_init(&mu_, nullptr)); +#endif // ROCKSDB_PTHREAD_ADAPTIVE_MUTEX +} + +Mutex::~Mutex() { PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); } + +void Mutex::Lock() { + PthreadCall("lock", pthread_mutex_lock(&mu_)); +#ifndef NDEBUG + locked_ = true; +#endif +} + +void Mutex::Unlock() { +#ifndef NDEBUG + locked_ = false; +#endif + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); +} + +void Mutex::AssertHeld() { +#ifndef NDEBUG + assert(locked_); +#endif +} + +CondVar::CondVar(Mutex* mu) + : mu_(mu) { + PthreadCall("init cv", pthread_cond_init(&cv_, nullptr)); +} + +CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); } + +void CondVar::Wait() { +#ifndef NDEBUG + mu_->locked_ = false; +#endif + PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_)); +#ifndef NDEBUG + mu_->locked_ = true; +#endif +} + +bool CondVar::TimedWait(uint64_t abs_time_us) { + struct timespec ts; + ts.tv_sec = static_cast<time_t>(abs_time_us / 1000000); + ts.tv_nsec = static_cast<suseconds_t>((abs_time_us % 1000000) * 1000); + +#ifndef NDEBUG + mu_->locked_ = false; +#endif + int err = pthread_cond_timedwait(&cv_, &mu_->mu_, &ts); +#ifndef NDEBUG + mu_->locked_ = true; +#endif + if (err == ETIMEDOUT) { + return true; + } + if (err != 0) { + PthreadCall("timedwait", err); + } + return false; +} + +void CondVar::Signal() { + PthreadCall("signal", pthread_cond_signal(&cv_)); +} + +void CondVar::SignalAll() { + PthreadCall("broadcast", pthread_cond_broadcast(&cv_)); +} + +RWMutex::RWMutex() { + PthreadCall("init mutex", pthread_rwlock_init(&mu_, nullptr)); +} + +RWMutex::~RWMutex() { PthreadCall("destroy mutex", pthread_rwlock_destroy(&mu_)); } + +void RWMutex::ReadLock() { PthreadCall("read lock", pthread_rwlock_rdlock(&mu_)); } + +void RWMutex::WriteLock() { PthreadCall("write lock", pthread_rwlock_wrlock(&mu_)); } + +void RWMutex::ReadUnlock() { PthreadCall("read unlock", pthread_rwlock_unlock(&mu_)); } + +void RWMutex::WriteUnlock() { PthreadCall("write unlock", pthread_rwlock_unlock(&mu_)); } + +int PhysicalCoreID() { +#if defined(ROCKSDB_SCHED_GETCPU_PRESENT) && defined(__x86_64__) && \ + (__GNUC__ > 2 || (__GNUC__ == 2 && __GNUC_MINOR__ >= 22)) + // sched_getcpu uses VDSO getcpu() syscall since 2.22. I believe Linux offers VDSO + // support only on x86_64. This is the fastest/preferred method if available. + int cpuno = sched_getcpu(); + if (cpuno < 0) { + return -1; + } + return cpuno; +#elif defined(__x86_64__) || defined(__i386__) + // clang/gcc both provide cpuid.h, which defines __get_cpuid(), for x86_64 and i386. + unsigned eax, ebx = 0, ecx, edx; + if (!__get_cpuid(1, &eax, &ebx, &ecx, &edx)) { + return -1; + } + return ebx >> 24; +#else + // give up, the caller can generate a random number or something. + return -1; +#endif +} + +void InitOnce(OnceType* once, void (*initializer)()) { + PthreadCall("once", pthread_once(once, initializer)); +} + +void Crash(const std::string& srcfile, int srcline) { + fprintf(stdout, "Crashing at %s:%d\n", srcfile.c_str(), srcline); + fflush(stdout); + kill(getpid(), SIGTERM); +} + +int GetMaxOpenFiles() { +#if defined(RLIMIT_NOFILE) + struct rlimit no_files_limit; + if (getrlimit(RLIMIT_NOFILE, &no_files_limit) != 0) { + return -1; + } + // protect against overflow + if (no_files_limit.rlim_cur >= std::numeric_limits<int>::max()) { + return std::numeric_limits<int>::max(); + } + return static_cast<int>(no_files_limit.rlim_cur); +#endif + return -1; +} + +void *cacheline_aligned_alloc(size_t size) { +#if __GNUC__ < 5 && defined(__SANITIZE_ADDRESS__) + return malloc(size); +#elif ( _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600 || defined(__APPLE__)) + void *m; + errno = posix_memalign(&m, CACHE_LINE_SIZE, size); + return errno ? nullptr : m; +#else + return malloc(size); +#endif +} + +void cacheline_aligned_free(void *memblock) { + free(memblock); +} + + +} // namespace port +} // namespace rocksdb diff --git a/src/rocksdb/port/port_posix.h b/src/rocksdb/port/port_posix.h new file mode 100644 index 00000000..63d7239f --- /dev/null +++ b/src/rocksdb/port/port_posix.h @@ -0,0 +1,204 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// See port_example.h for documentation for the following types/functions. + +#pragma once + +#include <thread> +// size_t printf formatting named in the manner of C99 standard formatting +// strings such as PRIu64 +// in fact, we could use that one +#define ROCKSDB_PRIszt "zu" + +#define __declspec(S) + +#define ROCKSDB_NOEXCEPT noexcept + +#undef PLATFORM_IS_LITTLE_ENDIAN +#if defined(OS_MACOSX) + #include <machine/endian.h> + #if defined(__DARWIN_LITTLE_ENDIAN) && defined(__DARWIN_BYTE_ORDER) + #define PLATFORM_IS_LITTLE_ENDIAN \ + (__DARWIN_BYTE_ORDER == __DARWIN_LITTLE_ENDIAN) + #endif +#elif defined(OS_SOLARIS) + #include <sys/isa_defs.h> + #ifdef _LITTLE_ENDIAN + #define PLATFORM_IS_LITTLE_ENDIAN true + #else + #define PLATFORM_IS_LITTLE_ENDIAN false + #endif + #include <alloca.h> +#elif defined(OS_AIX) + #include <sys/types.h> + #include <arpa/nameser_compat.h> + #define PLATFORM_IS_LITTLE_ENDIAN (BYTE_ORDER == LITTLE_ENDIAN) + #include <alloca.h> +#elif defined(OS_FREEBSD) || defined(OS_OPENBSD) || defined(OS_NETBSD) || \ + defined(OS_DRAGONFLYBSD) || defined(OS_ANDROID) + #include <sys/endian.h> + #include <sys/types.h> + #define PLATFORM_IS_LITTLE_ENDIAN (_BYTE_ORDER == _LITTLE_ENDIAN) +#else + #include <endian.h> +#endif +#include <pthread.h> + +#include <stdint.h> +#include <string.h> +#include <limits> +#include <string> + +#ifndef PLATFORM_IS_LITTLE_ENDIAN +#define PLATFORM_IS_LITTLE_ENDIAN (__BYTE_ORDER == __LITTLE_ENDIAN) +#endif + +#if defined(OS_MACOSX) || defined(OS_SOLARIS) || defined(OS_FREEBSD) ||\ + defined(OS_NETBSD) || defined(OS_OPENBSD) || defined(OS_DRAGONFLYBSD) ||\ + defined(OS_ANDROID) || defined(CYGWIN) || defined(OS_AIX) +// Use fread/fwrite/fflush on platforms without _unlocked variants +#define fread_unlocked fread +#define fwrite_unlocked fwrite +#define fflush_unlocked fflush +#endif + +#if defined(OS_MACOSX) || defined(OS_FREEBSD) ||\ + defined(OS_OPENBSD) || defined(OS_DRAGONFLYBSD) +// Use fsync() on platforms without fdatasync() +#define fdatasync fsync +#endif + +#if defined(OS_ANDROID) && __ANDROID_API__ < 9 +// fdatasync() was only introduced in API level 9 on Android. Use fsync() +// when targeting older platforms. +#define fdatasync fsync +#endif + +namespace rocksdb { + +extern const bool kDefaultToAdaptiveMutex; + +namespace port { + +// For use at db/file_indexer.h kLevelMaxIndex +const uint32_t kMaxUint32 = std::numeric_limits<uint32_t>::max(); +const int kMaxInt32 = std::numeric_limits<int32_t>::max(); +const int kMinInt32 = std::numeric_limits<int32_t>::min(); +const uint64_t kMaxUint64 = std::numeric_limits<uint64_t>::max(); +const int64_t kMaxInt64 = std::numeric_limits<int64_t>::max(); +const int64_t kMinInt64 = std::numeric_limits<int64_t>::min(); +const size_t kMaxSizet = std::numeric_limits<size_t>::max(); + +static const bool kLittleEndian = PLATFORM_IS_LITTLE_ENDIAN; +#undef PLATFORM_IS_LITTLE_ENDIAN + +class CondVar; + +class Mutex { + public: + explicit Mutex(bool adaptive = kDefaultToAdaptiveMutex); + ~Mutex(); + + void Lock(); + void Unlock(); + // this will assert if the mutex is not locked + // it does NOT verify that mutex is held by a calling thread + void AssertHeld(); + + private: + friend class CondVar; + pthread_mutex_t mu_; +#ifndef NDEBUG + bool locked_; +#endif + + // No copying + Mutex(const Mutex&); + void operator=(const Mutex&); +}; + +class RWMutex { + public: + RWMutex(); + ~RWMutex(); + + void ReadLock(); + void WriteLock(); + void ReadUnlock(); + void WriteUnlock(); + void AssertHeld() { } + + private: + pthread_rwlock_t mu_; // the underlying platform mutex + + // No copying allowed + RWMutex(const RWMutex&); + void operator=(const RWMutex&); +}; + +class CondVar { + public: + explicit CondVar(Mutex* mu); + ~CondVar(); + void Wait(); + // Timed condition wait. Returns true if timeout occurred. + bool TimedWait(uint64_t abs_time_us); + void Signal(); + void SignalAll(); + private: + pthread_cond_t cv_; + Mutex* mu_; +}; + +using Thread = std::thread; + +static inline void AsmVolatilePause() { +#if defined(__i386__) || defined(__x86_64__) + asm volatile("pause"); +#elif defined(__aarch64__) + asm volatile("wfe"); +#elif defined(__powerpc64__) + asm volatile("or 27,27,27"); +#endif + // it's okay for other platforms to be no-ops +} + +// Returns -1 if not available on this platform +extern int PhysicalCoreID(); + +typedef pthread_once_t OnceType; +#define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT +extern void InitOnce(OnceType* once, void (*initializer)()); + +#ifndef CACHE_LINE_SIZE + #if defined(__s390__) + #define CACHE_LINE_SIZE 256U + #elif defined(__powerpc__) || defined(__aarch64__) + #define CACHE_LINE_SIZE 128U + #else + #define CACHE_LINE_SIZE 64U + #endif +#endif + + +extern void *cacheline_aligned_alloc(size_t size); + +extern void cacheline_aligned_free(void *memblock); + +#define ALIGN_AS(n) alignas(n) + +#define PREFETCH(addr, rw, locality) __builtin_prefetch(addr, rw, locality) + +extern void Crash(const std::string& srcfile, int srcline); + +extern int GetMaxOpenFiles(); + +} // namespace port +} // namespace rocksdb diff --git a/src/rocksdb/port/stack_trace.cc b/src/rocksdb/port/stack_trace.cc new file mode 100644 index 00000000..8f8135a4 --- /dev/null +++ b/src/rocksdb/port/stack_trace.cc @@ -0,0 +1,138 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include "port/stack_trace.h" + +#if defined(ROCKSDB_LITE) || !(defined(ROCKSDB_BACKTRACE) || defined(OS_MACOSX)) || \ + defined(CYGWIN) || defined(OS_FREEBSD) || defined(OS_SOLARIS) + +// noop + +namespace rocksdb { +namespace port { +void InstallStackTraceHandler() {} +void PrintStack(int /*first_frames_to_skip*/) {} +} // namespace port +} // namespace rocksdb + +#else + +#include <execinfo.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <cxxabi.h> + +namespace rocksdb { +namespace port { + +namespace { + +#if defined(OS_LINUX) || defined(OS_FREEBSD) +const char* GetExecutableName() { + static char name[1024]; + + char link[1024]; + snprintf(link, sizeof(link), "/proc/%d/exe", getpid()); + auto read = readlink(link, name, sizeof(name) - 1); + if (-1 == read) { + return nullptr; + } else { + name[read] = 0; + return name; + } +} + +void PrintStackTraceLine(const char* symbol, void* frame) { + static const char* executable = GetExecutableName(); + if (symbol) { + fprintf(stderr, "%s ", symbol); + } + if (executable) { + // out source to addr2line, for the address translation + const int kLineMax = 256; + char cmd[kLineMax]; + snprintf(cmd, kLineMax, "addr2line %p -e %s -f -C 2>&1", frame, executable); + auto f = popen(cmd, "r"); + if (f) { + char line[kLineMax]; + while (fgets(line, sizeof(line), f)) { + line[strlen(line) - 1] = 0; // remove newline + fprintf(stderr, "%s\t", line); + } + pclose(f); + } + } else { + fprintf(stderr, " %p", frame); + } + + fprintf(stderr, "\n"); +} +#elif defined(OS_MACOSX) + +void PrintStackTraceLine(const char* symbol, void* frame) { + static int pid = getpid(); + // out source to atos, for the address translation + const int kLineMax = 256; + char cmd[kLineMax]; + snprintf(cmd, kLineMax, "xcrun atos %p -p %d 2>&1", frame, pid); + auto f = popen(cmd, "r"); + if (f) { + char line[kLineMax]; + while (fgets(line, sizeof(line), f)) { + line[strlen(line) - 1] = 0; // remove newline + fprintf(stderr, "%s\t", line); + } + pclose(f); + } else if (symbol) { + fprintf(stderr, "%s ", symbol); + } + + fprintf(stderr, "\n"); +} + +#endif + +} // namespace + +void PrintStack(int first_frames_to_skip) { + const int kMaxFrames = 100; + void* frames[kMaxFrames]; + + auto num_frames = backtrace(frames, kMaxFrames); + auto symbols = backtrace_symbols(frames, num_frames); + + for (int i = first_frames_to_skip; i < num_frames; ++i) { + fprintf(stderr, "#%-2d ", i - first_frames_to_skip); + PrintStackTraceLine((symbols != nullptr) ? symbols[i] : nullptr, frames[i]); + } + free(symbols); +} + +static void StackTraceHandler(int sig) { + // reset to default handler + signal(sig, SIG_DFL); + fprintf(stderr, "Received signal %d (%s)\n", sig, strsignal(sig)); + // skip the top three signal handler related frames + PrintStack(3); + // re-signal to default handler (so we still get core dump if needed...) + raise(sig); +} + +void InstallStackTraceHandler() { + // just use the plain old signal as it's simple and sufficient + // for this use case + signal(SIGILL, StackTraceHandler); + signal(SIGSEGV, StackTraceHandler); + signal(SIGBUS, StackTraceHandler); + signal(SIGABRT, StackTraceHandler); +} + +} // namespace port +} // namespace rocksdb + +#endif diff --git a/src/rocksdb/port/stack_trace.h b/src/rocksdb/port/stack_trace.h new file mode 100644 index 00000000..f1d4f1fe --- /dev/null +++ b/src/rocksdb/port/stack_trace.h @@ -0,0 +1,19 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once +namespace rocksdb { +namespace port { + +// Install a signal handler to print callstack on the following signals: +// SIGILL SIGSEGV SIGBUS SIGABRT +// Currently supports linux only. No-op otherwise. +void InstallStackTraceHandler(); + +// Prints stack, skips skip_first_frames frames +void PrintStack(int first_frames_to_skip = 0); + +} // namespace port +} // namespace rocksdb diff --git a/src/rocksdb/port/sys_time.h b/src/rocksdb/port/sys_time.h new file mode 100644 index 00000000..2f83da8b --- /dev/null +++ b/src/rocksdb/port/sys_time.h @@ -0,0 +1,45 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// This file is a portable substitute for sys/time.h which does not exist on +// Windows + +#pragma once + +#if defined(OS_WIN) && defined(_MSC_VER) + +#include <time.h> + +namespace rocksdb { + +namespace port { + +// Avoid including winsock2.h for this definition +typedef struct timeval { + long tv_sec; + long tv_usec; +} timeval; + +void gettimeofday(struct timeval* tv, struct timezone* tz); + +inline struct tm* localtime_r(const time_t* timep, struct tm* result) { + errno_t ret = localtime_s(result, timep); + return (ret == 0) ? result : NULL; +} +} + +using port::timeval; +using port::gettimeofday; +using port::localtime_r; +} + +#else +#include <time.h> +#include <sys/time.h> +#endif diff --git a/src/rocksdb/port/util_logger.h b/src/rocksdb/port/util_logger.h new file mode 100644 index 00000000..ba424705 --- /dev/null +++ b/src/rocksdb/port/util_logger.h @@ -0,0 +1,20 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +// Include the appropriate platform specific file below. If you are +// porting to a new platform, see "port_example.h" for documentation +// of what the new port_<platform>.h file must provide. + +#if defined(ROCKSDB_PLATFORM_POSIX) +#include "env/posix_logger.h" +#elif defined(OS_WIN) +#include "port/win/win_logger.h" +#endif diff --git a/src/rocksdb/port/win/env_default.cc b/src/rocksdb/port/win/env_default.cc new file mode 100644 index 00000000..d24c2191 --- /dev/null +++ b/src/rocksdb/port/win/env_default.cc @@ -0,0 +1,41 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include <mutex> + +#include <rocksdb/env.h> +#include "port/win/env_win.h" +#include "util/compression_context_cache.h" +#include "util/sync_point.h" +#include "util/thread_local.h" + +namespace rocksdb { +namespace port { + +// We choose not to destroy the env because joining the threads from the +// system loader +// which destroys the statics (same as from DLLMain) creates a system loader +// dead-lock. +// in this manner any remaining threads are terminated OK. +namespace { + std::once_flag winenv_once_flag; + Env* envptr; +}; +} + +Env* Env::Default() { + using namespace port; + ThreadLocalPtr::InitSingletons(); + CompressionContextCache::InitSingleton(); + INIT_SYNC_POINT_SINGLETONS(); + std::call_once(winenv_once_flag, []() { envptr = new WinEnv(); }); + return envptr; +} + +} diff --git a/src/rocksdb/port/win/env_win.cc b/src/rocksdb/port/win/env_win.cc new file mode 100644 index 00000000..9abb14d6 --- /dev/null +++ b/src/rocksdb/port/win/env_win.cc @@ -0,0 +1,1523 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "port/win/env_win.h" +#include "port/win/win_thread.h" +#include <algorithm> +#include <ctime> +#include <thread> + +#include <errno.h> +#include <process.h> // _getpid +#include <io.h> // _access +#include <direct.h> // _rmdir, _mkdir, _getcwd +#include <sys/types.h> +#include <sys/stat.h> + +#include "rocksdb/env.h" +#include "rocksdb/slice.h" + +#include "port/port.h" +#include "port/port_dirent.h" +#include "port/win/win_logger.h" +#include "port/win/io_win.h" + +#include "monitoring/iostats_context_imp.h" + +#include "monitoring/thread_status_updater.h" +#include "monitoring/thread_status_util.h" + +#include <rpc.h> // for uuid generation +#include <windows.h> +#include <shlwapi.h> +#include "strsafe.h" + +#include <algorithm> + +namespace rocksdb { + +ThreadStatusUpdater* CreateThreadStatusUpdater() { + return new ThreadStatusUpdater(); +} + +namespace { + +// Sector size used when physical sector size cannot be obtained from device. +static const size_t kSectorSize = 512; + +// RAII helpers for HANDLEs +const auto CloseHandleFunc = [](HANDLE h) { ::CloseHandle(h); }; +typedef std::unique_ptr<void, decltype(CloseHandleFunc)> UniqueCloseHandlePtr; + +const auto FindCloseFunc = [](HANDLE h) { ::FindClose(h); }; +typedef std::unique_ptr<void, decltype(FindCloseFunc)> UniqueFindClosePtr; + +void WinthreadCall(const char* label, std::error_code result) { + if (0 != result.value()) { + fprintf(stderr, "pthread %s: %s\n", label, strerror(result.value())); + abort(); + } +} + +} + +namespace port { + +WinEnvIO::WinEnvIO(Env* hosted_env) + : hosted_env_(hosted_env), + page_size_(4 * 1024), + allocation_granularity_(page_size_), + perf_counter_frequency_(0), + nano_seconds_per_period_(0), + GetSystemTimePreciseAsFileTime_(NULL) { + + SYSTEM_INFO sinfo; + GetSystemInfo(&sinfo); + + page_size_ = sinfo.dwPageSize; + allocation_granularity_ = sinfo.dwAllocationGranularity; + + { + LARGE_INTEGER qpf; + BOOL ret __attribute__((__unused__)); + ret = QueryPerformanceFrequency(&qpf); + assert(ret == TRUE); + perf_counter_frequency_ = qpf.QuadPart; + + if (std::nano::den % perf_counter_frequency_ == 0) { + nano_seconds_per_period_ = std::nano::den / perf_counter_frequency_; + } + } + + HMODULE module = GetModuleHandle("kernel32.dll"); + if (module != NULL) { + GetSystemTimePreciseAsFileTime_ = + (FnGetSystemTimePreciseAsFileTime)GetProcAddress( + module, "GetSystemTimePreciseAsFileTime"); + } +} + +WinEnvIO::~WinEnvIO() { +} + +Status WinEnvIO::DeleteFile(const std::string& fname) { + Status result; + + BOOL ret = RX_DeleteFile(RX_FN(fname).c_str()); + + if(!ret) { + auto lastError = GetLastError(); + result = IOErrorFromWindowsError("Failed to delete: " + fname, + lastError); + } + + return result; +} + +Status WinEnvIO::Truncate(const std::string& fname, size_t size) { + Status s; + int result = rocksdb::port::Truncate(fname, size); + if (result != 0) { + s = IOError("Failed to truncate: " + fname, errno); + } + return s; +} + +Status WinEnvIO::GetCurrentTime(int64_t* unix_time) { + time_t time = std::time(nullptr); + if (time == (time_t)(-1)) { + return Status::NotSupported("Failed to get time"); + } + + *unix_time = time; + return Status::OK(); +} + +Status WinEnvIO::NewSequentialFile(const std::string& fname, + std::unique_ptr<SequentialFile>* result, + const EnvOptions& options) { + Status s; + + result->reset(); + + // Corruption test needs to rename and delete files of these kind + // while they are still open with another handle. For that reason we + // allow share_write and delete(allows rename). + HANDLE hFile = INVALID_HANDLE_VALUE; + + DWORD fileFlags = FILE_ATTRIBUTE_READONLY; + + if (options.use_direct_reads && !options.use_mmap_reads) { + fileFlags |= FILE_FLAG_NO_BUFFERING; + } + + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = RX_CreateFile( + RX_FN(fname).c_str(), GENERIC_READ, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL, + OPEN_EXISTING, // Original fopen mode is "rb" + fileFlags, NULL); + } + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("Failed to open NewSequentialFile" + fname, + lastError); + } else { + result->reset(new WinSequentialFile(fname, hFile, options)); + } + return s; +} + +Status WinEnvIO::NewRandomAccessFile(const std::string& fname, + std::unique_ptr<RandomAccessFile>* result, + const EnvOptions& options) { + result->reset(); + Status s; + + // Open the file for read-only random access + // Random access is to disable read-ahead as the system reads too much data + DWORD fileFlags = FILE_ATTRIBUTE_READONLY; + + if (options.use_direct_reads && !options.use_mmap_reads) { + fileFlags |= FILE_FLAG_NO_BUFFERING; + } else { + fileFlags |= FILE_FLAG_RANDOM_ACCESS; + } + + /// Shared access is necessary for corruption test to pass + // almost all tests would work with a possible exception of fault_injection + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = RX_CreateFile( + RX_FN(fname).c_str(), GENERIC_READ, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + NULL, OPEN_EXISTING, fileFlags, NULL); + } + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "NewRandomAccessFile failed to Create/Open: " + fname, lastError); + } + + UniqueCloseHandlePtr fileGuard(hFile, CloseHandleFunc); + + // CAUTION! This will map the entire file into the process address space + if (options.use_mmap_reads && sizeof(void*) >= 8) { + // Use mmap when virtual address-space is plentiful. + uint64_t fileSize; + + s = GetFileSize(fname, &fileSize); + + if (s.ok()) { + // Will not map empty files + if (fileSize == 0) { + return IOError( + "NewRandomAccessFile failed to map empty file: " + fname, EINVAL); + } + + HANDLE hMap = RX_CreateFileMapping(hFile, NULL, PAGE_READONLY, + 0, // At its present length + 0, + NULL); // Mapping name + + if (!hMap) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "Failed to create file mapping for NewRandomAccessFile: " + fname, + lastError); + } + + UniqueCloseHandlePtr mapGuard(hMap, CloseHandleFunc); + + const void* mapped_region = + MapViewOfFileEx(hMap, FILE_MAP_READ, + 0, // High DWORD of access start + 0, // Low DWORD + static_cast<SIZE_T>(fileSize), + NULL); // Let the OS choose the mapping + + if (!mapped_region) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "Failed to MapViewOfFile for NewRandomAccessFile: " + fname, + lastError); + } + + result->reset(new WinMmapReadableFile(fname, hFile, hMap, mapped_region, + static_cast<size_t>(fileSize))); + + mapGuard.release(); + fileGuard.release(); + } + } else { + result->reset(new WinRandomAccessFile(fname, hFile, + std::max(GetSectorSize(fname), + page_size_), + options)); + fileGuard.release(); + } + return s; +} + +Status WinEnvIO::OpenWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options, + bool reopen) { + + const size_t c_BufferCapacity = 64 * 1024; + + EnvOptions local_options(options); + + result->reset(); + Status s; + + DWORD fileFlags = FILE_ATTRIBUTE_NORMAL; + + if (local_options.use_direct_writes && !local_options.use_mmap_writes) { + fileFlags = FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH; + } + + // Desired access. We are want to write only here but if we want to memory + // map + // the file then there is no write only mode so we have to create it + // Read/Write + // However, MapViewOfFile specifies only Write only + DWORD desired_access = GENERIC_WRITE; + DWORD shared_mode = FILE_SHARE_READ; + + if (local_options.use_mmap_writes) { + desired_access |= GENERIC_READ; + } else { + // Adding this solely for tests to pass (fault_injection_test, + // wal_manager_test). + shared_mode |= (FILE_SHARE_WRITE | FILE_SHARE_DELETE); + } + + // This will always truncate the file + DWORD creation_disposition = CREATE_ALWAYS; + if (reopen) { + creation_disposition = OPEN_ALWAYS; + } + + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = RX_CreateFile( + RX_FN(fname).c_str(), + desired_access, // Access desired + shared_mode, + NULL, // Security attributes + // Posix env says (reopen) ? (O_CREATE | O_APPEND) : O_CREAT | O_TRUNC + creation_disposition, + fileFlags, // Flags + NULL); // Template File + } + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "Failed to create a NewWriteableFile: " + fname, lastError); + } + + // We will start writing at the end, appending + if (reopen) { + LARGE_INTEGER zero_move; + zero_move.QuadPart = 0; + BOOL ret = SetFilePointerEx(hFile, zero_move, NULL, FILE_END); + if (!ret) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "Failed to create a ReopenWritableFile move to the end: " + fname, + lastError); + } + } + + if (options.use_mmap_writes) { + // We usually do not use mmmapping on SSD and thus we pass memory + // page_size + result->reset(new WinMmapFile(fname, hFile, page_size_, + allocation_granularity_, local_options)); + } else { + // Here we want the buffer allocation to be aligned by the SSD page size + // and to be a multiple of it + result->reset(new WinWritableFile(fname, hFile, + std::max(GetSectorSize(fname), + GetPageSize()), + c_BufferCapacity, local_options)); + } + return s; +} + +Status WinEnvIO::NewRandomRWFile(const std::string & fname, + std::unique_ptr<RandomRWFile>* result, + const EnvOptions & options) { + + Status s; + + // Open the file for read-only random access + // Random access is to disable read-ahead as the system reads too much data + DWORD desired_access = GENERIC_READ | GENERIC_WRITE; + DWORD shared_mode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE; + DWORD creation_disposition = OPEN_EXISTING; // Fail if file does not exist + DWORD file_flags = FILE_FLAG_RANDOM_ACCESS; + + if (options.use_direct_reads && options.use_direct_writes) { + file_flags |= FILE_FLAG_NO_BUFFERING; + } + + /// Shared access is necessary for corruption test to pass + // almost all tests would work with a possible exception of fault_injection + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = + RX_CreateFile(RX_FN(fname).c_str(), + desired_access, + shared_mode, + NULL, // Security attributes + creation_disposition, + file_flags, + NULL); + } + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "NewRandomRWFile failed to Create/Open: " + fname, lastError); + } + + UniqueCloseHandlePtr fileGuard(hFile, CloseHandleFunc); + result->reset(new WinRandomRWFile(fname, hFile, + std::max(GetSectorSize(fname), + GetPageSize()), + options)); + fileGuard.release(); + + return s; +} + +Status WinEnvIO::NewMemoryMappedFileBuffer( + const std::string & fname, + std::unique_ptr<MemoryMappedFileBuffer>* result) { + Status s; + result->reset(); + + DWORD fileFlags = FILE_ATTRIBUTE_READONLY; + + HANDLE hFile = INVALID_HANDLE_VALUE; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = RX_CreateFile( + RX_FN(fname).c_str(), GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + NULL, + OPEN_EXISTING, // Open only if it exists + fileFlags, + NULL); + } + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to open NewMemoryMappedFileBuffer: " + fname, lastError); + return s; + } + UniqueCloseHandlePtr fileGuard(hFile, CloseHandleFunc); + + uint64_t fileSize = 0; + s = GetFileSize(fname, &fileSize); + if (!s.ok()) { + return s; + } + // Will not map empty files + if (fileSize == 0) { + return Status::NotSupported( + "NewMemoryMappedFileBuffer can not map zero length files: " + fname); + } + + // size_t is 32-bit with 32-bit builds + if (fileSize > std::numeric_limits<size_t>::max()) { + return Status::NotSupported( + "The specified file size does not fit into 32-bit memory addressing: " + + fname); + } + + HANDLE hMap = RX_CreateFileMapping(hFile, NULL, PAGE_READWRITE, + 0, // Whole file at its present length + 0, + NULL); // Mapping name + + if (!hMap) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "Failed to create file mapping for: " + fname, lastError); + } + UniqueCloseHandlePtr mapGuard(hMap, CloseHandleFunc); + + void* base = MapViewOfFileEx(hMap, FILE_MAP_WRITE, + 0, // High DWORD of access start + 0, // Low DWORD + static_cast<SIZE_T>(fileSize), + NULL); // Let the OS choose the mapping + + if (!base) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "Failed to MapViewOfFile for NewMemoryMappedFileBuffer: " + fname, + lastError); + } + + result->reset(new WinMemoryMappedBuffer(hFile, hMap, base, + static_cast<size_t>(fileSize))); + + mapGuard.release(); + fileGuard.release(); + + return s; +} + +Status WinEnvIO::NewDirectory(const std::string& name, + std::unique_ptr<Directory>* result) { + Status s; + // Must be nullptr on failure + result->reset(); + + if (!DirExists(name)) { + s = IOErrorFromWindowsError( + "open folder: " + name, ERROR_DIRECTORY); + return s; + } + + HANDLE handle = INVALID_HANDLE_VALUE; + // 0 - for access means read metadata + { + IOSTATS_TIMER_GUARD(open_nanos); + handle = RX_CreateFile( + RX_FN(name).c_str(), 0, + FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, + NULL, + OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS, // make opening folders possible + NULL); + } + + if (INVALID_HANDLE_VALUE == handle) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("open folder: " + name, lastError); + return s; + } + + result->reset(new WinDirectory(handle)); + + return s; +} + +Status WinEnvIO::FileExists(const std::string& fname) { + Status s; + // TODO: This does not follow symbolic links at this point + // which is consistent with _access() impl on windows + // but can be added + WIN32_FILE_ATTRIBUTE_DATA attrs; + if (FALSE == RX_GetFileAttributesEx(RX_FN(fname).c_str(), + GetFileExInfoStandard, &attrs)) { + auto lastError = GetLastError(); + switch (lastError) { + case ERROR_ACCESS_DENIED: + case ERROR_NOT_FOUND: + case ERROR_FILE_NOT_FOUND: + case ERROR_PATH_NOT_FOUND: + s = Status::NotFound(); + break; + default: + s = IOErrorFromWindowsError("Unexpected error for: " + fname, + lastError); + break; + } + } + return s; +} + +Status WinEnvIO::GetChildren(const std::string& dir, + std::vector<std::string>* result) { + + Status status; + result->clear(); + std::vector<std::string> output; + + RX_WIN32_FIND_DATA data; + memset(&data, 0, sizeof(data)); + std::string pattern(dir); + pattern.append("\\").append("*"); + + HANDLE handle = RX_FindFirstFileEx(RX_FN(pattern).c_str(), + // Do not want alternative name + FindExInfoBasic, + &data, + FindExSearchNameMatch, + NULL, // lpSearchFilter + 0); + + if (handle == INVALID_HANDLE_VALUE) { + auto lastError = GetLastError(); + switch (lastError) { + case ERROR_NOT_FOUND: + case ERROR_ACCESS_DENIED: + case ERROR_FILE_NOT_FOUND: + case ERROR_PATH_NOT_FOUND: + status = Status::NotFound(); + break; + default: + status = IOErrorFromWindowsError( + "Failed to GetChhildren for: " + dir, lastError); + } + return status; + } + + UniqueFindClosePtr fc(handle, FindCloseFunc); + + if (result->capacity() > 0) { + output.reserve(result->capacity()); + } + + // For safety + data.cFileName[MAX_PATH - 1] = 0; + + while (true) { + auto x = RX_FILESTRING(data.cFileName, RX_FNLEN(data.cFileName)); + output.emplace_back(FN_TO_RX(x)); + BOOL ret =- RX_FindNextFile(handle, &data); + // If the function fails the return value is zero + // and non-zero otherwise. Not TRUE or FALSE. + if (ret == FALSE) { + // Posix does not care why we stopped + break; + } + data.cFileName[MAX_PATH - 1] = 0; + } + output.swap(*result); + return status; +} + +Status WinEnvIO::CreateDir(const std::string& name) { + Status result; + BOOL ret = RX_CreateDirectory(RX_FN(name).c_str(), NULL); + if (!ret) { + auto lastError = GetLastError(); + result = IOErrorFromWindowsError( + "Failed to create a directory: " + name, lastError); + } + + return result; +} + +Status WinEnvIO::CreateDirIfMissing(const std::string& name) { + Status result; + + if (DirExists(name)) { + return result; + } + + BOOL ret = RX_CreateDirectory(RX_FN(name).c_str(), NULL); + if (!ret) { + auto lastError = GetLastError(); + if (lastError != ERROR_ALREADY_EXISTS) { + result = IOErrorFromWindowsError( + "Failed to create a directory: " + name, lastError); + } else { + result = + Status::IOError(name + ": exists but is not a directory"); + } + } + return result; +} + +Status WinEnvIO::DeleteDir(const std::string& name) { + Status result; + BOOL ret = RX_RemoveDirectory(RX_FN(name).c_str()); + if (!ret) { + auto lastError = GetLastError(); + result = IOErrorFromWindowsError("Failed to remove dir: " + name, + lastError); + } + return result; +} + +Status WinEnvIO::GetFileSize(const std::string& fname, + uint64_t* size) { + Status s; + + WIN32_FILE_ATTRIBUTE_DATA attrs; + if (RX_GetFileAttributesEx(RX_FN(fname).c_str(), GetFileExInfoStandard, + &attrs)) { + ULARGE_INTEGER file_size; + file_size.HighPart = attrs.nFileSizeHigh; + file_size.LowPart = attrs.nFileSizeLow; + *size = file_size.QuadPart; + } else { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("Can not get size for: " + fname, lastError); + } + return s; +} + +uint64_t WinEnvIO::FileTimeToUnixTime(const FILETIME& ftTime) { + const uint64_t c_FileTimePerSecond = 10000000U; + // UNIX epoch starts on 1970-01-01T00:00:00Z + // Windows FILETIME starts on 1601-01-01T00:00:00Z + // Therefore, we need to subtract the below number of seconds from + // the seconds that we obtain from FILETIME with an obvious loss of + // precision + const uint64_t c_SecondBeforeUnixEpoch = 11644473600U; + + ULARGE_INTEGER li; + li.HighPart = ftTime.dwHighDateTime; + li.LowPart = ftTime.dwLowDateTime; + + uint64_t result = + (li.QuadPart / c_FileTimePerSecond) - c_SecondBeforeUnixEpoch; + return result; +} + +Status WinEnvIO::GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) { + Status s; + + WIN32_FILE_ATTRIBUTE_DATA attrs; + if (RX_GetFileAttributesEx(RX_FN(fname).c_str(), GetFileExInfoStandard, + &attrs)) { + *file_mtime = FileTimeToUnixTime(attrs.ftLastWriteTime); + } else { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Can not get file modification time for: " + fname, lastError); + *file_mtime = 0; + } + + return s; +} + +Status WinEnvIO::RenameFile(const std::string& src, + const std::string& target) { + Status result; + + // rename() is not capable of replacing the existing file as on Linux + // so use OS API directly + if (!RX_MoveFileEx(RX_FN(src).c_str(), RX_FN(target).c_str(), + MOVEFILE_REPLACE_EXISTING)) { + DWORD lastError = GetLastError(); + + std::string text("Failed to rename: "); + text.append(src).append(" to: ").append(target); + + result = IOErrorFromWindowsError(text, lastError); + } + + return result; +} + +Status WinEnvIO::LinkFile(const std::string& src, + const std::string& target) { + Status result; + + if (!RX_CreateHardLink(RX_FN(target).c_str(), RX_FN(src).c_str(), NULL)) { + DWORD lastError = GetLastError(); + if (lastError == ERROR_NOT_SAME_DEVICE) { + return Status::NotSupported("No cross FS links allowed"); + } + + std::string text("Failed to link: "); + text.append(src).append(" to: ").append(target); + + result = IOErrorFromWindowsError(text, lastError); + } + + return result; +} + +Status WinEnvIO::NumFileLinks(const std::string& fname, uint64_t* count) { + Status s; + HANDLE handle = RX_CreateFile( + RX_FN(fname).c_str(), 0, + FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, + NULL, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS, NULL); + + if (INVALID_HANDLE_VALUE == handle) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("NumFileLinks: " + fname, lastError); + return s; + } + UniqueCloseHandlePtr handle_guard(handle, CloseHandleFunc); + FILE_STANDARD_INFO standard_info; + if (0 != GetFileInformationByHandleEx(handle, FileStandardInfo, + &standard_info, + sizeof(standard_info))) { + *count = standard_info.NumberOfLinks; + } else { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("GetFileInformationByHandleEx: " + fname, + lastError); + } + return s; +} + +Status WinEnvIO::AreFilesSame(const std::string& first, + const std::string& second, bool* res) { +// For MinGW builds +#if (_WIN32_WINNT == _WIN32_WINNT_VISTA) + Status s = Status::NotSupported(); +#else + assert(res != nullptr); + Status s; + if (res == nullptr) { + s = Status::InvalidArgument("res"); + return s; + } + + // 0 - for access means read metadata + HANDLE file_1 = RX_CreateFile( + RX_FN(first).c_str(), 0, + FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, + NULL, + OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS, // make opening folders possible + NULL); + + if (INVALID_HANDLE_VALUE == file_1) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("open file: " + first, lastError); + return s; + } + UniqueCloseHandlePtr g_1(file_1, CloseHandleFunc); + + HANDLE file_2 = RX_CreateFile( + RX_FN(second).c_str(), 0, + FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, + NULL, OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS, // make opening folders possible + NULL); + + if (INVALID_HANDLE_VALUE == file_2) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("open file: " + second, lastError); + return s; + } + UniqueCloseHandlePtr g_2(file_2, CloseHandleFunc); + + FILE_ID_INFO FileInfo_1; + BOOL result = GetFileInformationByHandleEx(file_1, FileIdInfo, &FileInfo_1, + sizeof(FileInfo_1)); + + if (!result) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("stat file: " + first, lastError); + return s; + } + + FILE_ID_INFO FileInfo_2; + result = GetFileInformationByHandleEx(file_2, FileIdInfo, &FileInfo_2, + sizeof(FileInfo_2)); + + if (!result) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("stat file: " + second, lastError); + return s; + } + + if (FileInfo_1.VolumeSerialNumber == FileInfo_2.VolumeSerialNumber) { + *res = (0 == memcmp(FileInfo_1.FileId.Identifier, + FileInfo_2.FileId.Identifier, + sizeof(FileInfo_1.FileId.Identifier))); + } else { + *res = false; + } +#endif + return s; +} + +Status WinEnvIO::LockFile(const std::string& lockFname, + FileLock** lock) { + assert(lock != nullptr); + + *lock = NULL; + Status result; + + // No-sharing, this is a LOCK file + const DWORD ExclusiveAccessON = 0; + + // Obtain exclusive access to the LOCK file + // Previously, instead of NORMAL attr we set DELETE on close and that worked + // well except with fault_injection test that insists on deleting it. + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = RX_CreateFile(RX_FN(lockFname).c_str(), + (GENERIC_READ | GENERIC_WRITE), + ExclusiveAccessON, NULL, CREATE_ALWAYS, + FILE_ATTRIBUTE_NORMAL, NULL); + } + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + result = IOErrorFromWindowsError( + "Failed to create lock file: " + lockFname, lastError); + } else { + *lock = new WinFileLock(hFile); + } + + return result; +} + +Status WinEnvIO::UnlockFile(FileLock* lock) { + Status result; + + assert(lock != nullptr); + + delete lock; + + return result; +} + +Status WinEnvIO::GetTestDirectory(std::string* result) { + + std::string output; + + const char* env = getenv("TEST_TMPDIR"); + if (env && env[0] != '\0') { + output = env; + } else { + env = getenv("TMP"); + + if (env && env[0] != '\0') { + output = env; + } else { + output = "c:\\tmp"; + } + } + CreateDir(output); + + output.append("\\testrocksdb-"); + output.append(std::to_string(_getpid())); + + CreateDir(output); + + output.swap(*result); + + return Status::OK(); +} + +Status WinEnvIO::NewLogger(const std::string& fname, + std::shared_ptr<Logger>* result) { + Status s; + + result->reset(); + + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = RX_CreateFile( + RX_FN(fname).c_str(), GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_DELETE, // In RocksDb log files are + // renamed and deleted before + // they are closed. This enables + // doing so. + NULL, + CREATE_ALWAYS, // Original fopen mode is "w" + FILE_ATTRIBUTE_NORMAL, NULL); + } + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("Failed to open LogFile" + fname, lastError); + } else { + { + // With log files we want to set the true creation time as of now + // because the system + // for some reason caches the attributes of the previous file that just + // been renamed from + // this name so auto_roll_logger_test fails + FILETIME ft; + GetSystemTimeAsFileTime(&ft); + // Set creation, last access and last write time to the same value + SetFileTime(hFile, &ft, &ft, &ft); + } + result->reset(new WinLogger(&WinEnvThreads::gettid, hosted_env_, hFile)); + } + return s; +} + +uint64_t WinEnvIO::NowMicros() { + + if (GetSystemTimePreciseAsFileTime_ != NULL) { + // all std::chrono clocks on windows proved to return + // values that may repeat that is not good enough for some uses. + const int64_t c_UnixEpochStartTicks = 116444736000000000LL; + const int64_t c_FtToMicroSec = 10; + + // This interface needs to return system time and not + // just any microseconds because it is often used as an argument + // to TimedWait() on condition variable + FILETIME ftSystemTime; + GetSystemTimePreciseAsFileTime_(&ftSystemTime); + + LARGE_INTEGER li; + li.LowPart = ftSystemTime.dwLowDateTime; + li.HighPart = ftSystemTime.dwHighDateTime; + // Subtract unix epoch start + li.QuadPart -= c_UnixEpochStartTicks; + // Convert to microsecs + li.QuadPart /= c_FtToMicroSec; + return li.QuadPart; + } + using namespace std::chrono; + return duration_cast<microseconds>( + high_resolution_clock::now().time_since_epoch()).count(); +} + +uint64_t WinEnvIO::NowNanos() { + if (nano_seconds_per_period_ != 0) { + // all std::chrono clocks on windows have the same resolution that is only + // good enough for microseconds but not nanoseconds + // On Windows 8 and Windows 2012 Server + // GetSystemTimePreciseAsFileTime(¤t_time) can be used + LARGE_INTEGER li; + QueryPerformanceCounter(&li); + // Convert performance counter to nanoseconds by precomputed ratio. + // Directly multiply nano::den with li.QuadPart causes overflow. + // Only do this when nano::den is divisible by perf_counter_frequency_, + // which most likely is the case in reality. If it's not, fall back to + // high_resolution_clock, which may be less precise under old compilers. + li.QuadPart *= nano_seconds_per_period_; + return li.QuadPart; + } + using namespace std::chrono; + return duration_cast<nanoseconds>( + high_resolution_clock::now().time_since_epoch()).count(); +} + +Status WinEnvIO::GetHostName(char* name, uint64_t len) { + Status s; + DWORD nSize = static_cast<DWORD>( + std::min<uint64_t>(len, std::numeric_limits<DWORD>::max())); + + if (!::GetComputerNameA(name, &nSize)) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("GetHostName", lastError); + } else { + name[nSize] = 0; + } + + return s; +} + +Status WinEnvIO::GetAbsolutePath(const std::string& db_path, + std::string* output_path) { + // Check if we already have an absolute path + // For test compatibility we will consider starting slash as an + // absolute path + if ((!db_path.empty() && (db_path[0] == '\\' || db_path[0] == '/')) || + !RX_PathIsRelative(RX_FN(db_path).c_str())) { + *output_path = db_path; + return Status::OK(); + } + + RX_FILESTRING result; + result.resize(MAX_PATH); + + // Hopefully no changes the current directory while we do this + // however _getcwd also suffers from the same limitation + DWORD len = RX_GetCurrentDirectory(MAX_PATH, &result[0]); + if (len == 0) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError("Failed to get current working directory", + lastError); + } + + result.resize(len); + std::string res = FN_TO_RX(result); + + res.swap(*output_path); + return Status::OK(); +} + +std::string WinEnvIO::TimeToString(uint64_t secondsSince1970) { + std::string result; + + const time_t seconds = secondsSince1970; + const int maxsize = 64; + + struct tm t; + errno_t ret = localtime_s(&t, &seconds); + + if (ret) { + result = std::to_string(seconds); + } else { + result.resize(maxsize); + char* p = &result[0]; + + int len = snprintf(p, maxsize, "%04d/%02d/%02d-%02d:%02d:%02d ", + t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, + t.tm_min, t.tm_sec); + assert(len > 0); + + result.resize(len); + } + + return result; +} + +EnvOptions WinEnvIO::OptimizeForLogWrite(const EnvOptions& env_options, + const DBOptions& db_options) const { + EnvOptions optimized(env_options); + // These two the same as default optimizations + optimized.bytes_per_sync = db_options.wal_bytes_per_sync; + optimized.writable_file_max_buffer_size = + db_options.writable_file_max_buffer_size; + + // This adversely affects %999 on windows + optimized.use_mmap_writes = false; + // Direct writes will produce a huge perf impact on + // Windows. Pre-allocate space for WAL. + optimized.use_direct_writes = false; + return optimized; +} + +EnvOptions WinEnvIO::OptimizeForManifestWrite( + const EnvOptions& env_options) const { + EnvOptions optimized(env_options); + optimized.use_mmap_writes = false; + optimized.use_direct_reads = false; + return optimized; +} + +EnvOptions WinEnvIO::OptimizeForManifestRead( + const EnvOptions& env_options) const { + EnvOptions optimized(env_options); + optimized.use_mmap_writes = false; + optimized.use_direct_reads = false; + return optimized; +} + +// Returns true iff the named directory exists and is a directory. +bool WinEnvIO::DirExists(const std::string& dname) { + WIN32_FILE_ATTRIBUTE_DATA attrs; + if (RX_GetFileAttributesEx(RX_FN(dname).c_str(), + GetFileExInfoStandard, &attrs)) { + return 0 != (attrs.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY); + } + return false; +} + +size_t WinEnvIO::GetSectorSize(const std::string& fname) { + size_t sector_size = kSectorSize; + + if (RX_PathIsRelative(RX_FN(fname).c_str())) { + return sector_size; + } + + // obtain device handle + char devicename[7] = "\\\\.\\"; + int erresult = strncat_s(devicename, sizeof(devicename), fname.c_str(), 2); + + if (erresult) { + assert(false); + return sector_size; + } + + HANDLE hDevice = CreateFile(devicename, 0, 0, nullptr, OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, nullptr); + + if (hDevice == INVALID_HANDLE_VALUE) { + return sector_size; + } + + STORAGE_PROPERTY_QUERY spropertyquery; + spropertyquery.PropertyId = StorageAccessAlignmentProperty; + spropertyquery.QueryType = PropertyStandardQuery; + + BYTE output_buffer[sizeof(STORAGE_ACCESS_ALIGNMENT_DESCRIPTOR)]; + DWORD output_bytes = 0; + + BOOL ret = DeviceIoControl(hDevice, IOCTL_STORAGE_QUERY_PROPERTY, + &spropertyquery, sizeof(spropertyquery), + output_buffer, + sizeof(STORAGE_ACCESS_ALIGNMENT_DESCRIPTOR), + &output_bytes, nullptr); + + if (ret) { + sector_size = ((STORAGE_ACCESS_ALIGNMENT_DESCRIPTOR *)output_buffer)->BytesPerLogicalSector; + } else { + // many devices do not support StorageProcessAlignmentProperty. Any failure here and we + // fall back to logical alignment + + DISK_GEOMETRY_EX geometry = { 0 }; + ret = DeviceIoControl(hDevice, IOCTL_DISK_GET_DRIVE_GEOMETRY, + nullptr, 0, &geometry, sizeof(geometry), &output_bytes, nullptr); + if (ret) { + sector_size = geometry.Geometry.BytesPerSector; + } + } + + if (hDevice != INVALID_HANDLE_VALUE) { + CloseHandle(hDevice); + } + + return sector_size; +} + +//////////////////////////////////////////////////////////////////////// +// WinEnvThreads + +WinEnvThreads::WinEnvThreads(Env* hosted_env) + : hosted_env_(hosted_env), thread_pools_(Env::Priority::TOTAL) { + + for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { + thread_pools_[pool_id].SetThreadPriority( + static_cast<Env::Priority>(pool_id)); + // This allows later initializing the thread-local-env of each thread. + thread_pools_[pool_id].SetHostEnv(hosted_env); + } +} + +WinEnvThreads::~WinEnvThreads() { + + WaitForJoin(); + + for (auto& thpool : thread_pools_) { + thpool.JoinAllThreads(); + } +} + +void WinEnvThreads::Schedule(void(*function)(void*), void* arg, + Env::Priority pri, void* tag, + void(*unschedFunction)(void* arg)) { + assert(pri >= Env::Priority::BOTTOM && pri <= Env::Priority::HIGH); + thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); +} + +int WinEnvThreads::UnSchedule(void* arg, Env::Priority pri) { + return thread_pools_[pri].UnSchedule(arg); +} + +namespace { + + struct StartThreadState { + void(*user_function)(void*); + void* arg; + }; + + void* StartThreadWrapper(void* arg) { + std::unique_ptr<StartThreadState> state( + reinterpret_cast<StartThreadState*>(arg)); + state->user_function(state->arg); + return nullptr; + } + +} + +void WinEnvThreads::StartThread(void(*function)(void* arg), void* arg) { + std::unique_ptr<StartThreadState> state(new StartThreadState); + state->user_function = function; + state->arg = arg; + try { + + rocksdb::port::WindowsThread th(&StartThreadWrapper, state.get()); + state.release(); + + std::lock_guard<std::mutex> lg(mu_); + threads_to_join_.push_back(std::move(th)); + + } catch (const std::system_error& ex) { + WinthreadCall("start thread", ex.code()); + } +} + +void WinEnvThreads::WaitForJoin() { + for (auto& th : threads_to_join_) { + th.join(); + } + threads_to_join_.clear(); +} + +unsigned int WinEnvThreads::GetThreadPoolQueueLen(Env::Priority pri) const { + assert(pri >= Env::Priority::BOTTOM && pri <= Env::Priority::HIGH); + return thread_pools_[pri].GetQueueLen(); +} + +uint64_t WinEnvThreads::gettid() { + uint64_t thread_id = GetCurrentThreadId(); + return thread_id; +} + +uint64_t WinEnvThreads::GetThreadID() const { return gettid(); } + +void WinEnvThreads::SleepForMicroseconds(int micros) { + std::this_thread::sleep_for(std::chrono::microseconds(micros)); +} + +void WinEnvThreads::SetBackgroundThreads(int num, Env::Priority pri) { + assert(pri >= Env::Priority::BOTTOM && pri <= Env::Priority::HIGH); + thread_pools_[pri].SetBackgroundThreads(num); +} + +int WinEnvThreads::GetBackgroundThreads(Env::Priority pri) { + assert(pri >= Env::Priority::BOTTOM && pri <= Env::Priority::HIGH); + return thread_pools_[pri].GetBackgroundThreads(); +} + +void WinEnvThreads::IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) { + assert(pri >= Env::Priority::BOTTOM && pri <= Env::Priority::HIGH); + thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); +} + +///////////////////////////////////////////////////////////////////////// +// WinEnv + +WinEnv::WinEnv() : winenv_io_(this), winenv_threads_(this) { + // Protected member of the base class + thread_status_updater_ = CreateThreadStatusUpdater(); +} + + +WinEnv::~WinEnv() { + // All threads must be joined before the deletion of + // thread_status_updater_. + delete thread_status_updater_; +} + +Status WinEnv::GetThreadList(std::vector<ThreadStatus>* thread_list) { + assert(thread_status_updater_); + return thread_status_updater_->GetThreadList(thread_list); +} + +Status WinEnv::DeleteFile(const std::string& fname) { + return winenv_io_.DeleteFile(fname); +} + +Status WinEnv::Truncate(const std::string& fname, size_t size) { + return winenv_io_.Truncate(fname, size); +} + +Status WinEnv::GetCurrentTime(int64_t* unix_time) { + return winenv_io_.GetCurrentTime(unix_time); +} + +Status WinEnv::NewSequentialFile(const std::string& fname, + std::unique_ptr<SequentialFile>* result, + const EnvOptions& options) { + return winenv_io_.NewSequentialFile(fname, result, options); +} + +Status WinEnv::NewRandomAccessFile(const std::string& fname, + std::unique_ptr<RandomAccessFile>* result, + const EnvOptions& options) { + return winenv_io_.NewRandomAccessFile(fname, result, options); +} + +Status WinEnv::NewWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options) { + return winenv_io_.OpenWritableFile(fname, result, options, false); +} + +Status WinEnv::ReopenWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options) { + return winenv_io_.OpenWritableFile(fname, result, options, true); +} + +Status WinEnv::NewRandomRWFile(const std::string & fname, + std::unique_ptr<RandomRWFile>* result, + const EnvOptions & options) { + return winenv_io_.NewRandomRWFile(fname, result, options); +} + +Status WinEnv::NewMemoryMappedFileBuffer( + const std::string& fname, + std::unique_ptr<MemoryMappedFileBuffer>* result) { + return winenv_io_.NewMemoryMappedFileBuffer(fname, result); +} + +Status WinEnv::NewDirectory(const std::string& name, + std::unique_ptr<Directory>* result) { + return winenv_io_.NewDirectory(name, result); +} + +Status WinEnv::FileExists(const std::string& fname) { + return winenv_io_.FileExists(fname); +} + +Status WinEnv::GetChildren(const std::string& dir, + std::vector<std::string>* result) { + return winenv_io_.GetChildren(dir, result); +} + +Status WinEnv::CreateDir(const std::string& name) { + return winenv_io_.CreateDir(name); +} + +Status WinEnv::CreateDirIfMissing(const std::string& name) { + return winenv_io_.CreateDirIfMissing(name); +} + +Status WinEnv::DeleteDir(const std::string& name) { + return winenv_io_.DeleteDir(name); +} + +Status WinEnv::GetFileSize(const std::string& fname, + uint64_t* size) { + return winenv_io_.GetFileSize(fname, size); +} + +Status WinEnv::GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) { + return winenv_io_.GetFileModificationTime(fname, file_mtime); +} + +Status WinEnv::RenameFile(const std::string& src, + const std::string& target) { + return winenv_io_.RenameFile(src, target); +} + +Status WinEnv::LinkFile(const std::string& src, + const std::string& target) { + return winenv_io_.LinkFile(src, target); +} + +Status WinEnv::NumFileLinks(const std::string& fname, uint64_t* count) { + return winenv_io_.NumFileLinks(fname, count); +} + +Status WinEnv::AreFilesSame(const std::string& first, + const std::string& second, bool* res) { + return winenv_io_.AreFilesSame(first, second, res); +} + +Status WinEnv::LockFile(const std::string& lockFname, + FileLock** lock) { + return winenv_io_.LockFile(lockFname, lock); +} + +Status WinEnv::UnlockFile(FileLock* lock) { + return winenv_io_.UnlockFile(lock); +} + +Status WinEnv::GetTestDirectory(std::string* result) { + return winenv_io_.GetTestDirectory(result); +} + +Status WinEnv::NewLogger(const std::string& fname, + std::shared_ptr<Logger>* result) { + return winenv_io_.NewLogger(fname, result); +} + +uint64_t WinEnv::NowMicros() { + return winenv_io_.NowMicros(); +} + +uint64_t WinEnv::NowNanos() { + return winenv_io_.NowNanos(); +} + +Status WinEnv::GetHostName(char* name, uint64_t len) { + return winenv_io_.GetHostName(name, len); +} + +Status WinEnv::GetAbsolutePath(const std::string& db_path, + std::string* output_path) { + return winenv_io_.GetAbsolutePath(db_path, output_path); +} + +std::string WinEnv::TimeToString(uint64_t secondsSince1970) { + return winenv_io_.TimeToString(secondsSince1970); +} + +void WinEnv::Schedule(void(*function)(void*), void* arg, Env::Priority pri, + void* tag, + void(*unschedFunction)(void* arg)) { + return winenv_threads_.Schedule(function, arg, pri, tag, unschedFunction); +} + +int WinEnv::UnSchedule(void* arg, Env::Priority pri) { + return winenv_threads_.UnSchedule(arg, pri); +} + +void WinEnv::StartThread(void(*function)(void* arg), void* arg) { + return winenv_threads_.StartThread(function, arg); +} + +void WinEnv::WaitForJoin() { + return winenv_threads_.WaitForJoin(); +} + +unsigned int WinEnv::GetThreadPoolQueueLen(Env::Priority pri) const { + return winenv_threads_.GetThreadPoolQueueLen(pri); +} + +uint64_t WinEnv::GetThreadID() const { + return winenv_threads_.GetThreadID(); +} + +void WinEnv::SleepForMicroseconds(int micros) { + return winenv_threads_.SleepForMicroseconds(micros); +} + +// Allow increasing the number of worker threads. +void WinEnv::SetBackgroundThreads(int num, Env::Priority pri) { + return winenv_threads_.SetBackgroundThreads(num, pri); +} + +int WinEnv::GetBackgroundThreads(Env::Priority pri) { + return winenv_threads_.GetBackgroundThreads(pri); +} + +void WinEnv::IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) { + return winenv_threads_.IncBackgroundThreadsIfNeeded(num, pri); +} + +EnvOptions WinEnv::OptimizeForManifestRead( + const EnvOptions& env_options) const { + return winenv_io_.OptimizeForManifestRead(env_options); +} + +EnvOptions WinEnv::OptimizeForLogWrite(const EnvOptions& env_options, + const DBOptions& db_options) const { + return winenv_io_.OptimizeForLogWrite(env_options, db_options); +} + +EnvOptions WinEnv::OptimizeForManifestWrite( + const EnvOptions& env_options) const { + return winenv_io_.OptimizeForManifestWrite(env_options); +} + +} // namespace port + +std::string Env::GenerateUniqueId() { + std::string result; + + UUID uuid; + UuidCreateSequential(&uuid); + + RPC_CSTR rpc_str; + auto status = UuidToStringA(&uuid, &rpc_str); + (void)status; + assert(status == RPC_S_OK); + + result = reinterpret_cast<char*>(rpc_str); + + status = RpcStringFreeA(&rpc_str); + assert(status == RPC_S_OK); + + return result; +} + +} // namespace rocksdb diff --git a/src/rocksdb/port/win/env_win.h b/src/rocksdb/port/win/env_win.h new file mode 100644 index 00000000..7a4d48de --- /dev/null +++ b/src/rocksdb/port/win/env_win.h @@ -0,0 +1,335 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// An Env is an interface used by the rocksdb implementation to access +// operating system functionality like the filesystem etc. Callers +// may wish to provide a custom Env object when opening a database to +// get fine gain control; e.g., to rate limit file system operations. +// +// All Env implementations are safe for concurrent access from +// multiple threads without any external synchronization. + +#pragma once + +#include "port/win/win_thread.h" +#include <rocksdb/env.h> +#include "util/threadpool_imp.h" + +#include <stdint.h> +#include <windows.h> + +#include <mutex> +#include <vector> +#include <string> + + +#undef GetCurrentTime +#undef DeleteFile +#undef GetTickCount + +namespace rocksdb { +namespace port { + +// Currently not designed for inheritance but rather a replacement +class WinEnvThreads { +public: + + explicit WinEnvThreads(Env* hosted_env); + + ~WinEnvThreads(); + + WinEnvThreads(const WinEnvThreads&) = delete; + WinEnvThreads& operator=(const WinEnvThreads&) = delete; + + void Schedule(void(*function)(void*), void* arg, Env::Priority pri, + void* tag, void(*unschedFunction)(void* arg)); + + int UnSchedule(void* arg, Env::Priority pri); + + void StartThread(void(*function)(void* arg), void* arg); + + void WaitForJoin(); + + unsigned int GetThreadPoolQueueLen(Env::Priority pri) const; + + static uint64_t gettid(); + + uint64_t GetThreadID() const; + + void SleepForMicroseconds(int micros); + + // Allow increasing the number of worker threads. + void SetBackgroundThreads(int num, Env::Priority pri); + int GetBackgroundThreads(Env::Priority pri); + + void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri); + +private: + + Env* hosted_env_; + mutable std::mutex mu_; + std::vector<ThreadPoolImpl> thread_pools_; + std::vector<WindowsThread> threads_to_join_; + +}; + +// Designed for inheritance so can be re-used +// but certain parts replaced +class WinEnvIO { +public: + explicit WinEnvIO(Env* hosted_env); + + virtual ~WinEnvIO(); + + virtual Status DeleteFile(const std::string& fname); + + Status Truncate(const std::string& fname, size_t size); + + virtual Status GetCurrentTime(int64_t* unix_time); + + virtual Status NewSequentialFile(const std::string& fname, + std::unique_ptr<SequentialFile>* result, + const EnvOptions& options); + + // Helper for NewWritable and ReopenWritableFile + virtual Status OpenWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options, + bool reopen); + + virtual Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr<RandomAccessFile>* result, + const EnvOptions& options); + + // The returned file will only be accessed by one thread at a time. + virtual Status NewRandomRWFile(const std::string& fname, + std::unique_ptr<RandomRWFile>* result, + const EnvOptions& options); + + virtual Status NewMemoryMappedFileBuffer( + const std::string& fname, + std::unique_ptr<MemoryMappedFileBuffer>* result); + + virtual Status NewDirectory(const std::string& name, + std::unique_ptr<Directory>* result); + + virtual Status FileExists(const std::string& fname); + + virtual Status GetChildren(const std::string& dir, + std::vector<std::string>* result); + + virtual Status CreateDir(const std::string& name); + + virtual Status CreateDirIfMissing(const std::string& name); + + virtual Status DeleteDir(const std::string& name); + + virtual Status GetFileSize(const std::string& fname, uint64_t* size); + + static uint64_t FileTimeToUnixTime(const FILETIME& ftTime); + + virtual Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime); + + virtual Status RenameFile(const std::string& src, const std::string& target); + + virtual Status LinkFile(const std::string& src, const std::string& target); + + virtual Status NumFileLinks(const std::string& /*fname*/, + uint64_t* /*count*/); + + virtual Status AreFilesSame(const std::string& first, + const std::string& second, bool* res); + + virtual Status LockFile(const std::string& lockFname, FileLock** lock); + + virtual Status UnlockFile(FileLock* lock); + + virtual Status GetTestDirectory(std::string* result); + + virtual Status NewLogger(const std::string& fname, + std::shared_ptr<Logger>* result); + + virtual uint64_t NowMicros(); + + virtual uint64_t NowNanos(); + + virtual Status GetHostName(char* name, uint64_t len); + + virtual Status GetAbsolutePath(const std::string& db_path, + std::string* output_path); + + virtual std::string TimeToString(uint64_t secondsSince1970); + + virtual EnvOptions OptimizeForLogWrite(const EnvOptions& env_options, + const DBOptions& db_options) const; + + virtual EnvOptions OptimizeForManifestWrite( + const EnvOptions& env_options) const; + + virtual EnvOptions OptimizeForManifestRead( + const EnvOptions& env_options) const; + + size_t GetPageSize() const { return page_size_; } + + size_t GetAllocationGranularity() const { return allocation_granularity_; } + + uint64_t GetPerfCounterFrequency() const { return perf_counter_frequency_; } + + static size_t GetSectorSize(const std::string& fname); + +private: + // Returns true iff the named directory exists and is a directory. + virtual bool DirExists(const std::string& dname); + + typedef VOID(WINAPI * FnGetSystemTimePreciseAsFileTime)(LPFILETIME); + + Env* hosted_env_; + size_t page_size_; + size_t allocation_granularity_; + uint64_t perf_counter_frequency_; + uint64_t nano_seconds_per_period_; + FnGetSystemTimePreciseAsFileTime GetSystemTimePreciseAsFileTime_; +}; + +class WinEnv : public Env { +public: + WinEnv(); + + ~WinEnv(); + + Status DeleteFile(const std::string& fname) override; + + Status Truncate(const std::string& fname, size_t size) override; + + Status GetCurrentTime(int64_t* unix_time) override; + + Status NewSequentialFile(const std::string& fname, + std::unique_ptr<SequentialFile>* result, + const EnvOptions& options) override; + + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr<RandomAccessFile>* result, + const EnvOptions& options) override; + + Status NewWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options) override; + + // Create an object that writes to a new file with the specified + // name. Deletes any existing file with the same name and creates a + // new file. On success, stores a pointer to the new file in + // *result and returns OK. On failure stores nullptr in *result and + // returns non-OK. + // + // The returned file will only be accessed by one thread at a time. + Status ReopenWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options) override; + + // The returned file will only be accessed by one thread at a time. + Status NewRandomRWFile(const std::string& fname, + std::unique_ptr<RandomRWFile>* result, + const EnvOptions& options) override; + + Status NewMemoryMappedFileBuffer( + const std::string& fname, + std::unique_ptr<MemoryMappedFileBuffer>* result) override; + + Status NewDirectory(const std::string& name, + std::unique_ptr<Directory>* result) override; + + Status FileExists(const std::string& fname) override; + + Status GetChildren(const std::string& dir, + std::vector<std::string>* result) override; + + Status CreateDir(const std::string& name) override; + + Status CreateDirIfMissing(const std::string& name) override; + + Status DeleteDir(const std::string& name) override; + + Status GetFileSize(const std::string& fname, + uint64_t* size) override; + + Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) override; + + Status RenameFile(const std::string& src, + const std::string& target) override; + + Status LinkFile(const std::string& src, + const std::string& target) override; + + Status NumFileLinks(const std::string& fname, uint64_t* count) override; + + Status AreFilesSame(const std::string& first, + const std::string& second, bool* res) override; + + Status LockFile(const std::string& lockFname, FileLock** lock) override; + + Status UnlockFile(FileLock* lock) override; + + Status GetTestDirectory(std::string* result) override; + + Status NewLogger(const std::string& fname, + std::shared_ptr<Logger>* result) override; + + uint64_t NowMicros() override; + + uint64_t NowNanos() override; + + Status GetHostName(char* name, uint64_t len) override; + + Status GetAbsolutePath(const std::string& db_path, + std::string* output_path) override; + + std::string TimeToString(uint64_t secondsSince1970) override; + + Status GetThreadList(std::vector<ThreadStatus>* thread_list) override; + + void Schedule(void(*function)(void*), void* arg, Env::Priority pri, + void* tag, void(*unschedFunction)(void* arg)) override; + + int UnSchedule(void* arg, Env::Priority pri) override; + + void StartThread(void(*function)(void* arg), void* arg) override; + + void WaitForJoin(); + + unsigned int GetThreadPoolQueueLen(Env::Priority pri) const override; + + uint64_t GetThreadID() const override; + + void SleepForMicroseconds(int micros) override; + + // Allow increasing the number of worker threads. + void SetBackgroundThreads(int num, Env::Priority pri) override; + int GetBackgroundThreads(Env::Priority pri) override; + + void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) override; + + EnvOptions OptimizeForManifestRead( + const EnvOptions& env_options) const override; + + EnvOptions OptimizeForLogWrite(const EnvOptions& env_options, + const DBOptions& db_options) const override; + + EnvOptions OptimizeForManifestWrite( + const EnvOptions& env_options) const override; + + +private: + + WinEnvIO winenv_io_; + WinEnvThreads winenv_threads_; +}; + +} // namespace port +} // namespace rocksdb diff --git a/src/rocksdb/port/win/io_win.cc b/src/rocksdb/port/win/io_win.cc new file mode 100644 index 00000000..128cb60b --- /dev/null +++ b/src/rocksdb/port/win/io_win.cc @@ -0,0 +1,1103 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "port/win/io_win.h" + +#include "monitoring/iostats_context_imp.h" +#include "util/aligned_buffer.h" +#include "util/coding.h" +#include "util/sync_point.h" + +namespace rocksdb { +namespace port { + +/* +* DirectIOHelper +*/ +namespace { + +const size_t kSectorSize = 512; + +inline +bool IsPowerOfTwo(const size_t alignment) { + return ((alignment) & (alignment - 1)) == 0; +} + +inline +bool IsSectorAligned(const size_t off) { + return (off & (kSectorSize - 1)) == 0; +} + +inline +bool IsAligned(size_t alignment, const void* ptr) { + return ((uintptr_t(ptr)) & (alignment - 1)) == 0; +} +} + + +std::string GetWindowsErrSz(DWORD err) { + LPSTR lpMsgBuf; + FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, err, + 0, // Default language + reinterpret_cast<LPSTR>(&lpMsgBuf), 0, NULL); + + std::string Err = lpMsgBuf; + LocalFree(lpMsgBuf); + return Err; +} + +// We preserve the original name of this interface to denote the original idea +// behind it. +// All reads happen by a specified offset and pwrite interface does not change +// the position of the file pointer. Judging from the man page and errno it does +// execute +// lseek atomically to return the position of the file back where it was. +// WriteFile() does not +// have this capability. Therefore, for both pread and pwrite the pointer is +// advanced to the next position +// which is fine for writes because they are (should be) sequential. +// Because all the reads/writes happen by the specified offset, the caller in +// theory should not +// rely on the current file offset. +Status pwrite(const WinFileData* file_data, const Slice& data, + uint64_t offset, size_t& bytes_written) { + + Status s; + bytes_written = 0; + + size_t num_bytes = data.size(); + if (num_bytes > std::numeric_limits<DWORD>::max()) { + // May happen in 64-bit builds where size_t is 64-bits but + // long is still 32-bit, but that's the API here at the moment + return Status::InvalidArgument("num_bytes is too large for a single write: " + + file_data->GetName()); + } + + OVERLAPPED overlapped = { 0 }; + ULARGE_INTEGER offsetUnion; + offsetUnion.QuadPart = offset; + + overlapped.Offset = offsetUnion.LowPart; + overlapped.OffsetHigh = offsetUnion.HighPart; + + DWORD bytesWritten = 0; + + if (FALSE == WriteFile(file_data->GetFileHandle(), data.data(), static_cast<DWORD>(num_bytes), + &bytesWritten, &overlapped)) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("WriteFile failed: " + file_data->GetName(), + lastError); + } else { + bytes_written = bytesWritten; + } + + return s; +} + +// See comments for pwrite above +Status pread(const WinFileData* file_data, char* src, size_t num_bytes, + uint64_t offset, size_t& bytes_read) { + + Status s; + bytes_read = 0; + + if (num_bytes > std::numeric_limits<DWORD>::max()) { + return Status::InvalidArgument("num_bytes is too large for a single read: " + + file_data->GetName()); + } + + OVERLAPPED overlapped = { 0 }; + ULARGE_INTEGER offsetUnion; + offsetUnion.QuadPart = offset; + + overlapped.Offset = offsetUnion.LowPart; + overlapped.OffsetHigh = offsetUnion.HighPart; + + DWORD bytesRead = 0; + + if (FALSE == ReadFile(file_data->GetFileHandle(), src, static_cast<DWORD>(num_bytes), + &bytesRead, &overlapped)) { + auto lastError = GetLastError(); + // EOF is OK with zero bytes read + if (lastError != ERROR_HANDLE_EOF) { + s = IOErrorFromWindowsError("ReadFile failed: " + file_data->GetName(), + lastError); + } + } else { + bytes_read = bytesRead; + } + + return s; +} + +// SetFileInformationByHandle() is capable of fast pre-allocates. +// However, this does not change the file end position unless the file is +// truncated and the pre-allocated space is not considered filled with zeros. +Status fallocate(const std::string& filename, HANDLE hFile, + uint64_t to_size) { + Status status; + + FILE_ALLOCATION_INFO alloc_info; + alloc_info.AllocationSize.QuadPart = to_size; + + if (!SetFileInformationByHandle(hFile, FileAllocationInfo, &alloc_info, + sizeof(FILE_ALLOCATION_INFO))) { + auto lastError = GetLastError(); + status = IOErrorFromWindowsError( + "Failed to pre-allocate space: " + filename, lastError); + } + + return status; +} + +Status ftruncate(const std::string& filename, HANDLE hFile, + uint64_t toSize) { + Status status; + + FILE_END_OF_FILE_INFO end_of_file; + end_of_file.EndOfFile.QuadPart = toSize; + + if (!SetFileInformationByHandle(hFile, FileEndOfFileInfo, &end_of_file, + sizeof(FILE_END_OF_FILE_INFO))) { + auto lastError = GetLastError(); + status = IOErrorFromWindowsError("Failed to Set end of file: " + filename, + lastError); + } + + return status; +} + +size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size) { + + if (max_size < kMaxVarint64Length * 3) { + return 0; + } +#if (_WIN32_WINNT == _WIN32_WINNT_VISTA) + // MINGGW as defined by CMake file. + // yuslepukhin: I hate the guts of the above macros. + // This impl does not guarantee uniqueness everywhere + // is reasonably good + BY_HANDLE_FILE_INFORMATION FileInfo; + + BOOL result = GetFileInformationByHandle(hFile, &FileInfo); + + TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result); + + if (!result) { + return 0; + } + + char* rid = id; + rid = EncodeVarint64(rid, uint64_t(FileInfo.dwVolumeSerialNumber)); + rid = EncodeVarint64(rid, uint64_t(FileInfo.nFileIndexHigh)); + rid = EncodeVarint64(rid, uint64_t(FileInfo.nFileIndexLow)); + + assert(rid >= id); + return static_cast<size_t>(rid - id); +#else + FILE_ID_INFO FileInfo; + BOOL result = GetFileInformationByHandleEx(hFile, FileIdInfo, &FileInfo, + sizeof(FileInfo)); + + TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result); + + if (!result) { + return 0; + } + + static_assert(sizeof(uint64_t) == sizeof(FileInfo.VolumeSerialNumber), + "Wrong sizeof expectations"); + // FileId.Identifier is an array of 16 BYTEs, we encode them as two uint64_t + static_assert(sizeof(uint64_t) * 2 == sizeof(FileInfo.FileId.Identifier), + "Wrong sizeof expectations"); + + char* rid = id; + rid = EncodeVarint64(rid, uint64_t(FileInfo.VolumeSerialNumber)); + uint64_t* file_id = reinterpret_cast<uint64_t*>(&FileInfo.FileId.Identifier[0]); + rid = EncodeVarint64(rid, *file_id); + ++file_id; + rid = EncodeVarint64(rid, *file_id); + + assert(rid >= id); + return static_cast<size_t>(rid - id); +#endif +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// WinMmapReadableFile + +WinMmapReadableFile::WinMmapReadableFile(const std::string& fileName, + HANDLE hFile, HANDLE hMap, + const void* mapped_region, + size_t length) + : WinFileData(fileName, hFile, false /* use_direct_io */), + hMap_(hMap), + mapped_region_(mapped_region), + length_(length) {} + +WinMmapReadableFile::~WinMmapReadableFile() { + BOOL ret __attribute__((__unused__)); + ret = ::UnmapViewOfFile(mapped_region_); + assert(ret); + + ret = ::CloseHandle(hMap_); + assert(ret); +} + +Status WinMmapReadableFile::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + Status s; + + if (offset > length_) { + *result = Slice(); + return IOError(filename_, EINVAL); + } else if (offset + n > length_) { + n = length_ - static_cast<size_t>(offset); + } + *result = + Slice(reinterpret_cast<const char*>(mapped_region_)+offset, n); + return s; +} + +Status WinMmapReadableFile::InvalidateCache(size_t offset, size_t length) { + return Status::OK(); +} + +size_t WinMmapReadableFile::GetUniqueId(char* id, size_t max_size) const { + return GetUniqueIdFromFile(hFile_, id, max_size); +} + +/////////////////////////////////////////////////////////////////////////////// +/// WinMmapFile + + +// Can only truncate or reserve to a sector size aligned if +// used on files that are opened with Unbuffered I/O +Status WinMmapFile::TruncateFile(uint64_t toSize) { + return ftruncate(filename_, hFile_, toSize); +} + +Status WinMmapFile::UnmapCurrentRegion() { + Status status; + + if (mapped_begin_ != nullptr) { + if (!::UnmapViewOfFile(mapped_begin_)) { + status = IOErrorFromWindowsError( + "Failed to unmap file view: " + filename_, GetLastError()); + } + + // Move on to the next portion of the file + file_offset_ += view_size_; + + // UnmapView automatically sends data to disk but not the metadata + // which is good and provides some equivalent of fdatasync() on Linux + // therefore, we donot need separate flag for metadata + mapped_begin_ = nullptr; + mapped_end_ = nullptr; + dst_ = nullptr; + + last_sync_ = nullptr; + pending_sync_ = false; + } + + return status; +} + +Status WinMmapFile::MapNewRegion() { + + Status status; + + assert(mapped_begin_ == nullptr); + + size_t minDiskSize = static_cast<size_t>(file_offset_) + view_size_; + + if (minDiskSize > reserved_size_) { + status = Allocate(file_offset_, view_size_); + if (!status.ok()) { + return status; + } + } + + // Need to remap + if (hMap_ == NULL || reserved_size_ > mapping_size_) { + + if (hMap_ != NULL) { + // Unmap the previous one + BOOL ret __attribute__((__unused__)); + ret = ::CloseHandle(hMap_); + assert(ret); + hMap_ = NULL; + } + + ULARGE_INTEGER mappingSize; + mappingSize.QuadPart = reserved_size_; + + hMap_ = CreateFileMappingA( + hFile_, + NULL, // Security attributes + PAGE_READWRITE, // There is not a write only mode for mapping + mappingSize.HighPart, // Enable mapping the whole file but the actual + // amount mapped is determined by MapViewOfFile + mappingSize.LowPart, + NULL); // Mapping name + + if (NULL == hMap_) { + return IOErrorFromWindowsError( + "WindowsMmapFile failed to create file mapping for: " + filename_, + GetLastError()); + } + + mapping_size_ = reserved_size_; + } + + ULARGE_INTEGER offset; + offset.QuadPart = file_offset_; + + // View must begin at the granularity aligned offset + mapped_begin_ = reinterpret_cast<char*>( + MapViewOfFileEx(hMap_, FILE_MAP_WRITE, offset.HighPart, offset.LowPart, + view_size_, NULL)); + + if (!mapped_begin_) { + status = IOErrorFromWindowsError( + "WindowsMmapFile failed to map file view: " + filename_, + GetLastError()); + } else { + mapped_end_ = mapped_begin_ + view_size_; + dst_ = mapped_begin_; + last_sync_ = mapped_begin_; + pending_sync_ = false; + } + return status; +} + +Status WinMmapFile::PreallocateInternal(uint64_t spaceToReserve) { + return fallocate(filename_, hFile_, spaceToReserve); +} + +WinMmapFile::WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size, + size_t allocation_granularity, const EnvOptions& options) + : WinFileData(fname, hFile, false), + hMap_(NULL), + page_size_(page_size), + allocation_granularity_(allocation_granularity), + reserved_size_(0), + mapping_size_(0), + view_size_(0), + mapped_begin_(nullptr), + mapped_end_(nullptr), + dst_(nullptr), + last_sync_(nullptr), + file_offset_(0), + pending_sync_(false) { + // Allocation granularity must be obtained from GetSystemInfo() and must be + // a power of two. + assert(allocation_granularity > 0); + assert((allocation_granularity & (allocation_granularity - 1)) == 0); + + assert(page_size > 0); + assert((page_size & (page_size - 1)) == 0); + + // Only for memory mapped writes + assert(options.use_mmap_writes); + + // View size must be both the multiple of allocation_granularity AND the + // page size and the granularity is usually a multiple of a page size. + const size_t viewSize = 32 * 1024; // 32Kb similar to the Windows File Cache in buffered mode + view_size_ = Roundup(viewSize, allocation_granularity_); +} + +WinMmapFile::~WinMmapFile() { + if (hFile_) { + this->Close(); + } +} + +Status WinMmapFile::Append(const Slice& data) { + const char* src = data.data(); + size_t left = data.size(); + + while (left > 0) { + assert(mapped_begin_ <= dst_); + size_t avail = mapped_end_ - dst_; + + if (avail == 0) { + Status s = UnmapCurrentRegion(); + if (s.ok()) { + s = MapNewRegion(); + } + + if (!s.ok()) { + return s; + } + } else { + size_t n = std::min(left, avail); + memcpy(dst_, src, n); + dst_ += n; + src += n; + left -= n; + pending_sync_ = true; + } + } + + // Now make sure that the last partial page is padded with zeros if needed + size_t bytesToPad = Roundup(size_t(dst_), page_size_) - size_t(dst_); + if (bytesToPad > 0) { + memset(dst_, 0, bytesToPad); + } + + return Status::OK(); +} + +// Means Close() will properly take care of truncate +// and it does not need any additional information +Status WinMmapFile::Truncate(uint64_t size) { + return Status::OK(); +} + +Status WinMmapFile::Close() { + Status s; + + assert(NULL != hFile_); + + // We truncate to the precise size so no + // uninitialized data at the end. SetEndOfFile + // which we use does not write zeros and it is good. + uint64_t targetSize = GetFileSize(); + + if (mapped_begin_ != nullptr) { + // Sync before unmapping to make sure everything + // is on disk and there is not a lazy writing + // so we are deterministic with the tests + Sync(); + s = UnmapCurrentRegion(); + } + + if (NULL != hMap_) { + BOOL ret = ::CloseHandle(hMap_); + if (!ret && s.ok()) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to Close mapping for file: " + filename_, lastError); + } + + hMap_ = NULL; + } + + if (hFile_ != NULL) { + + TruncateFile(targetSize); + + BOOL ret = ::CloseHandle(hFile_); + hFile_ = NULL; + + if (!ret && s.ok()) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to close file map handle: " + filename_, lastError); + } + } + + return s; +} + +Status WinMmapFile::Flush() { return Status::OK(); } + +// Flush only data +Status WinMmapFile::Sync() { + Status s; + + // Some writes occurred since last sync + if (dst_ > last_sync_) { + assert(mapped_begin_); + assert(dst_); + assert(dst_ > mapped_begin_); + assert(dst_ < mapped_end_); + + size_t page_begin = + TruncateToPageBoundary(page_size_, last_sync_ - mapped_begin_); + size_t page_end = + TruncateToPageBoundary(page_size_, dst_ - mapped_begin_ - 1); + + // Flush only the amount of that is a multiple of pages + if (!::FlushViewOfFile(mapped_begin_ + page_begin, + (page_end - page_begin) + page_size_)) { + s = IOErrorFromWindowsError("Failed to FlushViewOfFile: " + filename_, + GetLastError()); + } else { + last_sync_ = dst_; + } + } + + return s; +} + +/** +* Flush data as well as metadata to stable storage. +*/ +Status WinMmapFile::Fsync() { + Status s = Sync(); + + // Flush metadata + if (s.ok() && pending_sync_) { + if (!::FlushFileBuffers(hFile_)) { + s = IOErrorFromWindowsError("Failed to FlushFileBuffers: " + filename_, + GetLastError()); + } + pending_sync_ = false; + } + + return s; +} + +/** +* Get the size of valid data in the file. This will not match the +* size that is returned from the filesystem because we use mmap +* to extend file by map_size every time. +*/ +uint64_t WinMmapFile::GetFileSize() { + size_t used = dst_ - mapped_begin_; + return file_offset_ + used; +} + +Status WinMmapFile::InvalidateCache(size_t offset, size_t length) { + return Status::OK(); +} + +Status WinMmapFile::Allocate(uint64_t offset, uint64_t len) { + Status status; + TEST_KILL_RANDOM("WinMmapFile::Allocate", rocksdb_kill_odds); + + // Make sure that we reserve an aligned amount of space + // since the reservation block size is driven outside so we want + // to check if we are ok with reservation here + size_t spaceToReserve = Roundup(static_cast<size_t>(offset + len), view_size_); + // Nothing to do + if (spaceToReserve <= reserved_size_) { + return status; + } + + IOSTATS_TIMER_GUARD(allocate_nanos); + status = PreallocateInternal(spaceToReserve); + if (status.ok()) { + reserved_size_ = spaceToReserve; + } + return status; +} + +size_t WinMmapFile::GetUniqueId(char* id, size_t max_size) const { + return GetUniqueIdFromFile(hFile_, id, max_size); +} + +////////////////////////////////////////////////////////////////////////////////// +// WinSequentialFile + +WinSequentialFile::WinSequentialFile(const std::string& fname, HANDLE f, + const EnvOptions& options) + : WinFileData(fname, f, options.use_direct_reads) {} + +WinSequentialFile::~WinSequentialFile() { + assert(hFile_ != INVALID_HANDLE_VALUE); +} + +Status WinSequentialFile::Read(size_t n, Slice* result, char* scratch) { + Status s; + size_t r = 0; + + assert(result != nullptr); + if (WinFileData::use_direct_io()) { + return Status::NotSupported("Read() does not support direct_io"); + } + + // Windows ReadFile API accepts a DWORD. + // While it is possible to read in a loop if n is too big + // it is an unlikely case. + if (n > std::numeric_limits<DWORD>::max()) { + return Status::InvalidArgument("n is too big for a single ReadFile: " + + filename_); + } + + DWORD bytesToRead = static_cast<DWORD>(n); //cast is safe due to the check above + DWORD bytesRead = 0; + BOOL ret = ReadFile(hFile_, scratch, bytesToRead, &bytesRead, NULL); + if (ret != FALSE) { + r = bytesRead; + } else { + auto lastError = GetLastError(); + if (lastError != ERROR_HANDLE_EOF) { + s = IOErrorFromWindowsError("ReadFile failed: " + filename_, + lastError); + } + } + + *result = Slice(scratch, r); + return s; +} + +Status WinSequentialFile::PositionedReadInternal(char* src, size_t numBytes, + uint64_t offset, size_t& bytes_read) const { + return pread(this, src, numBytes, offset, bytes_read); +} + +Status WinSequentialFile::PositionedRead(uint64_t offset, size_t n, Slice* result, + char* scratch) { + + Status s; + + if (!WinFileData::use_direct_io()) { + return Status::NotSupported("This function is only used for direct_io"); + } + + if (!IsSectorAligned(static_cast<size_t>(offset)) || + !IsSectorAligned(n)) { + return Status::InvalidArgument( + "WinSequentialFile::PositionedRead: offset is not properly aligned"); + } + + size_t bytes_read = 0; // out param + s = PositionedReadInternal(scratch, static_cast<size_t>(n), offset, bytes_read); + *result = Slice(scratch, bytes_read); + return s; +} + + +Status WinSequentialFile::Skip(uint64_t n) { + // Can't handle more than signed max as SetFilePointerEx accepts a signed 64-bit + // integer. As such it is a highly unlikley case to have n so large. + if (n > static_cast<uint64_t>(std::numeric_limits<LONGLONG>::max())) { + return Status::InvalidArgument("n is too large for a single SetFilePointerEx() call" + + filename_); + } + + LARGE_INTEGER li; + li.QuadPart = static_cast<LONGLONG>(n); //cast is safe due to the check above + BOOL ret = SetFilePointerEx(hFile_, li, NULL, FILE_CURRENT); + if (ret == FALSE) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError("Skip SetFilePointerEx():" + filename_, + lastError); + } + return Status::OK(); +} + +Status WinSequentialFile::InvalidateCache(size_t offset, size_t length) { + return Status::OK(); +} + +////////////////////////////////////////////////////////////////////////////////////////////////// +/// WinRandomAccessBase + +inline +Status WinRandomAccessImpl::PositionedReadInternal(char* src, + size_t numBytes, + uint64_t offset, + size_t& bytes_read) const { + return pread(file_base_, src, numBytes, offset, bytes_read); +} + +inline +WinRandomAccessImpl::WinRandomAccessImpl(WinFileData* file_base, + size_t alignment, + const EnvOptions& options) : + file_base_(file_base), + alignment_(alignment) { + + assert(!options.use_mmap_reads); +} + +inline +Status WinRandomAccessImpl::ReadImpl(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + + Status s; + + // Check buffer alignment + if (file_base_->use_direct_io()) { + if (!IsSectorAligned(static_cast<size_t>(offset)) || + !IsAligned(alignment_, scratch)) { + return Status::InvalidArgument( + "WinRandomAccessImpl::ReadImpl: offset or scratch is not properly aligned"); + } + } + + if (n == 0) { + *result = Slice(scratch, 0); + return s; + } + + size_t bytes_read = 0; + s = PositionedReadInternal(scratch, n, offset, bytes_read); + *result = Slice(scratch, bytes_read); + return s; +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +/// WinRandomAccessFile + +WinRandomAccessFile::WinRandomAccessFile(const std::string& fname, HANDLE hFile, + size_t alignment, + const EnvOptions& options) + : WinFileData(fname, hFile, options.use_direct_reads), + WinRandomAccessImpl(this, alignment, options) {} + +WinRandomAccessFile::~WinRandomAccessFile() { +} + +Status WinRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + return ReadImpl(offset, n, result, scratch); +} + +Status WinRandomAccessFile::InvalidateCache(size_t offset, size_t length) { + return Status::OK(); +} + +size_t WinRandomAccessFile::GetUniqueId(char* id, size_t max_size) const { + return GetUniqueIdFromFile(GetFileHandle(), id, max_size); +} + +size_t WinRandomAccessFile::GetRequiredBufferAlignment() const { + return GetAlignment(); +} + +///////////////////////////////////////////////////////////////////////////// +// WinWritableImpl +// + +inline +Status WinWritableImpl::PreallocateInternal(uint64_t spaceToReserve) { + return fallocate(file_data_->GetName(), file_data_->GetFileHandle(), spaceToReserve); +} + +inline +WinWritableImpl::WinWritableImpl(WinFileData* file_data, size_t alignment) + : file_data_(file_data), + alignment_(alignment), + next_write_offset_(0), + reservedsize_(0) { + + // Query current position in case ReopenWritableFile is called + // This position is only important for buffered writes + // for unbuffered writes we explicitely specify the position. + LARGE_INTEGER zero_move; + zero_move.QuadPart = 0; // Do not move + LARGE_INTEGER pos; + pos.QuadPart = 0; + BOOL ret = SetFilePointerEx(file_data_->GetFileHandle(), zero_move, &pos, + FILE_CURRENT); + // Querying no supped to fail + if (ret != 0) { + next_write_offset_ = pos.QuadPart; + } else { + assert(false); + } +} + +inline +Status WinWritableImpl::AppendImpl(const Slice& data) { + + Status s; + + if (data.size() > std::numeric_limits<DWORD>::max()) { + return Status::InvalidArgument("data is too long for a single write" + + file_data_->GetName()); + } + + size_t bytes_written = 0; // out param + + if (file_data_->use_direct_io()) { + // With no offset specified we are appending + // to the end of the file + assert(IsSectorAligned(next_write_offset_)); + if (!IsSectorAligned(data.size()) || + !IsAligned(static_cast<size_t>(GetAlignement()), data.data())) { + s = Status::InvalidArgument( + "WriteData must be page aligned, size must be sector aligned"); + } else { + s = pwrite(file_data_, data, next_write_offset_, bytes_written); + } + } else { + + DWORD bytesWritten = 0; + if (!WriteFile(file_data_->GetFileHandle(), data.data(), + static_cast<DWORD>(data.size()), &bytesWritten, NULL)) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to WriteFile: " + file_data_->GetName(), + lastError); + } else { + bytes_written = bytesWritten; + } + } + + if(s.ok()) { + if (bytes_written == data.size()) { + // This matters for direct_io cases where + // we rely on the fact that next_write_offset_ + // is sector aligned + next_write_offset_ += bytes_written; + } else { + s = Status::IOError("Failed to write all bytes: " + + file_data_->GetName()); + } + } + + return s; +} + +inline +Status WinWritableImpl::PositionedAppendImpl(const Slice& data, uint64_t offset) { + + if(file_data_->use_direct_io()) { + if (!IsSectorAligned(static_cast<size_t>(offset)) || + !IsSectorAligned(data.size()) || + !IsAligned(static_cast<size_t>(GetAlignement()), data.data())) { + return Status::InvalidArgument( + "Data and offset must be page aligned, size must be sector aligned"); + } + } + + size_t bytes_written = 0; + Status s = pwrite(file_data_, data, offset, bytes_written); + + if(s.ok()) { + if (bytes_written == data.size()) { + // For sequential write this would be simple + // size extension by data.size() + uint64_t write_end = offset + bytes_written; + if (write_end >= next_write_offset_) { + next_write_offset_ = write_end; + } + } else { + s = Status::IOError("Failed to write all of the requested data: " + + file_data_->GetName()); + } + } + return s; +} + +inline +Status WinWritableImpl::TruncateImpl(uint64_t size) { + + // It is tempting to check for the size for sector alignment + // but truncation may come at the end and there is not a requirement + // for this to be sector aligned so long as we do not attempt to write + // after that. The interface docs state that the behavior is undefined + // in that case. + Status s = ftruncate(file_data_->GetName(), file_data_->GetFileHandle(), + size); + + if (s.ok()) { + next_write_offset_ = size; + } + return s; +} + +inline +Status WinWritableImpl::CloseImpl() { + + Status s; + + auto hFile = file_data_->GetFileHandle(); + assert(INVALID_HANDLE_VALUE != hFile); + + if (!::FlushFileBuffers(hFile)) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("FlushFileBuffers failed at Close() for: " + + file_data_->GetName(), + lastError); + } + + if(!file_data_->CloseFile() && s.ok()) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("CloseHandle failed for: " + file_data_->GetName(), + lastError); + } + return s; +} + +inline +Status WinWritableImpl::SyncImpl() { + Status s; + if (!::FlushFileBuffers (file_data_->GetFileHandle())) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "FlushFileBuffers failed at Sync() for: " + file_data_->GetName(), lastError); + } + return s; +} + + +inline +Status WinWritableImpl::AllocateImpl(uint64_t offset, uint64_t len) { + Status status; + TEST_KILL_RANDOM("WinWritableFile::Allocate", rocksdb_kill_odds); + + // Make sure that we reserve an aligned amount of space + // since the reservation block size is driven outside so we want + // to check if we are ok with reservation here + size_t spaceToReserve = Roundup(static_cast<size_t>(offset + len), static_cast<size_t>(alignment_)); + // Nothing to do + if (spaceToReserve <= reservedsize_) { + return status; + } + + IOSTATS_TIMER_GUARD(allocate_nanos); + status = PreallocateInternal(spaceToReserve); + if (status.ok()) { + reservedsize_ = spaceToReserve; + } + return status; +} + + +//////////////////////////////////////////////////////////////////////////////// +/// WinWritableFile + +WinWritableFile::WinWritableFile(const std::string& fname, HANDLE hFile, + size_t alignment, size_t /* capacity */, + const EnvOptions& options) + : WinFileData(fname, hFile, options.use_direct_writes), + WinWritableImpl(this, alignment) { + assert(!options.use_mmap_writes); +} + +WinWritableFile::~WinWritableFile() { +} + +// Indicates if the class makes use of direct I/O +bool WinWritableFile::use_direct_io() const { return WinFileData::use_direct_io(); } + +size_t WinWritableFile::GetRequiredBufferAlignment() const { + return static_cast<size_t>(GetAlignement()); +} + +Status WinWritableFile::Append(const Slice& data) { + return AppendImpl(data); +} + +Status WinWritableFile::PositionedAppend(const Slice& data, uint64_t offset) { + return PositionedAppendImpl(data, offset); +} + +// Need to implement this so the file is truncated correctly +// when buffered and unbuffered mode +Status WinWritableFile::Truncate(uint64_t size) { + return TruncateImpl(size); +} + +Status WinWritableFile::Close() { + return CloseImpl(); +} + + // write out the cached data to the OS cache + // This is now taken care of the WritableFileWriter +Status WinWritableFile::Flush() { + return Status::OK(); +} + +Status WinWritableFile::Sync() { + return SyncImpl(); +} + +Status WinWritableFile::Fsync() { return SyncImpl(); } + +bool WinWritableFile::IsSyncThreadSafe() const { return true; } + +uint64_t WinWritableFile::GetFileSize() { + return GetFileNextWriteOffset(); +} + +Status WinWritableFile::Allocate(uint64_t offset, uint64_t len) { + return AllocateImpl(offset, len); +} + +size_t WinWritableFile::GetUniqueId(char* id, size_t max_size) const { + return GetUniqueIdFromFile(GetFileHandle(), id, max_size); +} + +///////////////////////////////////////////////////////////////////////// +/// WinRandomRWFile + +WinRandomRWFile::WinRandomRWFile(const std::string& fname, HANDLE hFile, + size_t alignment, const EnvOptions& options) + : WinFileData(fname, hFile, + options.use_direct_reads && options.use_direct_writes), + WinRandomAccessImpl(this, alignment, options), + WinWritableImpl(this, alignment) {} + +bool WinRandomRWFile::use_direct_io() const { return WinFileData::use_direct_io(); } + +size_t WinRandomRWFile::GetRequiredBufferAlignment() const { + return static_cast<size_t>(GetAlignement()); +} + +Status WinRandomRWFile::Write(uint64_t offset, const Slice & data) { + return PositionedAppendImpl(data, offset); +} + +Status WinRandomRWFile::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + return ReadImpl(offset, n, result, scratch); +} + +Status WinRandomRWFile::Flush() { + return Status::OK(); +} + +Status WinRandomRWFile::Sync() { + return SyncImpl(); +} + +Status WinRandomRWFile::Close() { + return CloseImpl(); +} + +////////////////////////////////////////////////////////////////////////// +/// WinMemoryMappedBufer +WinMemoryMappedBuffer::~WinMemoryMappedBuffer() { + BOOL ret = FALSE; + if (base_ != nullptr) { + ret = ::UnmapViewOfFile(base_); + assert(ret); + base_ = nullptr; + } + if (map_handle_ != NULL && map_handle_ != INVALID_HANDLE_VALUE) { + ret = ::CloseHandle(map_handle_); + assert(ret); + map_handle_ = NULL; + } + if (file_handle_ != NULL && file_handle_ != INVALID_HANDLE_VALUE) { + ret = ::CloseHandle(file_handle_); + assert(ret); + file_handle_ = NULL; + } +} + +////////////////////////////////////////////////////////////////////////// +/// WinDirectory + +Status WinDirectory::Fsync() { return Status::OK(); } + +size_t WinDirectory::GetUniqueId(char* id, size_t max_size) const { + return GetUniqueIdFromFile(handle_, id, max_size); +} +////////////////////////////////////////////////////////////////////////// +/// WinFileLock + +WinFileLock::~WinFileLock() { + BOOL ret __attribute__((__unused__)); + ret = ::CloseHandle(hFile_); + assert(ret); +} + +} +} diff --git a/src/rocksdb/port/win/io_win.h b/src/rocksdb/port/win/io_win.h new file mode 100644 index 00000000..1c9d803b --- /dev/null +++ b/src/rocksdb/port/win/io_win.h @@ -0,0 +1,457 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include <stdint.h> +#include <mutex> +#include <string> + +#include "rocksdb/status.h" +#include "rocksdb/env.h" +#include "util/aligned_buffer.h" + +#include <windows.h> + + +namespace rocksdb { +namespace port { + +std::string GetWindowsErrSz(DWORD err); + +inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) { + return ((err == ERROR_HANDLE_DISK_FULL) || (err == ERROR_DISK_FULL)) + ? Status::NoSpace(context, GetWindowsErrSz(err)) + : ((err == ERROR_FILE_NOT_FOUND) || (err == ERROR_PATH_NOT_FOUND)) + ? Status::PathNotFound(context, GetWindowsErrSz(err)) + : Status::IOError(context, GetWindowsErrSz(err)); +} + +inline Status IOErrorFromLastWindowsError(const std::string& context) { + return IOErrorFromWindowsError(context, GetLastError()); +} + +inline Status IOError(const std::string& context, int err_number) { + return (err_number == ENOSPC) + ? Status::NoSpace(context, strerror(err_number)) + : (err_number == ENOENT) + ? Status::PathNotFound(context, strerror(err_number)) + : Status::IOError(context, strerror(err_number)); +} + +class WinFileData; + +Status pwrite(const WinFileData* file_data, const Slice& data, + uint64_t offset, size_t& bytes_written); + +Status pread(const WinFileData* file_data, char* src, size_t num_bytes, + uint64_t offset, size_t& bytes_read); + +Status fallocate(const std::string& filename, HANDLE hFile, uint64_t to_size); + +Status ftruncate(const std::string& filename, HANDLE hFile, uint64_t toSize); + +size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size); + +class WinFileData { + protected: + const std::string filename_; + HANDLE hFile_; + // If true, the I/O issued would be direct I/O which the buffer + // will need to be aligned (not sure there is a guarantee that the buffer + // passed in is aligned). + const bool use_direct_io_; + + public: + // We want this class be usable both for inheritance (prive + // or protected) and for containment so __ctor and __dtor public + WinFileData(const std::string& filename, HANDLE hFile, bool direct_io) + : filename_(filename), hFile_(hFile), use_direct_io_(direct_io) {} + + virtual ~WinFileData() { this->CloseFile(); } + + bool CloseFile() { + bool result = true; + + if (hFile_ != NULL && hFile_ != INVALID_HANDLE_VALUE) { + result = ::CloseHandle(hFile_); + assert(result); + hFile_ = NULL; + } + return result; + } + + const std::string& GetName() const { return filename_; } + + HANDLE GetFileHandle() const { return hFile_; } + + bool use_direct_io() const { return use_direct_io_; } + + WinFileData(const WinFileData&) = delete; + WinFileData& operator=(const WinFileData&) = delete; +}; + +class WinSequentialFile : protected WinFileData, public SequentialFile { + + // Override for behavior change when creating a custom env + virtual Status PositionedReadInternal(char* src, size_t numBytes, + uint64_t offset, size_t& bytes_read) const; + +public: + WinSequentialFile(const std::string& fname, HANDLE f, + const EnvOptions& options); + + ~WinSequentialFile(); + + WinSequentialFile(const WinSequentialFile&) = delete; + WinSequentialFile& operator=(const WinSequentialFile&) = delete; + + virtual Status Read(size_t n, Slice* result, char* scratch) override; + virtual Status PositionedRead(uint64_t offset, size_t n, Slice* result, + char* scratch) override; + + virtual Status Skip(uint64_t n) override; + + virtual Status InvalidateCache(size_t offset, size_t length) override; + + virtual bool use_direct_io() const override { return WinFileData::use_direct_io(); } +}; + +// mmap() based random-access +class WinMmapReadableFile : private WinFileData, public RandomAccessFile { + HANDLE hMap_; + + const void* mapped_region_; + const size_t length_; + + public: + // mapped_region_[0,length-1] contains the mmapped contents of the file. + WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap, + const void* mapped_region, size_t length); + + ~WinMmapReadableFile(); + + WinMmapReadableFile(const WinMmapReadableFile&) = delete; + WinMmapReadableFile& operator=(const WinMmapReadableFile&) = delete; + + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + + virtual Status InvalidateCache(size_t offset, size_t length) override; + + virtual size_t GetUniqueId(char* id, size_t max_size) const override; +}; + +// We preallocate and use memcpy to append new +// data to the file. This is safe since we either properly close the +// file before reading from it, or for log files, the reading code +// knows enough to skip zero suffixes. +class WinMmapFile : private WinFileData, public WritableFile { + private: + HANDLE hMap_; + + const size_t page_size_; // We flush the mapping view in page_size + // increments. We may decide if this is a memory + // page size or SSD page size + const size_t + allocation_granularity_; // View must start at such a granularity + + size_t reserved_size_; // Preallocated size + + size_t mapping_size_; // The max size of the mapping object + // we want to guess the final file size to minimize the remapping + size_t view_size_; // How much memory to map into a view at a time + + char* mapped_begin_; // Must begin at the file offset that is aligned with + // allocation_granularity_ + char* mapped_end_; + char* dst_; // Where to write next (in range [mapped_begin_,mapped_end_]) + char* last_sync_; // Where have we synced up to + + uint64_t file_offset_; // Offset of mapped_begin_ in file + + // Do we have unsynced writes? + bool pending_sync_; + + // Can only truncate or reserve to a sector size aligned if + // used on files that are opened with Unbuffered I/O + Status TruncateFile(uint64_t toSize); + + Status UnmapCurrentRegion(); + + Status MapNewRegion(); + + virtual Status PreallocateInternal(uint64_t spaceToReserve); + + public: + WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size, + size_t allocation_granularity, const EnvOptions& options); + + ~WinMmapFile(); + + WinMmapFile(const WinMmapFile&) = delete; + WinMmapFile& operator=(const WinMmapFile&) = delete; + + virtual Status Append(const Slice& data) override; + + // Means Close() will properly take care of truncate + // and it does not need any additional information + virtual Status Truncate(uint64_t size) override; + + virtual Status Close() override; + + virtual Status Flush() override; + + // Flush only data + virtual Status Sync() override; + + /** + * Flush data as well as metadata to stable storage. + */ + virtual Status Fsync() override; + + /** + * Get the size of valid data in the file. This will not match the + * size that is returned from the filesystem because we use mmap + * to extend file by map_size every time. + */ + virtual uint64_t GetFileSize() override; + + virtual Status InvalidateCache(size_t offset, size_t length) override; + + virtual Status Allocate(uint64_t offset, uint64_t len) override; + + virtual size_t GetUniqueId(char* id, size_t max_size) const override; +}; + +class WinRandomAccessImpl { + protected: + WinFileData* file_base_; + size_t alignment_; + + // Override for behavior change when creating a custom env + virtual Status PositionedReadInternal(char* src, size_t numBytes, + uint64_t offset, size_t& bytes_read) const; + + WinRandomAccessImpl(WinFileData* file_base, size_t alignment, + const EnvOptions& options); + + virtual ~WinRandomAccessImpl() {} + + Status ReadImpl(uint64_t offset, size_t n, Slice* result, + char* scratch) const; + + size_t GetAlignment() const { return alignment_; } + + public: + + WinRandomAccessImpl(const WinRandomAccessImpl&) = delete; + WinRandomAccessImpl& operator=(const WinRandomAccessImpl&) = delete; +}; + +// pread() based random-access +class WinRandomAccessFile + : private WinFileData, + protected WinRandomAccessImpl, // Want to be able to override + // PositionedReadInternal + public RandomAccessFile { + public: + WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment, + const EnvOptions& options); + + ~WinRandomAccessFile(); + + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + + virtual size_t GetUniqueId(char* id, size_t max_size) const override; + + virtual bool use_direct_io() const override { return WinFileData::use_direct_io(); } + + virtual Status InvalidateCache(size_t offset, size_t length) override; + + virtual size_t GetRequiredBufferAlignment() const override; +}; + +// This is a sequential write class. It has been mimicked (as others) after +// the original Posix class. We add support for unbuffered I/O on windows as +// well +// we utilize the original buffer as an alignment buffer to write directly to +// file with no buffering. +// No buffering requires that the provided buffer is aligned to the physical +// sector size (SSD page size) and +// that all SetFilePointer() operations to occur with such an alignment. +// We thus always write in sector/page size increments to the drive and leave +// the tail for the next write OR for Close() at which point we pad with zeros. +// No padding is required for +// buffered access. +class WinWritableImpl { + protected: + WinFileData* file_data_; + const uint64_t alignment_; + uint64_t next_write_offset_; // Needed because Windows does not support O_APPEND + uint64_t reservedsize_; // how far we have reserved space + + virtual Status PreallocateInternal(uint64_t spaceToReserve); + + WinWritableImpl(WinFileData* file_data, size_t alignment); + + ~WinWritableImpl() {} + + uint64_t GetAlignement() const { return alignment_; } + + Status AppendImpl(const Slice& data); + + // Requires that the data is aligned as specified by + // GetRequiredBufferAlignment() + Status PositionedAppendImpl(const Slice& data, uint64_t offset); + + Status TruncateImpl(uint64_t size); + + Status CloseImpl(); + + Status SyncImpl(); + + uint64_t GetFileNextWriteOffset() { + // Double accounting now here with WritableFileWriter + // and this size will be wrong when unbuffered access is used + // but tests implement their own writable files and do not use + // WritableFileWrapper + // so we need to squeeze a square peg through + // a round hole here. + return next_write_offset_; + } + + Status AllocateImpl(uint64_t offset, uint64_t len); + + public: + WinWritableImpl(const WinWritableImpl&) = delete; + WinWritableImpl& operator=(const WinWritableImpl&) = delete; +}; + +class WinWritableFile : private WinFileData, + protected WinWritableImpl, + public WritableFile { + public: + WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment, + size_t capacity, const EnvOptions& options); + + ~WinWritableFile(); + + virtual Status Append(const Slice& data) override; + + // Requires that the data is aligned as specified by + // GetRequiredBufferAlignment() + virtual Status PositionedAppend(const Slice& data, uint64_t offset) override; + + // Need to implement this so the file is truncated correctly + // when buffered and unbuffered mode + virtual Status Truncate(uint64_t size) override; + + virtual Status Close() override; + + // write out the cached data to the OS cache + // This is now taken care of the WritableFileWriter + virtual Status Flush() override; + + virtual Status Sync() override; + + virtual Status Fsync() override; + + virtual bool IsSyncThreadSafe() const override; + + // Indicates if the class makes use of direct I/O + // Use PositionedAppend + virtual bool use_direct_io() const override; + + virtual size_t GetRequiredBufferAlignment() const override; + + virtual uint64_t GetFileSize() override; + + virtual Status Allocate(uint64_t offset, uint64_t len) override; + + virtual size_t GetUniqueId(char* id, size_t max_size) const override; +}; + +class WinRandomRWFile : private WinFileData, + protected WinRandomAccessImpl, + protected WinWritableImpl, + public RandomRWFile { + public: + WinRandomRWFile(const std::string& fname, HANDLE hFile, size_t alignment, + const EnvOptions& options); + + ~WinRandomRWFile() {} + + // Indicates if the class makes use of direct I/O + // If false you must pass aligned buffer to Write() + virtual bool use_direct_io() const override; + + // Use the returned alignment value to allocate aligned + // buffer for Write() when use_direct_io() returns true + virtual size_t GetRequiredBufferAlignment() const override; + + // Write bytes in `data` at offset `offset`, Returns Status::OK() on success. + // Pass aligned buffer when use_direct_io() returns true. + virtual Status Write(uint64_t offset, const Slice& data) override; + + // Read up to `n` bytes starting from offset `offset` and store them in + // result, provided `scratch` size should be at least `n`. + // Returns Status::OK() on success. + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + + virtual Status Flush() override; + + virtual Status Sync() override; + + virtual Status Fsync() { return Sync(); } + + virtual Status Close() override; +}; + +class WinMemoryMappedBuffer : public MemoryMappedFileBuffer { +private: + HANDLE file_handle_; + HANDLE map_handle_; +public: + WinMemoryMappedBuffer(HANDLE file_handle, HANDLE map_handle, void* base, size_t size) : + MemoryMappedFileBuffer(base, size), + file_handle_(file_handle), + map_handle_(map_handle) {} + ~WinMemoryMappedBuffer() override; +}; + +class WinDirectory : public Directory { + HANDLE handle_; + public: + explicit WinDirectory(HANDLE h) noexcept : handle_(h) { + assert(handle_ != INVALID_HANDLE_VALUE); + } + ~WinDirectory() { + ::CloseHandle(handle_); + } + virtual Status Fsync() override; + + size_t GetUniqueId(char* id, size_t max_size) const override; +}; + +class WinFileLock : public FileLock { + public: + explicit WinFileLock(HANDLE hFile) : hFile_(hFile) { + assert(hFile != NULL); + assert(hFile != INVALID_HANDLE_VALUE); + } + + ~WinFileLock(); + + private: + HANDLE hFile_; +}; +} +} diff --git a/src/rocksdb/port/win/port_win.cc b/src/rocksdb/port/win/port_win.cc new file mode 100644 index 00000000..03ba6ef4 --- /dev/null +++ b/src/rocksdb/port/win/port_win.cc @@ -0,0 +1,266 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#if !defined(OS_WIN) && !defined(WIN32) && !defined(_WIN32) +#error Windows Specific Code +#endif + +#include "port/win/port_win.h" + +#include <io.h> +#include "port/port_dirent.h" +#include "port/sys_time.h" + +#include <cstdlib> +#include <stdio.h> +#include <assert.h> +#include <string.h> + +#include <memory> +#include <exception> +#include <chrono> + +#ifdef ROCKSDB_WINDOWS_UTF8_FILENAMES +// utf8 <-> utf16 +#include <string> +#include <locale> +#include <codecvt> +#endif + +#include "util/logging.h" + +namespace rocksdb { + +extern const bool kDefaultToAdaptiveMutex = false; + +namespace port { + +#ifdef ROCKSDB_WINDOWS_UTF8_FILENAMES +std::string utf16_to_utf8(const std::wstring& utf16) { + std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>,wchar_t> convert; + return convert.to_bytes(utf16); +} + +std::wstring utf8_to_utf16(const std::string& utf8) { + std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>> converter; + return converter.from_bytes(utf8); +} +#endif + +void gettimeofday(struct timeval* tv, struct timezone* /* tz */) { + using namespace std::chrono; + + microseconds usNow( + duration_cast<microseconds>(system_clock::now().time_since_epoch())); + + seconds secNow(duration_cast<seconds>(usNow)); + + tv->tv_sec = static_cast<long>(secNow.count()); + tv->tv_usec = static_cast<long>(usNow.count() - + duration_cast<microseconds>(secNow).count()); +} + +Mutex::~Mutex() {} + +CondVar::~CondVar() {} + +void CondVar::Wait() { + // Caller must ensure that mutex is held prior to calling this method + std::unique_lock<std::mutex> lk(mu_->getLock(), std::adopt_lock); +#ifndef NDEBUG + mu_->locked_ = false; +#endif + cv_.wait(lk); +#ifndef NDEBUG + mu_->locked_ = true; +#endif + // Release ownership of the lock as we don't want it to be unlocked when + // it goes out of scope (as we adopted the lock and didn't lock it ourselves) + lk.release(); +} + +bool CondVar::TimedWait(uint64_t abs_time_us) { + + using namespace std::chrono; + + // MSVC++ library implements wait_until in terms of wait_for so + // we need to convert absolute wait into relative wait. + microseconds usAbsTime(abs_time_us); + + microseconds usNow( + duration_cast<microseconds>(system_clock::now().time_since_epoch())); + microseconds relTimeUs = + (usAbsTime > usNow) ? (usAbsTime - usNow) : microseconds::zero(); + + // Caller must ensure that mutex is held prior to calling this method + std::unique_lock<std::mutex> lk(mu_->getLock(), std::adopt_lock); +#ifndef NDEBUG + mu_->locked_ = false; +#endif + std::cv_status cvStatus = cv_.wait_for(lk, relTimeUs); +#ifndef NDEBUG + mu_->locked_ = true; +#endif + // Release ownership of the lock as we don't want it to be unlocked when + // it goes out of scope (as we adopted the lock and didn't lock it ourselves) + lk.release(); + + if (cvStatus == std::cv_status::timeout) { + return true; + } + + return false; +} + +void CondVar::Signal() { cv_.notify_one(); } + +void CondVar::SignalAll() { cv_.notify_all(); } + +int PhysicalCoreID() { return GetCurrentProcessorNumber(); } + +void InitOnce(OnceType* once, void (*initializer)()) { + std::call_once(once->flag_, initializer); +} + +// Private structure, exposed only by pointer +struct DIR { + HANDLE handle_; + bool firstread_; + RX_WIN32_FIND_DATA data_; + dirent entry_; + + DIR() : handle_(INVALID_HANDLE_VALUE), + firstread_(true) {} + + DIR(const DIR&) = delete; + DIR& operator=(const DIR&) = delete; + + ~DIR() { + if (INVALID_HANDLE_VALUE != handle_) { + ::FindClose(handle_); + } + } +}; + +DIR* opendir(const char* name) { + if (!name || *name == 0) { + errno = ENOENT; + return nullptr; + } + + std::string pattern(name); + pattern.append("\\").append("*"); + + std::unique_ptr<DIR> dir(new DIR); + + dir->handle_ = RX_FindFirstFileEx(RX_FN(pattern).c_str(), + FindExInfoBasic, // Do not want alternative name + &dir->data_, + FindExSearchNameMatch, + NULL, // lpSearchFilter + 0); + + if (dir->handle_ == INVALID_HANDLE_VALUE) { + return nullptr; + } + + RX_FILESTRING x(dir->data_.cFileName, RX_FNLEN(dir->data_.cFileName)); + strcpy_s(dir->entry_.d_name, sizeof(dir->entry_.d_name), + FN_TO_RX(x).c_str()); + + return dir.release(); +} + +struct dirent* readdir(DIR* dirp) { + if (!dirp || dirp->handle_ == INVALID_HANDLE_VALUE) { + errno = EBADF; + return nullptr; + } + + if (dirp->firstread_) { + dirp->firstread_ = false; + return &dirp->entry_; + } + + auto ret = RX_FindNextFile(dirp->handle_, &dirp->data_); + + if (ret == 0) { + return nullptr; + } + + RX_FILESTRING x(dirp->data_.cFileName, RX_FNLEN(dirp->data_.cFileName)); + strcpy_s(dirp->entry_.d_name, sizeof(dirp->entry_.d_name), + FN_TO_RX(x).c_str()); + + return &dirp->entry_; +} + +int closedir(DIR* dirp) { + delete dirp; + return 0; +} + +int truncate(const char* path, int64_t length) { + if (path == nullptr) { + errno = EFAULT; + return -1; + } + return rocksdb::port::Truncate(path, length); +} + +int Truncate(std::string path, int64_t len) { + + if (len < 0) { + errno = EINVAL; + return -1; + } + + HANDLE hFile = + RX_CreateFile(RX_FN(path).c_str(), GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + NULL, // Security attrs + OPEN_EXISTING, // Truncate existing file only + FILE_ATTRIBUTE_NORMAL, NULL); + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + if (lastError == ERROR_FILE_NOT_FOUND) { + errno = ENOENT; + } else if (lastError == ERROR_ACCESS_DENIED) { + errno = EACCES; + } else { + errno = EIO; + } + return -1; + } + + int result = 0; + FILE_END_OF_FILE_INFO end_of_file; + end_of_file.EndOfFile.QuadPart = len; + + if (!SetFileInformationByHandle(hFile, FileEndOfFileInfo, &end_of_file, + sizeof(FILE_END_OF_FILE_INFO))) { + errno = EIO; + result = -1; + } + + CloseHandle(hFile); + return result; +} + +void Crash(const std::string& srcfile, int srcline) { + fprintf(stdout, "Crashing at %s:%d\n", srcfile.c_str(), srcline); + fflush(stdout); + abort(); +} + +int GetMaxOpenFiles() { return -1; } + +} // namespace port +} // namespace rocksdb diff --git a/src/rocksdb/port/win/port_win.h b/src/rocksdb/port/win/port_win.h new file mode 100644 index 00000000..de41cdc7 --- /dev/null +++ b/src/rocksdb/port/win/port_win.h @@ -0,0 +1,399 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// See port_example.h for documentation for the following types/functions. + +#pragma once + +// Always want minimum headers +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif + +// Assume that for everywhere +#undef PLATFORM_IS_LITTLE_ENDIAN +#define PLATFORM_IS_LITTLE_ENDIAN true + +#include <windows.h> +#include <string> +#include <string.h> +#include <mutex> +#include <limits> +#include <condition_variable> +#include <malloc.h> +#include <intrin.h> + +#include <stdint.h> + +#include "port/win/win_thread.h" + +#include "rocksdb/options.h" + +#undef min +#undef max +#undef DeleteFile +#undef GetCurrentTime + + +#ifndef strcasecmp +#define strcasecmp _stricmp +#endif + +#undef GetCurrentTime +#undef DeleteFile + +#ifndef _SSIZE_T_DEFINED +typedef SSIZE_T ssize_t; +#endif + +// size_t printf formatting named in the manner of C99 standard formatting +// strings such as PRIu64 +// in fact, we could use that one +#ifndef ROCKSDB_PRIszt +#define ROCKSDB_PRIszt "Iu" +#endif + +#ifdef _MSC_VER +#define __attribute__(A) + +// Thread local storage on Linux +// There is thread_local in C++11 +#ifndef __thread +#define __thread __declspec(thread) +#endif + +#endif + +#ifndef PLATFORM_IS_LITTLE_ENDIAN +#define PLATFORM_IS_LITTLE_ENDIAN (__BYTE_ORDER == __LITTLE_ENDIAN) +#endif + +namespace rocksdb { + +#define PREFETCH(addr, rw, locality) + +extern const bool kDefaultToAdaptiveMutex; + +namespace port { + +// VS < 2015 +#if defined(_MSC_VER) && (_MSC_VER < 1900) + +// VS 15 has snprintf +#define snprintf _snprintf + +#define ROCKSDB_NOEXCEPT +// std::numeric_limits<size_t>::max() is not constexpr just yet +// therefore, use the same limits + +// For use at db/file_indexer.h kLevelMaxIndex +const uint32_t kMaxUint32 = UINT32_MAX; +const int kMaxInt32 = INT32_MAX; +const int kMinInt32 = INT32_MIN; +const int64_t kMaxInt64 = INT64_MAX; +const int64_t kMinInt64 = INT64_MIN; +const uint64_t kMaxUint64 = UINT64_MAX; + +#ifdef _WIN64 +const size_t kMaxSizet = UINT64_MAX; +#else +const size_t kMaxSizet = UINT_MAX; +#endif + +#else // VS >= 2015 or MinGW + +#define ROCKSDB_NOEXCEPT noexcept + +// For use at db/file_indexer.h kLevelMaxIndex +const uint32_t kMaxUint32 = std::numeric_limits<uint32_t>::max(); +const int kMaxInt32 = std::numeric_limits<int>::max(); +const int kMinInt32 = std::numeric_limits<int>::min(); +const uint64_t kMaxUint64 = std::numeric_limits<uint64_t>::max(); +const int64_t kMaxInt64 = std::numeric_limits<int64_t>::max(); +const int64_t kMinInt64 = std::numeric_limits<int64_t>::min(); + +const size_t kMaxSizet = std::numeric_limits<size_t>::max(); + +#endif //_MSC_VER + +const bool kLittleEndian = true; + +class CondVar; + +class Mutex { + public: + + /* implicit */ Mutex(bool adaptive = kDefaultToAdaptiveMutex) +#ifndef NDEBUG + : locked_(false) +#endif + { } + + ~Mutex(); + + void Lock() { + mutex_.lock(); +#ifndef NDEBUG + locked_ = true; +#endif + } + + void Unlock() { +#ifndef NDEBUG + locked_ = false; +#endif + mutex_.unlock(); + } + + // this will assert if the mutex is not locked + // it does NOT verify that mutex is held by a calling thread + void AssertHeld() { +#ifndef NDEBUG + assert(locked_); +#endif + } + + // Mutex is move only with lock ownership transfer + Mutex(const Mutex&) = delete; + void operator=(const Mutex&) = delete; + + private: + + friend class CondVar; + + std::mutex& getLock() { + return mutex_; + } + + std::mutex mutex_; +#ifndef NDEBUG + bool locked_; +#endif +}; + +class RWMutex { + public: + RWMutex() { InitializeSRWLock(&srwLock_); } + + void ReadLock() { AcquireSRWLockShared(&srwLock_); } + + void WriteLock() { AcquireSRWLockExclusive(&srwLock_); } + + void ReadUnlock() { ReleaseSRWLockShared(&srwLock_); } + + void WriteUnlock() { ReleaseSRWLockExclusive(&srwLock_); } + + // Empty as in POSIX + void AssertHeld() {} + + private: + SRWLOCK srwLock_; + // No copying allowed + RWMutex(const RWMutex&); + void operator=(const RWMutex&); +}; + +class CondVar { + public: + explicit CondVar(Mutex* mu) : mu_(mu) { + } + + ~CondVar(); + void Wait(); + bool TimedWait(uint64_t expiration_time); + void Signal(); + void SignalAll(); + + // Condition var is not copy/move constructible + CondVar(const CondVar&) = delete; + CondVar& operator=(const CondVar&) = delete; + + CondVar(CondVar&&) = delete; + CondVar& operator=(CondVar&&) = delete; + + private: + std::condition_variable cv_; + Mutex* mu_; +}; + +// Wrapper around the platform efficient +// or otherwise preferrable implementation +using Thread = WindowsThread; + +// OnceInit type helps emulate +// Posix semantics with initialization +// adopted in the project +struct OnceType { + + struct Init {}; + + OnceType() {} + OnceType(const Init&) {} + OnceType(const OnceType&) = delete; + OnceType& operator=(const OnceType&) = delete; + + std::once_flag flag_; +}; + +#define LEVELDB_ONCE_INIT port::OnceType::Init() +extern void InitOnce(OnceType* once, void (*initializer)()); + +#ifndef CACHE_LINE_SIZE +#define CACHE_LINE_SIZE 64U +#endif + +#ifdef ROCKSDB_JEMALLOC +// Separate inlines so they can be replaced if needed +void* jemalloc_aligned_alloc(size_t size, size_t alignment) ROCKSDB_NOEXCEPT; +void jemalloc_aligned_free(void* p) ROCKSDB_NOEXCEPT; +#endif + +inline void *cacheline_aligned_alloc(size_t size) { +#ifdef ROCKSDB_JEMALLOC + return jemalloc_aligned_alloc(size, CACHE_LINE_SIZE); +#else + return _aligned_malloc(size, CACHE_LINE_SIZE); +#endif +} + +inline void cacheline_aligned_free(void *memblock) { +#ifdef ROCKSDB_JEMALLOC + jemalloc_aligned_free(memblock); +#else + _aligned_free(memblock); +#endif +} + +// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=52991 for MINGW32 +// could not be worked around with by -mno-ms-bitfields +#ifndef __MINGW32__ +#define ALIGN_AS(n) __declspec(align(n)) +#else +#define ALIGN_AS(n) +#endif + +static inline void AsmVolatilePause() { +#if defined(_M_IX86) || defined(_M_X64) + YieldProcessor(); +#endif + // it would be nice to get "wfe" on ARM here +} + +extern int PhysicalCoreID(); + +// For Thread Local Storage abstraction +typedef DWORD pthread_key_t; + +inline int pthread_key_create(pthread_key_t* key, void (*destructor)(void*)) { + // Not used + (void)destructor; + + pthread_key_t k = TlsAlloc(); + if (TLS_OUT_OF_INDEXES == k) { + return ENOMEM; + } + + *key = k; + return 0; +} + +inline int pthread_key_delete(pthread_key_t key) { + if (!TlsFree(key)) { + return EINVAL; + } + return 0; +} + +inline int pthread_setspecific(pthread_key_t key, const void* value) { + if (!TlsSetValue(key, const_cast<void*>(value))) { + return ENOMEM; + } + return 0; +} + +inline void* pthread_getspecific(pthread_key_t key) { + void* result = TlsGetValue(key); + if (!result) { + if (GetLastError() != ERROR_SUCCESS) { + errno = EINVAL; + } else { + errno = NOERROR; + } + } + return result; +} + +// UNIX equiv although errno numbers will be off +// using C-runtime to implement. Note, this does not +// feel space with zeros in case the file is extended. +int truncate(const char* path, int64_t length); +int Truncate(std::string path, int64_t length); +void Crash(const std::string& srcfile, int srcline); +extern int GetMaxOpenFiles(); +std::string utf16_to_utf8(const std::wstring& utf16); +std::wstring utf8_to_utf16(const std::string& utf8); + +} // namespace port + + +#ifdef ROCKSDB_WINDOWS_UTF8_FILENAMES + +#define RX_FILESTRING std::wstring +#define RX_FN(a) rocksdb::port::utf8_to_utf16(a) +#define FN_TO_RX(a) rocksdb::port::utf16_to_utf8(a) +#define RX_FNLEN(a) ::wcslen(a) + +#define RX_DeleteFile DeleteFileW +#define RX_CreateFile CreateFileW +#define RX_CreateFileMapping CreateFileMappingW +#define RX_GetFileAttributesEx GetFileAttributesExW +#define RX_FindFirstFileEx FindFirstFileExW +#define RX_FindNextFile FindNextFileW +#define RX_WIN32_FIND_DATA WIN32_FIND_DATAW +#define RX_CreateDirectory CreateDirectoryW +#define RX_RemoveDirectory RemoveDirectoryW +#define RX_GetFileAttributesEx GetFileAttributesExW +#define RX_MoveFileEx MoveFileExW +#define RX_CreateHardLink CreateHardLinkW +#define RX_PathIsRelative PathIsRelativeW +#define RX_GetCurrentDirectory GetCurrentDirectoryW + +#else + +#define RX_FILESTRING std::string +#define RX_FN(a) a +#define FN_TO_RX(a) a +#define RX_FNLEN(a) strlen(a) + +#define RX_DeleteFile DeleteFileA +#define RX_CreateFile CreateFileA +#define RX_CreateFileMapping CreateFileMappingA +#define RX_GetFileAttributesEx GetFileAttributesExA +#define RX_FindFirstFileEx FindFirstFileExA +#define RX_CreateDirectory CreateDirectoryA +#define RX_FindNextFile FindNextFileA +#define RX_WIN32_FIND_DATA WIN32_FIND_DATA +#define RX_CreateDirectory CreateDirectoryA +#define RX_RemoveDirectory RemoveDirectoryA +#define RX_GetFileAttributesEx GetFileAttributesExA +#define RX_MoveFileEx MoveFileExA +#define RX_CreateHardLink CreateHardLinkA +#define RX_PathIsRelative PathIsRelativeA +#define RX_GetCurrentDirectory GetCurrentDirectoryA + +#endif + +using port::pthread_key_t; +using port::pthread_key_create; +using port::pthread_key_delete; +using port::pthread_setspecific; +using port::pthread_getspecific; +using port::truncate; + +} // namespace rocksdb diff --git a/src/rocksdb/port/win/win_jemalloc.cc b/src/rocksdb/port/win/win_jemalloc.cc new file mode 100644 index 00000000..3268a56a --- /dev/null +++ b/src/rocksdb/port/win/win_jemalloc.cc @@ -0,0 +1,75 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef ROCKSDB_JEMALLOC +# error This file can only be part of jemalloc aware build +#endif + +#include <stdexcept> +#include "jemalloc/jemalloc.h" +#include "port/win/port_win.h" + +#if defined(ZSTD) && defined(ZSTD_STATIC_LINKING_ONLY) +#include <zstd.h> +#if (ZSTD_VERSION_NUMBER >= 500) +namespace rocksdb { +namespace port { +void* JemallocAllocateForZSTD(void* /* opaque */, size_t size) { + return je_malloc(size); +} +void JemallocDeallocateForZSTD(void* /* opaque */, void* address) { + je_free(address); +} +ZSTD_customMem GetJeZstdAllocationOverrides() { + return {JemallocAllocateForZSTD, JemallocDeallocateForZSTD, nullptr}; +} +} // namespace port +} // namespace rocksdb +#endif // (ZSTD_VERSION_NUMBER >= 500) +#endif // defined(ZSTD) defined(ZSTD_STATIC_LINKING_ONLY) + +// Global operators to be replaced by a linker when this file is +// a part of the build + +namespace rocksdb { +namespace port { +void* jemalloc_aligned_alloc(size_t size, size_t alignment) ROCKSDB_NOEXCEPT { + return je_aligned_alloc(alignment, size); +} +void jemalloc_aligned_free(void* p) ROCKSDB_NOEXCEPT { je_free(p); } +} // namespace port +} // namespace rocksdb + +void* operator new(size_t size) { + void* p = je_malloc(size); + if (!p) { + throw std::bad_alloc(); + } + return p; +} + +void* operator new[](size_t size) { + void* p = je_malloc(size); + if (!p) { + throw std::bad_alloc(); + } + return p; +} + +void operator delete(void* p) { + if (p) { + je_free(p); + } +} + +void operator delete[](void* p) { + if (p) { + je_free(p); + } +} diff --git a/src/rocksdb/port/win/win_logger.cc b/src/rocksdb/port/win/win_logger.cc new file mode 100644 index 00000000..af722d90 --- /dev/null +++ b/src/rocksdb/port/win/win_logger.cc @@ -0,0 +1,192 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Logger implementation that can be shared by all environments +// where enough posix functionality is available. + +#include "port/win/win_logger.h" +#include "port/win/io_win.h" + +#include <algorithm> +#include <stdio.h> +#include <time.h> +#include <fcntl.h> +#include <atomic> + +#include "rocksdb/env.h" + +#include "monitoring/iostats_context_imp.h" +#include "port/sys_time.h" + +namespace rocksdb { + +namespace port { + +WinLogger::WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file, + const InfoLogLevel log_level) + : Logger(log_level), + file_(file), + gettid_(gettid), + log_size_(0), + last_flush_micros_(0), + env_(env), + flush_pending_(false) { + assert(file_ != NULL); + assert(file_ != INVALID_HANDLE_VALUE); +} + +void WinLogger::DebugWriter(const char* str, int len) { + assert(file_ != INVALID_HANDLE_VALUE); + DWORD bytesWritten = 0; + BOOL ret = WriteFile(file_, str, len, &bytesWritten, NULL); + if (ret == FALSE) { + std::string errSz = GetWindowsErrSz(GetLastError()); + fprintf(stderr, errSz.c_str()); + } +} + +WinLogger::~WinLogger() { + CloseInternal(); +} + +Status WinLogger::CloseImpl() { + return CloseInternal(); +} + +Status WinLogger::CloseInternal() { + Status s; + if (INVALID_HANDLE_VALUE != file_) { + BOOL ret = FlushFileBuffers(file_); + if (ret == 0) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("Failed to flush LOG on Close() ", + lastError); + } + ret = CloseHandle(file_); + // On error the return value is zero + if (ret == 0 && s.ok()) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("Failed to flush LOG on Close() ", + lastError); + } + file_ = INVALID_HANDLE_VALUE; + closed_ = true; + } + return s; +} + +void WinLogger::Flush() { + assert(file_ != INVALID_HANDLE_VALUE); + if (flush_pending_) { + flush_pending_ = false; + // With Windows API writes go to OS buffers directly so no fflush needed + // unlike with C runtime API. We don't flush all the way to disk + // for perf reasons. + } + + last_flush_micros_ = env_->NowMicros(); +} + +void WinLogger::Logv(const char* format, va_list ap) { + IOSTATS_TIMER_GUARD(logger_nanos); + assert(file_ != INVALID_HANDLE_VALUE); + + const uint64_t thread_id = (*gettid_)(); + + // We try twice: the first time with a fixed-size stack allocated buffer, + // and the second time with a much larger dynamically allocated buffer. + char buffer[500]; + std::unique_ptr<char[]> largeBuffer; + for (int iter = 0; iter < 2; ++iter) { + char* base; + int bufsize; + if (iter == 0) { + bufsize = sizeof(buffer); + base = buffer; + } else { + bufsize = 30000; + largeBuffer.reset(new char[bufsize]); + base = largeBuffer.get(); + } + + char* p = base; + char* limit = base + bufsize; + + struct timeval now_tv; + gettimeofday(&now_tv, nullptr); + const time_t seconds = now_tv.tv_sec; + struct tm t; + localtime_s(&t, &seconds); + p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", + t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, + t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec), + static_cast<long long unsigned int>(thread_id)); + + // Print the message + if (p < limit) { + va_list backup_ap; + va_copy(backup_ap, ap); + int done = vsnprintf(p, limit - p, format, backup_ap); + if (done > 0) { + p += done; + } else { + continue; + } + va_end(backup_ap); + } + + // Truncate to available space if necessary + if (p >= limit) { + if (iter == 0) { + continue; // Try again with larger buffer + } else { + p = limit - 1; + } + } + + // Add newline if necessary + if (p == base || p[-1] != '\n') { + *p++ = '\n'; + } + + assert(p <= limit); + const size_t write_size = p - base; + + DWORD bytesWritten = 0; + BOOL ret = WriteFile(file_, base, static_cast<DWORD>(write_size), + &bytesWritten, NULL); + if (ret == FALSE) { + std::string errSz = GetWindowsErrSz(GetLastError()); + fprintf(stderr, errSz.c_str()); + } + + flush_pending_ = true; + assert((bytesWritten == write_size) || (ret == FALSE)); + if (bytesWritten > 0) { + log_size_ += write_size; + } + + uint64_t now_micros = + static_cast<uint64_t>(now_tv.tv_sec) * 1000000 + now_tv.tv_usec; + if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { + flush_pending_ = false; + // With Windows API writes go to OS buffers directly so no fflush needed + // unlike with C runtime API. We don't flush all the way to disk + // for perf reasons. + last_flush_micros_ = now_micros; + } + break; + } +} + +size_t WinLogger::GetLogFileSize() const { return log_size_; } + +} + +} // namespace rocksdb diff --git a/src/rocksdb/port/win/win_logger.h b/src/rocksdb/port/win/win_logger.h new file mode 100644 index 00000000..0982f142 --- /dev/null +++ b/src/rocksdb/port/win/win_logger.h @@ -0,0 +1,67 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Logger implementation that can be shared by all environments +// where enough posix functionality is available. + +#pragma once + +#include <atomic> + +#include "rocksdb/env.h" + +#include <stdint.h> +#include <windows.h> + +namespace rocksdb { + +class Env; + +namespace port { + +class WinLogger : public rocksdb::Logger { + public: + WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file, + const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL); + + virtual ~WinLogger(); + + WinLogger(const WinLogger&) = delete; + + WinLogger& operator=(const WinLogger&) = delete; + + void Flush() override; + + using rocksdb::Logger::Logv; + void Logv(const char* format, va_list ap) override; + + size_t GetLogFileSize() const override; + + void DebugWriter(const char* str, int len); + +protected: + + Status CloseImpl() override; + + private: + HANDLE file_; + uint64_t (*gettid_)(); // Return the thread id for the current thread + std::atomic_size_t log_size_; + std::atomic_uint_fast64_t last_flush_micros_; + Env* env_; + bool flush_pending_; + + Status CloseInternal(); + + const static uint64_t flush_every_seconds_ = 5; +}; + +} + +} // namespace rocksdb diff --git a/src/rocksdb/port/win/win_thread.cc b/src/rocksdb/port/win/win_thread.cc new file mode 100644 index 00000000..9a976e2c --- /dev/null +++ b/src/rocksdb/port/win/win_thread.cc @@ -0,0 +1,174 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "port/win/win_thread.h" + +#include <assert.h> +#include <process.h> // __beginthreadex +#include <windows.h> + +#include <stdexcept> +#include <system_error> +#include <thread> + +namespace rocksdb { +namespace port { + +struct WindowsThread::Data { + + std::function<void()> func_; + uintptr_t handle_; + + Data(std::function<void()>&& func) : + func_(std::move(func)), + handle_(0) { + } + + Data(const Data&) = delete; + Data& operator=(const Data&) = delete; + + static unsigned int __stdcall ThreadProc(void* arg); +}; + + +void WindowsThread::Init(std::function<void()>&& func) { + + data_ = std::make_shared<Data>(std::move(func)); + // We create another instance of std::shared_ptr to get an additional ref + // since we may detach and destroy this instance before the threadproc + // may start to run. We choose to allocate this additional ref on the heap + // so we do not need to synchronize and allow this thread to proceed + std::unique_ptr<std::shared_ptr<Data>> th_data(new std::shared_ptr<Data>(data_)); + + data_->handle_ = _beginthreadex(NULL, + 0, // stack size + &Data::ThreadProc, + th_data.get(), + 0, // init flag + &th_id_); + + if (data_->handle_ == 0) { + throw std::system_error(std::make_error_code( + std::errc::resource_unavailable_try_again), + "Unable to create a thread"); + } + th_data.release(); +} + +WindowsThread::WindowsThread() : + data_(nullptr), + th_id_(0) +{} + + +WindowsThread::~WindowsThread() { + // Must be joined or detached + // before destruction. + // This is the same as std::thread + if (data_) { + if (joinable()) { + assert(false); + std::terminate(); + } + data_.reset(); + } +} + +WindowsThread::WindowsThread(WindowsThread&& o) noexcept : + WindowsThread() { + *this = std::move(o); +} + +WindowsThread& WindowsThread::operator=(WindowsThread&& o) noexcept { + + if (joinable()) { + assert(false); + std::terminate(); + } + + data_ = std::move(o.data_); + + // Per spec both instances will have the same id + th_id_ = o.th_id_; + + return *this; +} + +bool WindowsThread::joinable() const { + return (data_ && data_->handle_ != 0); +} + +WindowsThread::native_handle_type WindowsThread::native_handle() const { + return reinterpret_cast<native_handle_type>(data_->handle_); +} + +unsigned WindowsThread::hardware_concurrency() { + return std::thread::hardware_concurrency(); +} + +void WindowsThread::join() { + + if (!joinable()) { + assert(false); + throw std::system_error( + std::make_error_code(std::errc::invalid_argument), + "Thread is no longer joinable"); + } + + if (GetThreadId(GetCurrentThread()) == th_id_) { + assert(false); + throw std::system_error( + std::make_error_code(std::errc::resource_deadlock_would_occur), + "Can not join itself"); + } + + auto ret = WaitForSingleObject(reinterpret_cast<HANDLE>(data_->handle_), + INFINITE); + if (ret != WAIT_OBJECT_0) { + auto lastError = GetLastError(); + assert(false); + throw std::system_error(static_cast<int>(lastError), + std::system_category(), + "WaitForSingleObjectFailed: thread join"); + } + + BOOL rc; + rc = CloseHandle(reinterpret_cast<HANDLE>(data_->handle_)); + assert(rc != 0); + data_->handle_ = 0; +} + +bool WindowsThread::detach() { + + if (!joinable()) { + assert(false); + throw std::system_error( + std::make_error_code(std::errc::invalid_argument), + "Thread is no longer available"); + } + + BOOL ret = CloseHandle(reinterpret_cast<HANDLE>(data_->handle_)); + data_->handle_ = 0; + + return (ret != 0); +} + +void WindowsThread::swap(WindowsThread& o) { + data_.swap(o.data_); + std::swap(th_id_, o.th_id_); +} + +unsigned int __stdcall WindowsThread::Data::ThreadProc(void* arg) { + auto ptr = reinterpret_cast<std::shared_ptr<Data>*>(arg); + std::unique_ptr<std::shared_ptr<Data>> data(ptr); + (*data)->func_(); + return 0; +} +} // namespace port +} // namespace rocksdb diff --git a/src/rocksdb/port/win/win_thread.h b/src/rocksdb/port/win/win_thread.h new file mode 100644 index 00000000..1d5b225e --- /dev/null +++ b/src/rocksdb/port/win/win_thread.h @@ -0,0 +1,121 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include <memory> +#include <functional> +#include <type_traits> + +namespace rocksdb { +namespace port { + +// This class is a replacement for std::thread +// 2 reasons we do not like std::thread: +// -- is that it dynamically allocates its internals that are automatically +// freed when the thread terminates and not on the destruction of the +// object. This makes it difficult to control the source of memory +// allocation +// - This implements Pimpl so we can easily replace the guts of the +// object in our private version if necessary. +class WindowsThread { + + struct Data; + + std::shared_ptr<Data> data_; + unsigned int th_id_; + + void Init(std::function<void()>&&); + +public: + + typedef void* native_handle_type; + + // Construct with no thread + WindowsThread(); + + // Template constructor + // + // This templated constructor accomplishes several things + // + // - Allows the class as whole to be not a template + // + // - take "universal" references to support both _lvalues and _rvalues + // + // - because this constructor is a catchall case in many respects it + // may prevent us from using both the default __ctor, the move __ctor. + // Also it may circumvent copy __ctor deletion. To work around this + // we make sure this one has at least one argument and eliminate + // it from the overload selection when WindowsThread is the first + // argument. + // + // - construct with Fx(Ax...) with a variable number of types/arguments. + // + // - Gathers together the callable object with its arguments and constructs + // a single callable entity + // + // - Makes use of std::function to convert it to a specification-template + // dependent type that both checks the signature conformance to ensure + // that all of the necessary arguments are provided and allows pimpl + // implementation. + template<class Fn, + class... Args, + class = typename std::enable_if< + !std::is_same<typename std::decay<Fn>::type, + WindowsThread>::value>::type> + explicit WindowsThread(Fn&& fx, Args&&... ax) : + WindowsThread() { + + // Use binder to create a single callable entity + auto binder = std::bind(std::forward<Fn>(fx), + std::forward<Args>(ax)...); + // Use std::function to take advantage of the type erasure + // so we can still hide implementation within pimpl + // This also makes sure that the binder signature is compliant + std::function<void()> target = binder; + + Init(std::move(target)); + } + + + ~WindowsThread(); + + WindowsThread(const WindowsThread&) = delete; + + WindowsThread& operator=(const WindowsThread&) = delete; + + WindowsThread(WindowsThread&&) noexcept; + + WindowsThread& operator=(WindowsThread&&) noexcept; + + bool joinable() const; + + unsigned int get_id() const { return th_id_; } + + native_handle_type native_handle() const; + + static unsigned hardware_concurrency(); + + void join(); + + bool detach(); + + void swap(WindowsThread&); +}; +} // namespace port +} // namespace rocksdb + +namespace std { + inline + void swap(rocksdb::port::WindowsThread& th1, + rocksdb::port::WindowsThread& th2) { + th1.swap(th2); + } +} // namespace std + diff --git a/src/rocksdb/port/win/xpress_win.cc b/src/rocksdb/port/win/xpress_win.cc new file mode 100644 index 00000000..9ab23c53 --- /dev/null +++ b/src/rocksdb/port/win/xpress_win.cc @@ -0,0 +1,226 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "port/win/xpress_win.h" +#include <windows.h> + +#include <cassert> +#include <memory> +#include <limits> +#include <iostream> + +#ifdef XPRESS + +// Put this under ifdef so windows systems w/o this +// can still build +#include <compressapi.h> + +namespace rocksdb { +namespace port { +namespace xpress { + +// Helpers +namespace { + +auto CloseCompressorFun = [](void* h) { + if (NULL != h) { + ::CloseCompressor(reinterpret_cast<COMPRESSOR_HANDLE>(h)); + } +}; + +auto CloseDecompressorFun = [](void* h) { + if (NULL != h) { + ::CloseDecompressor(reinterpret_cast<DECOMPRESSOR_HANDLE>(h)); + } +}; +} + +bool Compress(const char* input, size_t length, std::string* output) { + + assert(input != nullptr); + assert(output != nullptr); + + if (length == 0) { + output->clear(); + return true; + } + + COMPRESS_ALLOCATION_ROUTINES* allocRoutinesPtr = nullptr; + + COMPRESSOR_HANDLE compressor = NULL; + + BOOL success = CreateCompressor( + COMPRESS_ALGORITHM_XPRESS, // Compression Algorithm + allocRoutinesPtr, // Optional allocation routine + &compressor); // Handle + + if (!success) { +#ifdef _DEBUG + std::cerr << "XPRESS: Failed to create Compressor LastError: " << + GetLastError() << std::endl; +#endif + return false; + } + + std::unique_ptr<void, decltype(CloseCompressorFun)> + compressorGuard(compressor, CloseCompressorFun); + + SIZE_T compressedBufferSize = 0; + + // Query compressed buffer size. + success = ::Compress( + compressor, // Compressor Handle + const_cast<char*>(input), // Input buffer + length, // Uncompressed data size + NULL, // Compressed Buffer + 0, // Compressed Buffer size + &compressedBufferSize); // Compressed Data size + + if (!success) { + + auto lastError = GetLastError(); + + if (lastError != ERROR_INSUFFICIENT_BUFFER) { +#ifdef _DEBUG + std::cerr << + "XPRESS: Failed to estimate compressed buffer size LastError " << + lastError << std::endl; +#endif + return false; + } + } + + assert(compressedBufferSize > 0); + + std::string result; + result.resize(compressedBufferSize); + + SIZE_T compressedDataSize = 0; + + // Compress + success = ::Compress( + compressor, // Compressor Handle + const_cast<char*>(input), // Input buffer + length, // Uncompressed data size + &result[0], // Compressed Buffer + compressedBufferSize, // Compressed Buffer size + &compressedDataSize); // Compressed Data size + + if (!success) { +#ifdef _DEBUG + std::cerr << "XPRESS: Failed to compress LastError " << + GetLastError() << std::endl; +#endif + return false; + } + + result.resize(compressedDataSize); + output->swap(result); + + return true; +} + +char* Decompress(const char* input_data, size_t input_length, + int* decompress_size) { + + assert(input_data != nullptr); + assert(decompress_size != nullptr); + + if (input_length == 0) { + return nullptr; + } + + COMPRESS_ALLOCATION_ROUTINES* allocRoutinesPtr = nullptr; + + DECOMPRESSOR_HANDLE decompressor = NULL; + + BOOL success = CreateDecompressor( + COMPRESS_ALGORITHM_XPRESS, // Compression Algorithm + allocRoutinesPtr, // Optional allocation routine + &decompressor); // Handle + + + if (!success) { +#ifdef _DEBUG + std::cerr << "XPRESS: Failed to create Decompressor LastError " + << GetLastError() << std::endl; +#endif + return nullptr; + } + + std::unique_ptr<void, decltype(CloseDecompressorFun)> + compressorGuard(decompressor, CloseDecompressorFun); + + SIZE_T decompressedBufferSize = 0; + + success = ::Decompress( + decompressor, // Compressor Handle + const_cast<char*>(input_data), // Compressed data + input_length, // Compressed data size + NULL, // Buffer set to NULL + 0, // Buffer size set to 0 + &decompressedBufferSize); // Decompressed Data size + + if (!success) { + + auto lastError = GetLastError(); + + if (lastError != ERROR_INSUFFICIENT_BUFFER) { +#ifdef _DEBUG + std::cerr + << "XPRESS: Failed to estimate decompressed buffer size LastError " + << lastError << std::endl; +#endif + return nullptr; + } + } + + assert(decompressedBufferSize > 0); + + // On Windows we are limited to a 32-bit int for the + // output data size argument + // so we hopefully never get here + if (decompressedBufferSize > std::numeric_limits<int>::max()) { + assert(false); + return nullptr; + } + + // The callers are deallocating using delete[] + // thus we must allocate with new[] + std::unique_ptr<char[]> outputBuffer(new char[decompressedBufferSize]); + + SIZE_T decompressedDataSize = 0; + + success = ::Decompress( + decompressor, + const_cast<char*>(input_data), + input_length, + outputBuffer.get(), + decompressedBufferSize, + &decompressedDataSize); + + if (!success) { +#ifdef _DEBUG + std::cerr << + "XPRESS: Failed to decompress LastError " << + GetLastError() << std::endl; +#endif + return nullptr; + } + + *decompress_size = static_cast<int>(decompressedDataSize); + + // Return the raw buffer to the caller supporting the tradition + return outputBuffer.release(); +} +} +} +} + +#endif diff --git a/src/rocksdb/port/win/xpress_win.h b/src/rocksdb/port/win/xpress_win.h new file mode 100644 index 00000000..5b11e7da --- /dev/null +++ b/src/rocksdb/port/win/xpress_win.h @@ -0,0 +1,26 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include <string> + +namespace rocksdb { +namespace port { +namespace xpress { + +bool Compress(const char* input, size_t length, std::string* output); + +char* Decompress(const char* input_data, size_t input_length, + int* decompress_size); + +} +} +} + diff --git a/src/rocksdb/port/xpress.h b/src/rocksdb/port/xpress.h new file mode 100644 index 00000000..457025f6 --- /dev/null +++ b/src/rocksdb/port/xpress.h @@ -0,0 +1,17 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +// Xpress on Windows is implemeted using Win API +#if defined(ROCKSDB_PLATFORM_POSIX) +#error "Xpress compression not implemented" +#elif defined(OS_WIN) +#include "port/win/xpress_win.h" +#endif |