summaryrefslogtreecommitdiffstats
path: root/src/kv/KineticStore.h
blob: b22d4f0218f55b2440a2c20e70084962169a0d06 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef KINETIC_STORE_H
#define KINETIC_STORE_H

#include "include/types.h"
#include "include/buffer_fwd.h"
#include "KeyValueDB.h"
#include <iosfwd>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include <kinetic/kinetic.h>

#include <errno.h>
#include "common/errno.h"
#include "common/dout.h"
#include "include/ceph_assert.h"
#include "common/Formatter.h"

#include "common/ceph_context.h"

class PerfCounters;

// Numeric identifiers for the Kinetic backend, numbered consecutively from
// l_kinetic_first. l_kinetic_first and l_kinetic_last are the exclusive
// bounds of the range; presumably these index PerfCounters entries
// (confirm against the logger setup in KineticStore.cc).
enum {
  l_kinetic_first = 34400,
  l_kinetic_gets  = l_kinetic_first + 1,
  l_kinetic_txns  = l_kinetic_gets + 1,
  l_kinetic_last  = l_kinetic_txns + 1,
};

/**
 * Uses Kinetic to implement the KeyValueDB interface
 */
class KineticStore : public KeyValueDB {
  CephContext *cct;
  PerfCounters *logger;
  string host;
  int port;
  int user_id;
  string hmac_key;
  bool use_ssl;
  std::unique_ptr<kinetic::BlockingKineticConnection> kinetic_conn;

  int do_open(ostream &out, bool create_if_missing);

public:
  explicit KineticStore(CephContext *c);
  ~KineticStore();

  static int _test_init(CephContext *c);
  int init();

  /// Opens underlying db
  int open(ostream &out, const std::vector<ColumnFamily>& = {}) override;
  /// Creates underlying db if missing and opens it
  int create_and_open(ostream &out, const std::vector<ColumnFamily>& = {}) override;

  void close() override;

  enum KineticOpType {
    KINETIC_OP_WRITE,
    KINETIC_OP_DELETE,
  };

  struct KineticOp {
    KineticOpType type;
    std::string key;
    bufferlist data;
    KineticOp(KineticOpType type, const string &key) : type(type), key(key) {}
    KineticOp(KineticOpType type, const string &key, const bufferlist &data)
      : type(type), key(key), data(data) {}
  };

  class KineticTransactionImpl : public KeyValueDB::TransactionImpl {
  public:
    vector<KineticOp> ops;
    KineticStore *db;

    explicit KineticTransactionImpl(KineticStore *db) : db(db) {}
    void set(
      const string &prefix,
      const string &k,
      const bufferlist &bl);
    void rmkey(
      const string &prefix,
      const string &k);
    void rmkeys_by_prefix(
      const string &prefix
      );
    void rm_range_keys(
        const string &prefix,
        const string &start,
        const string &end) override;
  };

  KeyValueDB::Transaction get_transaction() override {
    return std::make_shared<KineticTransactionImpl>(this);
  }

  int submit_transaction(KeyValueDB::Transaction t) override;
  int submit_transaction_sync(KeyValueDB::Transaction t) override;
  int get(
    const string &prefix,
    const std::set<string> &key,
    std::map<string, bufferlist> *out
    );
  using KeyValueDB::get;

  class KineticWholeSpaceIteratorImpl :
    public KeyValueDB::WholeSpaceIteratorImpl {
    std::set<std::string> keys;
    std::set<std::string>::iterator keys_iter;
    kinetic::BlockingKineticConnection *kinetic_conn;
    kinetic::KineticStatus kinetic_status;
  public:
    explicit KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn);
    virtual ~KineticWholeSpaceIteratorImpl() { }

    int seek_to_first() override {
      return seek_to_first("");
    }
    int seek_to_first(const string &prefix);
    int seek_to_last() override;
    int seek_to_last(const string &prefix);
    int upper_bound(const string &prefix, const string &after);
    int lower_bound(const string &prefix, const string &to);
    bool valid() override;
    int next() override;
    int prev() override;
    string key();
    pair<string,string> raw_key();
    bool raw_key_is_prefixed(const string &prefix);
    bufferlist value() override;
    int status() override;
  };

  /// Utility
  static string combine_strings(const string &prefix, const string &value);
  static int split_key(string &in_prefix, string *prefix, string *key);
  static bufferlist to_bufferlist(const kinetic::KineticRecord &record);
  virtual uint64_t get_estimated_size(map<string,uint64_t> &extra) {
    // not used by the osd
    return 0;
  }


  WholeSpaceIterator get_wholespace_iterator() {
    return std::make_shared<KineticWholeSpaceIteratorImpl>(kinetic_conn.get());
  }
};

#endif