summaryrefslogtreecommitdiffstats
path: root/src/kv/KineticStore.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/kv/KineticStore.h152
1 files changed, 152 insertions, 0 deletions
diff --git a/src/kv/KineticStore.h b/src/kv/KineticStore.h
new file mode 100644
index 00000000..b22d4f02
--- /dev/null
+++ b/src/kv/KineticStore.h
@@ -0,0 +1,152 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef KINETIC_STORE_H
+#define KINETIC_STORE_H
+
+#include "include/types.h"
+#include "include/buffer_fwd.h"
+#include "KeyValueDB.h"
+#include <set>
+#include <map>
+#include <string>
+#include <kinetic/kinetic.h>
+
+#include <errno.h>
+#include "common/errno.h"
+#include "common/dout.h"
+#include "include/ceph_assert.h"
+#include "common/Formatter.h"
+
+#include "common/ceph_context.h"
+
+class PerfCounters;
+
+enum {
+ l_kinetic_first = 34400,
+ l_kinetic_gets,
+ l_kinetic_txns,
+ l_kinetic_last,
+};
+
+/**
+ * Uses Kinetic to implement the KeyValueDB interface
+ */
+class KineticStore : public KeyValueDB {
+ CephContext *cct;
+ PerfCounters *logger;
+ string host;
+ int port;
+ int user_id;
+ string hmac_key;
+ bool use_ssl;
+ std::unique_ptr<kinetic::BlockingKineticConnection> kinetic_conn;
+
+ int do_open(ostream &out, bool create_if_missing);
+
+public:
+ explicit KineticStore(CephContext *c);
+ ~KineticStore();
+
+ static int _test_init(CephContext *c);
+ int init();
+
+ /// Opens underlying db
+ int open(ostream &out, const std::vector<ColumnFamily>& = {}) override;
+ /// Creates underlying db if missing and opens it
+ int create_and_open(ostream &out, const std::vector<ColumnFamily>& = {}) override;
+
+ void close() override;
+
+ enum KineticOpType {
+ KINETIC_OP_WRITE,
+ KINETIC_OP_DELETE,
+ };
+
+ struct KineticOp {
+ KineticOpType type;
+ std::string key;
+ bufferlist data;
+ KineticOp(KineticOpType type, const string &key) : type(type), key(key) {}
+ KineticOp(KineticOpType type, const string &key, const bufferlist &data)
+ : type(type), key(key), data(data) {}
+ };
+
+ class KineticTransactionImpl : public KeyValueDB::TransactionImpl {
+ public:
+ vector<KineticOp> ops;
+ KineticStore *db;
+
+ explicit KineticTransactionImpl(KineticStore *db) : db(db) {}
+ void set(
+ const string &prefix,
+ const string &k,
+ const bufferlist &bl);
+ void rmkey(
+ const string &prefix,
+ const string &k);
+ void rmkeys_by_prefix(
+ const string &prefix
+ );
+ void rm_range_keys(
+ const string &prefix,
+ const string &start,
+ const string &end) override;
+ };
+
+ KeyValueDB::Transaction get_transaction() override {
+ return std::make_shared<KineticTransactionImpl>(this);
+ }
+
+ int submit_transaction(KeyValueDB::Transaction t) override;
+ int submit_transaction_sync(KeyValueDB::Transaction t) override;
+ int get(
+ const string &prefix,
+ const std::set<string> &key,
+ std::map<string, bufferlist> *out
+ );
+ using KeyValueDB::get;
+
+ class KineticWholeSpaceIteratorImpl :
+ public KeyValueDB::WholeSpaceIteratorImpl {
+ std::set<std::string> keys;
+ std::set<std::string>::iterator keys_iter;
+ kinetic::BlockingKineticConnection *kinetic_conn;
+ kinetic::KineticStatus kinetic_status;
+ public:
+ explicit KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn);
+ virtual ~KineticWholeSpaceIteratorImpl() { }
+
+ int seek_to_first() override {
+ return seek_to_first("");
+ }
+ int seek_to_first(const string &prefix);
+ int seek_to_last() override;
+ int seek_to_last(const string &prefix);
+ int upper_bound(const string &prefix, const string &after);
+ int lower_bound(const string &prefix, const string &to);
+ bool valid() override;
+ int next() override;
+ int prev() override;
+ string key();
+ pair<string,string> raw_key();
+ bool raw_key_is_prefixed(const string &prefix);
+ bufferlist value() override;
+ int status() override;
+ };
+
+ /// Utility
+ static string combine_strings(const string &prefix, const string &value);
+ static int split_key(string &in_prefix, string *prefix, string *key);
+ static bufferlist to_bufferlist(const kinetic::KineticRecord &record);
+ virtual uint64_t get_estimated_size(map<string,uint64_t> &extra) {
+ // not used by the osd
+ return 0;
+ }
+
+
+ WholeSpaceIterator get_wholespace_iterator() {
+ return std::make_shared<KineticWholeSpaceIteratorImpl>(kinetic_conn.get());
+ }
+};
+
+#endif