diff options
Diffstat (limited to 'src/rocksdb/hdfs/env_hdfs.h')
-rw-r--r-- | src/rocksdb/hdfs/env_hdfs.h | 384 |
1 files changed, 384 insertions, 0 deletions
diff --git a/src/rocksdb/hdfs/env_hdfs.h b/src/rocksdb/hdfs/env_hdfs.h new file mode 100644 index 000000000..6005c3664 --- /dev/null +++ b/src/rocksdb/hdfs/env_hdfs.h @@ -0,0 +1,384 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#pragma once +#include <algorithm> +#include <stdio.h> +#include <time.h> +#include <iostream> +#include "port/sys_time.h" +#include "rocksdb/env.h" +#include "rocksdb/status.h" + +#ifdef USE_HDFS +#include <hdfs.h> + +namespace ROCKSDB_NAMESPACE { + +// Thrown during execution when there is an issue with the supplied +// arguments. +class HdfsUsageException : public std::exception { }; + +// A simple exception that indicates something went wrong that is not +// recoverable. The intention is for the message to be printed (with +// nothing else) and the process terminate. +class HdfsFatalException : public std::exception { +public: + explicit HdfsFatalException(const std::string& s) : what_(s) { } + virtual ~HdfsFatalException() throw() { } + virtual const char* what() const throw() { + return what_.c_str(); + } +private: + const std::string what_; +}; + +// +// The HDFS environment for rocksdb. This class overrides all the +// file/dir access methods and delegates the thread-mgmt methods to the +// default posix environment. +// +class HdfsEnv : public Env { + + public: + explicit HdfsEnv(const std::string& fsname) : fsname_(fsname) { + posixEnv = Env::Default(); + fileSys_ = connectToPath(fsname_); + } + + virtual ~HdfsEnv() { + fprintf(stderr, "Destroying HdfsEnv::Default()\n"); + hdfsDisconnect(fileSys_); + } + + Status NewSequentialFile(const std::string& fname, + std::unique_ptr<SequentialFile>* result, + const EnvOptions& options) override; + + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr<RandomAccessFile>* result, + const EnvOptions& options) override; + + Status NewWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options) override; + + Status NewDirectory(const std::string& name, + std::unique_ptr<Directory>* result) override; + + Status FileExists(const std::string& fname) override; + + Status GetChildren(const std::string& path, + std::vector<std::string>* result) override; + + Status DeleteFile(const std::string& fname) override; + + Status CreateDir(const std::string& name) override; + + Status CreateDirIfMissing(const std::string& name) override; + + Status DeleteDir(const std::string& name) override; + + Status GetFileSize(const std::string& fname, uint64_t* size) override; + + Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) override; + + Status RenameFile(const std::string& src, const std::string& target) override; + + Status LinkFile(const std::string& /*src*/, + const std::string& /*target*/) override { + return Status::NotSupported(); // not supported + } + + Status LockFile(const std::string& fname, FileLock** lock) override; + + Status UnlockFile(FileLock* lock) override; + + Status NewLogger(const std::string& fname, + std::shared_ptr<Logger>* result) override; + + void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW, + void* tag = nullptr, + void (*unschedFunction)(void* arg) = 0) override { + posixEnv->Schedule(function, arg, pri, tag, unschedFunction); + } + + int UnSchedule(void* tag, Priority pri) override { + return posixEnv->UnSchedule(tag, pri); + } + + void StartThread(void (*function)(void* arg), void* arg) override { + posixEnv->StartThread(function, arg); + } + + void WaitForJoin() override { posixEnv->WaitForJoin(); } + + unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override { + return posixEnv->GetThreadPoolQueueLen(pri); + } + + Status GetTestDirectory(std::string* path) override { + return posixEnv->GetTestDirectory(path); + } + + uint64_t NowMicros() override { return posixEnv->NowMicros(); } + + void SleepForMicroseconds(int micros) override { + posixEnv->SleepForMicroseconds(micros); + } + + Status GetHostName(char* name, uint64_t len) override { + return posixEnv->GetHostName(name, len); + } + + Status GetCurrentTime(int64_t* unix_time) override { + return posixEnv->GetCurrentTime(unix_time); + } + + Status GetAbsolutePath(const std::string& db_path, + std::string* output_path) override { + return posixEnv->GetAbsolutePath(db_path, output_path); + } + + void SetBackgroundThreads(int number, Priority pri = LOW) override { + posixEnv->SetBackgroundThreads(number, pri); + } + + int GetBackgroundThreads(Priority pri = LOW) override { + return posixEnv->GetBackgroundThreads(pri); + } + + void IncBackgroundThreadsIfNeeded(int number, Priority pri) override { + posixEnv->IncBackgroundThreadsIfNeeded(number, pri); + } + + std::string TimeToString(uint64_t number) override { + return posixEnv->TimeToString(number); + } + + static uint64_t gettid() { + assert(sizeof(pthread_t) <= sizeof(uint64_t)); + return (uint64_t)pthread_self(); + } + + uint64_t GetThreadID() const override { return HdfsEnv::gettid(); } + + private: + std::string fsname_; // string of the form "hdfs://hostname:port/" + hdfsFS fileSys_; // a single FileSystem object for all files + Env* posixEnv; // This object is derived from Env, but not from + // posixEnv. We have posixnv as an encapsulated + // object here so that we can use posix timers, + // posix threads, etc. + + static const std::string kProto; + static const std::string pathsep; + + /** + * If the URI is specified of the form hdfs://server:port/path, + * then connect to the specified cluster + * else connect to default. + */ + hdfsFS connectToPath(const std::string& uri) { + if (uri.empty()) { + return nullptr; + } + if (uri.find(kProto) != 0) { + // uri doesn't start with hdfs:// -> use default:0, which is special + // to libhdfs. + return hdfsConnectNewInstance("default", 0); + } + const std::string hostport = uri.substr(kProto.length()); + + std::vector <std::string> parts; + split(hostport, ':', parts); + if (parts.size() != 2) { + throw HdfsFatalException("Bad uri for hdfs " + uri); + } + // parts[0] = hosts, parts[1] = port/xxx/yyy + std::string host(parts[0]); + std::string remaining(parts[1]); + + int rem = static_cast<int>(remaining.find(pathsep)); + std::string portStr = (rem == 0 ? remaining : + remaining.substr(0, rem)); + + tPort port; + port = atoi(portStr.c_str()); + if (port == 0) { + throw HdfsFatalException("Bad host-port for hdfs " + uri); + } + hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port); + return fs; + } + + void split(const std::string &s, char delim, + std::vector<std::string> &elems) { + elems.clear(); + size_t prev = 0; + size_t pos = s.find(delim); + while (pos != std::string::npos) { + elems.push_back(s.substr(prev, pos)); + prev = pos + 1; + pos = s.find(delim, prev); + } + elems.push_back(s.substr(prev, s.size())); + } +}; + +} // namespace ROCKSDB_NAMESPACE + +#else // USE_HDFS + +namespace ROCKSDB_NAMESPACE { + +static const Status notsup; + +class HdfsEnv : public Env { + + public: + explicit HdfsEnv(const std::string& /*fsname*/) { + fprintf(stderr, "You have not build rocksdb with HDFS support\n"); + fprintf(stderr, "Please see hdfs/README for details\n"); + abort(); + } + + virtual ~HdfsEnv() { + } + + virtual Status NewSequentialFile(const std::string& fname, + std::unique_ptr<SequentialFile>* result, + const EnvOptions& options) override; + + virtual Status NewRandomAccessFile( + const std::string& /*fname*/, + std::unique_ptr<RandomAccessFile>* /*result*/, + const EnvOptions& /*options*/) override { + return notsup; + } + + virtual Status NewWritableFile(const std::string& /*fname*/, + std::unique_ptr<WritableFile>* /*result*/, + const EnvOptions& /*options*/) override { + return notsup; + } + + virtual Status NewDirectory(const std::string& /*name*/, + std::unique_ptr<Directory>* /*result*/) override { + return notsup; + } + + virtual Status FileExists(const std::string& /*fname*/) override { + return notsup; + } + + virtual Status GetChildren(const std::string& /*path*/, + std::vector<std::string>* /*result*/) override { + return notsup; + } + + virtual Status DeleteFile(const std::string& /*fname*/) override { + return notsup; + } + + virtual Status CreateDir(const std::string& /*name*/) override { + return notsup; + } + + virtual Status CreateDirIfMissing(const std::string& /*name*/) override { + return notsup; + } + + virtual Status DeleteDir(const std::string& /*name*/) override { + return notsup; + } + + virtual Status GetFileSize(const std::string& /*fname*/, + uint64_t* /*size*/) override { + return notsup; + } + + virtual Status GetFileModificationTime(const std::string& /*fname*/, + uint64_t* /*time*/) override { + return notsup; + } + + virtual Status RenameFile(const std::string& /*src*/, + const std::string& /*target*/) override { + return notsup; + } + + virtual Status LinkFile(const std::string& /*src*/, + const std::string& /*target*/) override { + return notsup; + } + + virtual Status LockFile(const std::string& /*fname*/, + FileLock** /*lock*/) override { + return notsup; + } + + virtual Status UnlockFile(FileLock* /*lock*/) override { return notsup; } + + virtual Status NewLogger(const std::string& /*fname*/, + std::shared_ptr<Logger>* /*result*/) override { + return notsup; + } + + virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/, + Priority /*pri*/ = LOW, void* /*tag*/ = nullptr, + void (* /*unschedFunction*/)(void* arg) = 0) override {} + + virtual int UnSchedule(void* /*tag*/, Priority /*pri*/) override { return 0; } + + virtual void StartThread(void (* /*function*/)(void* arg), + void* /*arg*/) override {} + + virtual void WaitForJoin() override {} + + virtual unsigned int GetThreadPoolQueueLen( + Priority /*pri*/ = LOW) const override { + return 0; + } + + virtual Status GetTestDirectory(std::string* /*path*/) override { + return notsup; + } + + virtual uint64_t NowMicros() override { return 0; } + + virtual void SleepForMicroseconds(int /*micros*/) override {} + + virtual Status GetHostName(char* /*name*/, uint64_t /*len*/) override { + return notsup; + } + + virtual Status GetCurrentTime(int64_t* /*unix_time*/) override { + return notsup; + } + + virtual Status GetAbsolutePath(const std::string& /*db_path*/, + std::string* /*outputpath*/) override { + return notsup; + } + + virtual void SetBackgroundThreads(int /*number*/, + Priority /*pri*/ = LOW) override {} + virtual int GetBackgroundThreads(Priority /*pri*/ = LOW) override { + return 0; + } + virtual void IncBackgroundThreadsIfNeeded(int /*number*/, + Priority /*pri*/) override {} + virtual std::string TimeToString(uint64_t /*number*/) override { return ""; } + + virtual uint64_t GetThreadID() const override { + return 0; + } +}; +} // namespace ROCKSDB_NAMESPACE + +#endif // USE_HDFS |