diff options
Diffstat (limited to 'storage/rocksdb/rdb_sst_info.h')
-rw-r--r-- | storage/rocksdb/rdb_sst_info.h | 265 |
1 files changed, 265 insertions, 0 deletions
diff --git a/storage/rocksdb/rdb_sst_info.h b/storage/rocksdb/rdb_sst_info.h new file mode 100644 index 00000000..66da3b7c --- /dev/null +++ b/storage/rocksdb/rdb_sst_info.h @@ -0,0 +1,265 @@ +/* + Copyright (c) 2016, Facebook, Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ + +#pragma once + +/* C++ standard header files */ +#include <atomic> +#include <condition_variable> +#include <mutex> +#include <queue> +#include <stack> +#include <string> +#include <thread> +#include <utility> +#include <vector> + +/* RocksDB header files */ +#include "rocksdb/db.h" +#include "rocksdb/sst_file_writer.h" + +/* MyRocks header files */ +#include "./rdb_utils.h" + +namespace myrocks { + +class Rdb_sst_file_ordered { + private: + class Rdb_sst_file { + private: + Rdb_sst_file(const Rdb_sst_file &p) = delete; + Rdb_sst_file &operator=(const Rdb_sst_file &p) = delete; + + rocksdb::DB *const m_db; + rocksdb::ColumnFamilyHandle *const m_cf; + const rocksdb::DBOptions &m_db_options; + rocksdb::SstFileWriter *m_sst_file_writer; + const std::string m_name; + const bool m_tracing; + const rocksdb::Comparator *m_comparator; + + std::string generateKey(const std::string &key); + + public: + Rdb_sst_file(rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf, + const rocksdb::DBOptions &db_options, const std::string &name, + const bool tracing); + ~Rdb_sst_file(); + + rocksdb::Status open(); + rocksdb::Status put(const rocksdb::Slice &key, const rocksdb::Slice &value); + rocksdb::Status commit(); + + inline const std::string get_name() const { return m_name; } + inline int compare(rocksdb::Slice key1, rocksdb::Slice key2) { + return m_comparator->Compare(key1, key2); + } + }; + + class Rdb_sst_stack { + private: + char *m_buffer; + size_t m_buffer_size; + size_t m_offset; + std::stack<std::tuple<size_t, size_t, size_t>> m_stack; + + public: + explicit Rdb_sst_stack(size_t max_size) + : m_buffer(nullptr), m_buffer_size(max_size) {} + ~Rdb_sst_stack() { delete[] m_buffer; } + + void reset() { m_offset = 0; } + bool empty() { return m_stack.empty(); } + void push(const rocksdb::Slice &key, const rocksdb::Slice &value); + std::pair<rocksdb::Slice, rocksdb::Slice> top(); + void pop() { m_stack.pop(); } + size_t size() { return m_stack.size(); } + }; + + bool m_use_stack; + bool m_first; + std::string m_first_key; + std::string m_first_value; + Rdb_sst_stack m_stack; + Rdb_sst_file m_file; + + rocksdb::Status apply_first(); + + public: + Rdb_sst_file_ordered(rocksdb::DB *const db, + rocksdb::ColumnFamilyHandle *const cf, + const rocksdb::DBOptions &db_options, + const std::string &name, const bool tracing, + size_t max_size); + + inline rocksdb::Status open() { return m_file.open(); } + rocksdb::Status put(const rocksdb::Slice &key, const rocksdb::Slice &value); + rocksdb::Status commit(); + inline const std::string get_name() const { return m_file.get_name(); } +}; + +class Rdb_sst_info { + private: + Rdb_sst_info(const Rdb_sst_info &p) = delete; + Rdb_sst_info &operator=(const Rdb_sst_info &p) = delete; + + rocksdb::DB *const m_db; + rocksdb::ColumnFamilyHandle *const m_cf; + const rocksdb::DBOptions &m_db_options; + uint64_t m_curr_size; + uint64_t m_max_size; + uint32_t m_sst_count; + std::atomic<int> m_background_error; + bool m_done; + std::string m_prefix; + static std::atomic<uint64_t> m_prefix_counter; + static std::string m_suffix; + mysql_mutex_t m_commit_mutex; + Rdb_sst_file_ordered *m_sst_file; + + // List of committed SST files - we'll ingest them later in one single batch + std::vector<std::string> m_committed_files; + + const bool m_tracing; + bool m_print_client_error; + + int open_new_sst_file(); + void close_curr_sst_file(); + void commit_sst_file(Rdb_sst_file_ordered *sst_file); + + void set_error_msg(const std::string &sst_file_name, + const rocksdb::Status &s); + + public: + Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename, + const std::string &indexname, + rocksdb::ColumnFamilyHandle *const cf, + const rocksdb::DBOptions &db_options, const bool tracing); + ~Rdb_sst_info(); + + /* + This is the unit of work returned from Rdb_sst_info::finish and represents + a group of SST to be ingested atomically with other Rdb_sst_commit_info. + This is always local to the bulk loading complete operation so no locking + is required + */ + class Rdb_sst_commit_info { + public: + Rdb_sst_commit_info() : m_committed(true), m_cf(nullptr) {} + + Rdb_sst_commit_info(Rdb_sst_commit_info &&rhs) noexcept + : m_committed(rhs.m_committed), + m_cf(rhs.m_cf), + m_committed_files(std::move(rhs.m_committed_files)) { + rhs.m_committed = true; + rhs.m_cf = nullptr; + } + + Rdb_sst_commit_info &operator=(Rdb_sst_commit_info &&rhs) noexcept { + reset(); + + m_cf = rhs.m_cf; + m_committed_files = std::move(rhs.m_committed_files); + m_committed = rhs.m_committed; + + rhs.m_committed = true; + rhs.m_cf = nullptr; + + return *this; + } + + Rdb_sst_commit_info(const Rdb_sst_commit_info &) = delete; + Rdb_sst_commit_info &operator=(const Rdb_sst_commit_info &) = delete; + + ~Rdb_sst_commit_info() { reset(); } + + void reset() { + if (!m_committed) { + for (auto sst_file : m_committed_files) { + // In case something went wrong attempt to delete the temporary file. + // If everything went fine that file will have been renamed and this + // function call will fail. + std::remove(sst_file.c_str()); + } + } + m_committed_files.clear(); + m_cf = nullptr; + m_committed = true; + } + + bool has_work() const { + return m_cf != nullptr && m_committed_files.size() > 0; + } + + void init(rocksdb::ColumnFamilyHandle *cf, + std::vector<std::string> &&files) { + DBUG_ASSERT(m_cf == nullptr && m_committed_files.size() == 0 && + m_committed); + m_cf = cf; + m_committed_files = std::move(files); + m_committed = false; + } + + rocksdb::ColumnFamilyHandle *get_cf() const { return m_cf; } + + const std::vector<std::string> &get_committed_files() const { + return m_committed_files; + } + + void commit() { m_committed = true; } + + private: + bool m_committed; + rocksdb::ColumnFamilyHandle *m_cf; + std::vector<std::string> m_committed_files; + }; + + int put(const rocksdb::Slice &key, const rocksdb::Slice &value); + int finish(Rdb_sst_commit_info *commit_info, bool print_client_error = true); + + bool is_done() const { return m_done; } + + bool have_background_error() { return m_background_error != 0; } + + int get_and_reset_background_error() { + int ret = m_background_error; + while (!m_background_error.compare_exchange_weak(ret, HA_EXIT_SUCCESS)) { + // Do nothing + } + + return ret; + } + + void set_background_error(int code) { + int expected = HA_EXIT_SUCCESS; + // Only assign 'code' into the error if it is already 0, otherwise ignore it + m_background_error.compare_exchange_strong(expected, code); + } + + /** Return the list of committed files later to be ingested **/ + const std::vector<std::string> &get_committed_files() { + return m_committed_files; + } + + rocksdb::ColumnFamilyHandle *get_cf() const { return m_cf; } + + static void init(const rocksdb::DB *const db); + + static void report_error_msg(const rocksdb::Status &s, + const char *sst_file_name); +}; + +} // namespace myrocks |