summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/transactions/transaction_base.h
blob: f279676c609b3557adddab0b1dfd177c2112f9f8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once

#ifndef ROCKSDB_LITE

#include <stack>
#include <string>
#include <vector>

#include "db/write_batch_internal.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/autovector.h"
#include "utilities/transactions/transaction_util.h"

namespace ROCKSDB_NAMESPACE {

class TransactionBaseImpl : public Transaction {
 public:
  TransactionBaseImpl(DB* db, const WriteOptions& write_options);

  virtual ~TransactionBaseImpl();

  // Remove pending operations queued in this transaction.
  virtual void Clear();

  void Reinitialize(DB* db, const WriteOptions& write_options);

  // Called before executing Put, Merge, Delete, and GetForUpdate.  If TryLock
  // returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed.
  // do_validate will be false if called from PutUntracked, DeleteUntracked,
  // MergeUntracked, or GetForUpdate(do_validate=false)
  virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
                         bool read_only, bool exclusive,
                         const bool do_validate = true,
                         const bool assume_tracked = false) = 0;

  void SetSavePoint() override;

  Status RollbackToSavePoint() override;

  Status PopSavePoint() override;

  using Transaction::Get;
  Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
             const Slice& key, std::string* value) override;

  Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
             const Slice& key, PinnableSlice* value) override;

  Status Get(const ReadOptions& options, const Slice& key,
             std::string* value) override {
    return Get(options, db_->DefaultColumnFamily(), key, value);
  }

  using Transaction::GetForUpdate;
  Status GetForUpdate(const ReadOptions& options,
                      ColumnFamilyHandle* column_family, const Slice& key,
                      std::string* value, bool exclusive,
                      const bool do_validate) override;

  Status GetForUpdate(const ReadOptions& options,
                      ColumnFamilyHandle* column_family, const Slice& key,
                      PinnableSlice* pinnable_val, bool exclusive,
                      const bool do_validate) override;

  Status GetForUpdate(const ReadOptions& options, const Slice& key,
                      std::string* value, bool exclusive,
                      const bool do_validate) override {
    return GetForUpdate(options, db_->DefaultColumnFamily(), key, value,
                        exclusive, do_validate);
  }

  using Transaction::MultiGet;
  std::vector<Status> MultiGet(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      const std::vector<Slice>& keys,
      std::vector<std::string>* values) override;

  std::vector<Status> MultiGet(const ReadOptions& options,
                               const std::vector<Slice>& keys,
                               std::vector<std::string>* values) override {
    return MultiGet(options, std::vector<ColumnFamilyHandle*>(
                                 keys.size(), db_->DefaultColumnFamily()),
                    keys, values);
  }

  void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family,
                const size_t num_keys, const Slice* keys, PinnableSlice* values,
                Status* statuses, const bool sorted_input = false) override;

  using Transaction::MultiGetForUpdate;
  std::vector<Status> MultiGetForUpdate(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      const std::vector<Slice>& keys,
      std::vector<std::string>* values) override;

  std::vector<Status> MultiGetForUpdate(
      const ReadOptions& options, const std::vector<Slice>& keys,
      std::vector<std::string>* values) override {
    return MultiGetForUpdate(options,
                             std::vector<ColumnFamilyHandle*>(
                                 keys.size(), db_->DefaultColumnFamily()),
                             keys, values);
  }

  Iterator* GetIterator(const ReadOptions& read_options) override;
  Iterator* GetIterator(const ReadOptions& read_options,
                        ColumnFamilyHandle* column_family) override;

  Status Put(ColumnFamilyHandle* column_family, const Slice& key,
             const Slice& value, const bool assume_tracked = false) override;
  Status Put(const Slice& key, const Slice& value) override {
    return Put(nullptr, key, value);
  }

  Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
             const SliceParts& value,
             const bool assume_tracked = false) override;
  Status Put(const SliceParts& key, const SliceParts& value) override {
    return Put(nullptr, key, value);
  }

  Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
               const Slice& value, const bool assume_tracked = false) override;
  Status Merge(const Slice& key, const Slice& value) override {
    return Merge(nullptr, key, value);
  }

  Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
                const bool assume_tracked = false) override;
  Status Delete(const Slice& key) override { return Delete(nullptr, key); }
  Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
                const bool assume_tracked = false) override;
  Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }

  Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
                      const bool assume_tracked = false) override;
  Status SingleDelete(const Slice& key) override {
    return SingleDelete(nullptr, key);
  }
  Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
                      const bool assume_tracked = false) override;
  Status SingleDelete(const SliceParts& key) override {
    return SingleDelete(nullptr, key);
  }

  Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
                      const Slice& value) override;
  Status PutUntracked(const Slice& key, const Slice& value) override {
    return PutUntracked(nullptr, key, value);
  }

  Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
                      const SliceParts& value) override;
  Status PutUntracked(const SliceParts& key, const SliceParts& value) override {
    return PutUntracked(nullptr, key, value);
  }

  Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key,
                        const Slice& value) override;
  Status MergeUntracked(const Slice& key, const Slice& value) override {
    return MergeUntracked(nullptr, key, value);
  }

  Status DeleteUntracked(ColumnFamilyHandle* column_family,
                         const Slice& key) override;
  Status DeleteUntracked(const Slice& key) override {
    return DeleteUntracked(nullptr, key);
  }
  Status DeleteUntracked(ColumnFamilyHandle* column_family,
                         const SliceParts& key) override;
  Status DeleteUntracked(const SliceParts& key) override {
    return DeleteUntracked(nullptr, key);
  }

  Status SingleDeleteUntracked(ColumnFamilyHandle* column_family,
                               const Slice& key) override;
  Status SingleDeleteUntracked(const Slice& key) override {
    return SingleDeleteUntracked(nullptr, key);
  }

  void PutLogData(const Slice& blob) override;

  WriteBatchWithIndex* GetWriteBatch() override;

  virtual void SetLockTimeout(int64_t /*timeout*/) override { /* Do nothing */
  }

  const Snapshot* GetSnapshot() const override {
    return snapshot_ ? snapshot_.get() : nullptr;
  }

  virtual void SetSnapshot() override;
  void SetSnapshotOnNextOperation(
      std::shared_ptr<TransactionNotifier> notifier = nullptr) override;

  void ClearSnapshot() override {
    snapshot_.reset();
    snapshot_needed_ = false;
    snapshot_notifier_ = nullptr;
  }

  void DisableIndexing() override { indexing_enabled_ = false; }

  void EnableIndexing() override { indexing_enabled_ = true; }

  uint64_t GetElapsedTime() const override;

  uint64_t GetNumPuts() const override;

  uint64_t GetNumDeletes() const override;

  uint64_t GetNumMerges() const override;

  uint64_t GetNumKeys() const override;

  void UndoGetForUpdate(ColumnFamilyHandle* column_family,
                        const Slice& key) override;
  void UndoGetForUpdate(const Slice& key) override {
    return UndoGetForUpdate(nullptr, key);
  };

  // Get list of keys in this transaction that must not have any conflicts
  // with writes in other transactions.
  const TransactionKeyMap& GetTrackedKeys() const { return tracked_keys_; }

  WriteOptions* GetWriteOptions() override { return &write_options_; }

  void SetWriteOptions(const WriteOptions& write_options) override {
    write_options_ = write_options;
  }

  // Used for memory management for snapshot_
  void ReleaseSnapshot(const Snapshot* snapshot, DB* db);

  // iterates over the given batch and makes the appropriate inserts.
  // used for rebuilding prepared transactions after recovery.
  virtual Status RebuildFromWriteBatch(WriteBatch* src_batch) override;

  WriteBatch* GetCommitTimeWriteBatch() override;

 protected:
  // Add a key to the list of tracked keys.
  //
  // seqno is the earliest seqno this key was involved with this transaction.
  // readonly should be set to true if no data was written for this key
  void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno,
                bool readonly, bool exclusive);

  // Helper function to add a key to the given TransactionKeyMap
  static void TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
                       const std::string& key, SequenceNumber seqno,
                       bool readonly, bool exclusive);

  // Called when UndoGetForUpdate determines that this key can be unlocked.
  virtual void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
                                  const Slice& key) = 0;

  std::unique_ptr<TransactionKeyMap> GetTrackedKeysSinceSavePoint();

  // Sets a snapshot if SetSnapshotOnNextOperation() has been called.
  void SetSnapshotIfNeeded();

  // Initialize write_batch_ for 2PC by inserting Noop.
  inline void InitWriteBatch(bool clear = false) {
    if (clear) {
      write_batch_.Clear();
    }
    assert(write_batch_.GetDataSize() == WriteBatchInternal::kHeader);
    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
  }

  DB* db_;
  DBImpl* dbimpl_;

  WriteOptions write_options_;

  const Comparator* cmp_;

  // Stores that time the txn was constructed, in microseconds.
  uint64_t start_time_;

  // Stores the current snapshot that was set by SetSnapshot or null if
  // no snapshot is currently set.
  std::shared_ptr<const Snapshot> snapshot_;

  // Count of various operations pending in this transaction
  uint64_t num_puts_ = 0;
  uint64_t num_deletes_ = 0;
  uint64_t num_merges_ = 0;

  struct SavePoint {
    std::shared_ptr<const Snapshot> snapshot_;
    bool snapshot_needed_ = false;
    std::shared_ptr<TransactionNotifier> snapshot_notifier_;
    uint64_t num_puts_ = 0;
    uint64_t num_deletes_ = 0;
    uint64_t num_merges_ = 0;

    // Record all keys tracked since the last savepoint
    TransactionKeyMap new_keys_;

    SavePoint(std::shared_ptr<const Snapshot> snapshot, bool snapshot_needed,
              std::shared_ptr<TransactionNotifier> snapshot_notifier,
              uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges)
        : snapshot_(snapshot),
          snapshot_needed_(snapshot_needed),
          snapshot_notifier_(snapshot_notifier),
          num_puts_(num_puts),
          num_deletes_(num_deletes),
          num_merges_(num_merges) {}

    SavePoint() = default;
  };

  // Records writes pending in this transaction
  WriteBatchWithIndex write_batch_;

  // Map from column_family_id to map of keys that are involved in this
  // transaction.
  // For Pessimistic Transactions this is the list of locked keys.
  // Optimistic Transactions will wait till commit time to do conflict checking.
  TransactionKeyMap tracked_keys_;

  // Stack of the Snapshot saved at each save point. Saved snapshots may be
  // nullptr if there was no snapshot at the time SetSavePoint() was called.
  std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint,
                             autovector<TransactionBaseImpl::SavePoint>>>
      save_points_;

 private:
  friend class WritePreparedTxn;
  // Extra data to be persisted with the commit. Note this is only used when
  // prepare phase is not skipped.
  WriteBatch commit_time_batch_;

  // If true, future Put/Merge/Deletes will be indexed in the
  // WriteBatchWithIndex.
  // If false, future Put/Merge/Deletes will be inserted directly into the
  // underlying WriteBatch and not indexed in the WriteBatchWithIndex.
  bool indexing_enabled_;

  // SetSnapshotOnNextOperation() has been called and the snapshot has not yet
  // been reset.
  bool snapshot_needed_ = false;

  // SetSnapshotOnNextOperation() has been called and the caller would like
  // a notification through the TransactionNotifier interface
  std::shared_ptr<TransactionNotifier> snapshot_notifier_ = nullptr;

  Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
                 bool read_only, bool exclusive, const bool do_validate = true,
                 const bool assume_tracked = false);

  WriteBatchBase* GetBatchForWrite();
  void SetSnapshotInternal(const Snapshot* snapshot);
};

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE