summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc
blob: ca365d044e2d8d7334eaa3c945d6a12da1be6759 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_unprepared_txn_db.h"
#include "db/arena_wrapped_db_iter.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {

// Rolls back a recovered transaction that was never prepared.
//
// Instead of reconstructing a Transaction object, and calling rollback on it,
// we can be more efficient with RollbackRecoveredTransaction by skipping
// unnecessary steps (eg. updating CommitMap, reconstructing keyset).
//
// The recovered batches are visited from the largest sequence number down.
// For every batch a separate rollback batch is built that, for each key the
// batch touched, re-writes the value read at (batch sequence - 1), or a Delete
// when no such value is found. Each rollback batch is terminated with a
// rollback marker and written through DBImpl::WriteImpl.
//
// Returns the first non-OK status encountered, or Status::OK().
Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
    const DBImpl::RecoveredTransaction* rtxn) {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  assert(rtxn->unprepared_);
  auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap();
  auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap();
  // In theory we could write with disableWAL = true during recovery, and
  // assume that if we crash again during recovery, we can just replay from
  // the very beginning. Unfortunately, the XIDs from the application may not
  // necessarily be unique across restarts, potentially leading to situations
  // like this:
  //
  // BEGIN_PREPARE(unprepared) Put(a) END_PREPARE(xid = 1)
  // -- crash and recover with Put(a) rolled back as it was not prepared
  // BEGIN_PREPARE(prepared) Put(b) END_PREPARE(xid = 1)
  // COMMIT(xid = 1)
  // -- crash and recover with both a, b
  //
  // We could just write the rollback marker, but then we would have to extend
  // MemTableInserter during recovery to actually do writes into the DB
  // instead of just dropping the in-memory write batch.
  //
  WriteOptions w_options;

  // Read callback that treats every sequence number it is asked about as
  // visible.
  class InvalidSnapshotReadCallback : public ReadCallback {
   public:
    explicit InvalidSnapshotReadCallback(SequenceNumber snapshot)
        : ReadCallback(snapshot) {}

    inline bool IsVisibleFullCheck(SequenceNumber) override {
      // The seq provided as snapshot is the seq right before we have locked and
      // written to it, so whatever is there, it is committed.
      return true;
    }

    // Ignore the refresh request since we are confident that our snapshot seq
    // is not going to be affected by concurrent compactions (not enabled yet.)
    void Refresh(SequenceNumber) override {}
  };

  // Iterate starting with largest sequence number.
  for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) {
    auto last_visible_txn = it->first - 1;
    const auto& batch = it->second.batch_;
    WriteBatch rollback_batch;

    // WriteBatch handler that, for each key seen in the recovered batch,
    // appends the pre-transaction state of that key to `rollback_batch_`.
    struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
      DBImpl* db_;
      ReadOptions roptions;
      InvalidSnapshotReadCallback callback;
      WriteBatch* rollback_batch_;
      std::map<uint32_t, const Comparator*>& comparators_;
      std::map<uint32_t, ColumnFamilyHandle*>& handles_;
      using CFKeys = std::set<Slice, SetComparator>;
      // Keys already rolled back by this handler, per column family id.
      std::map<uint32_t, CFKeys> keys_;
      bool rollback_merge_operands_;
      RollbackWriteBatchBuilder(
          DBImpl* db, SequenceNumber snap_seq, WriteBatch* dst_batch,
          std::map<uint32_t, const Comparator*>& comparators,
          std::map<uint32_t, ColumnFamilyHandle*>& handles,
          bool rollback_merge_operands)
          : db_(db),
            callback(snap_seq),
            // disable min_uncommitted optimization
            rollback_batch_(dst_batch),
            comparators_(comparators),
            handles_(handles),
            rollback_merge_operands_(rollback_merge_operands) {}

      // Appends to the rollback batch a Put of the value read for `key`
      // through `callback`, or a Delete when the read reports NotFound. A key
      // is handled at most once per column family; repeats return OK without
      // touching the batch.
      Status Rollback(uint32_t cf, const Slice& key) {
        Status s;
        CFKeys& cf_keys = keys_[cf];
        if (cf_keys.size() == 0) {  // just inserted
          // Swap the default-constructed set for one ordered by this column
          // family's comparator.
          cf_keys = CFKeys(SetComparator(comparators_[cf]));
        }
        auto res = cf_keys.insert(key);
        if (!res.second) {  // second is false if an element already existed.
          return s;
        }

        PinnableSlice pinnable_val;
        bool not_used;
        auto cf_handle = handles_[cf];
        DBImpl::GetImplOptions get_impl_options;
        get_impl_options.column_family = cf_handle;
        get_impl_options.value = &pinnable_val;
        get_impl_options.value_found = &not_used;
        get_impl_options.callback = &callback;
        s = db_->GetImpl(roptions, key, get_impl_options);
        assert(s.ok() || s.IsNotFound());
        if (s.ok()) {
          s = rollback_batch_->Put(cf_handle, key, pinnable_val);
          assert(s.ok());
        } else if (s.IsNotFound()) {
          // There has been no readable value before txn. By adding a delete we
          // make sure that there will be none afterwards either.
          s = rollback_batch_->Delete(cf_handle, key);
          assert(s.ok());
        }
        // Any other status is unexpected and is returned to the caller as is.
        return s;
      }

      Status PutCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
        return Rollback(cf, key);
      }

      Status DeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      // Merge operands are only rolled back when the DB was configured with
      // rollback_merge_operands; otherwise they are skipped.
      Status MergeCF(uint32_t cf, const Slice& key,
                     const Slice& /*val*/) override {
        if (rollback_merge_operands_) {
          return Rollback(cf, key);
        }
        return Status::OK();
      }

      // Recovered batches do not contain 2PC markers.
      Status MarkNoop(bool) override { return Status::InvalidArgument(); }
      Status MarkBeginPrepare(bool) override {
        return Status::InvalidArgument();
      }
      Status MarkEndPrepare(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkCommit(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkRollback(const Slice&) override {
        return Status::InvalidArgument();
      }
    } rollback_handler(db_impl_, last_visible_txn, &rollback_batch,
                       *cf_comp_map_shared_ptr, *cf_map_shared_ptr,
                       txn_db_options_.rollback_merge_operands);

    auto s = batch->Iterate(&rollback_handler);
    if (!s.ok()) {
      return s;
    }

    // The Rollback marker will be used as a batch separator. Do not write the
    // batch if the marker could not be appended.
    s = WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);
    if (!s.ok()) {
      return s;
    }

    const uint64_t kNoLogRef = 0;
    const bool kDisableMemtable = true;
    const size_t kOneBatch = 1;
    uint64_t seq_used = kMaxSequenceNumber;
    s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
                            kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch);
    if (!s.ok()) {
      return s;
    }

    // If two_write_queues, we must manually release the sequence number to
    // readers.
    if (db_impl_->immutable_db_options().two_write_queues) {
      db_impl_->SetLastPublishedSequence(seq_used);
    }
  }

  return Status::OK();
}

// Finishes opening the transaction DB after recovery.
//
// In order, this:
//   1. installs the snapshot checker and the recoverable-state pre-release
//      callback on the underlying DBImpl;
//   2. registers every handle in `handles` and verifies its CF options;
//   3. rebuilds a PREPARED WriteUnpreparedTxn for each recovered transaction
//      that is not marked unprepared, and calls AddPrepared for its sequence
//      numbers in ascending order;
//   4. advances max_evicted_seq_ to the latest sequence number and bumps the
//      DB sequence numbers by one;
//   5. rolls back every recovered transaction that is marked unprepared;
//   6. drops the recovered-transaction records and enables auto compaction
//      for the column families selected by `compaction_enabled_cf_indices`
//      (indices into `handles`).
//
// Returns the first non-OK status encountered; otherwise the status of
// EnableAutoCompaction.
Status WriteUnpreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  // TODO(lth): Reduce code duplication in this function.
  auto dbimpl = static_cast_with_check<DBImpl, DB>(GetRootDB());
  assert(dbimpl != nullptr);

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  // PessimisticTransactionDB::Initialize
  for (auto cf_ptr : handles) {
    AddColumnFamily(cf_ptr);
  }
  // Verify cf options
  for (auto handle : handles) {
    ColumnFamilyDescriptor cfd;
    Status s = handle->GetDescriptor(&cfd);
    if (!s.ok()) {
      return s;
    }
    s = VerifyCFOptions(cfd.options);
    if (!s.ok()) {
      return s;
    }
  }

  // Re-enable compaction for the column families that initially had
  // compaction enabled.
  std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  for (auto index : compaction_enabled_cf_indices) {
    compaction_enabled_cf_handles.push_back(handles[index]);
  }

  // create 'real' transactions from recovered shell transactions
  auto rtxns = dbimpl->recovered_transactions();
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  // Iterate by reference: each element carries the transaction name string.
  for (const auto& rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    assert(recovered_trx);
    assert(recovered_trx->batches_.size() >= 1);
    assert(recovered_trx->name_.length());

    // We can only rollback transactions after AdvanceMaxEvictedSeq is called,
    // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why
    // two iterations is required.
    if (recovered_trx->unprepared_) {
      continue;
    }

    WriteOptions w_options;
    w_options.sync = true;
    TransactionOptions t_options;

    auto first_log_number = recovered_trx->batches_.begin()->second.log_number_;
    auto first_seq = recovered_trx->batches_.begin()->first;
    auto last_prepare_batch_cnt =
        recovered_trx->batches_.begin()->second.batch_cnt_;

    Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
    assert(real_trx);
    auto wupt =
        static_cast_with_check<WriteUnpreparedTxn, Transaction>(real_trx);
    wupt->recovered_txn_ = true;

    real_trx->SetLogNumber(first_log_number);
    real_trx->SetId(first_seq);
    Status s = real_trx->SetName(recovered_trx->name_);
    if (!s.ok()) {
      // BeginTransaction handed us a freshly allocated transaction and
      // nothing visible on this path takes ownership of it, so release it
      // here instead of leaking it.
      delete real_trx;
      return s;
    }
    wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;

    for (const auto& batch : recovered_trx->batches_) {
      const auto& seq = batch.first;
      const auto& batch_info = batch.second;
      // A batch count of zero is treated as a single sub-batch.
      auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
      assert(batch_info.log_number_);

      ordered_seq_cnt[seq] = cnt;
      assert(wupt->unprep_seqs_.count(seq) == 0);
      wupt->unprep_seqs_[seq] = cnt;

      s = wupt->RebuildFromWriteBatch(batch_info.batch_);
      assert(s.ok());
      if (!s.ok()) {
        return s;
      }
    }

    const bool kClear = true;
    wupt->InitWriteBatch(kClear);

    real_trx->SetState(Transaction::PREPARED);
  }
  // AddPrepared must be called in order
  for (const auto& seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }

  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  // Rollback unprepared transactions.
  for (const auto& rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    if (!recovered_trx->unprepared_) {
      continue;
    }
    Status s = RollbackRecoveredTransaction(recovered_trx);
    if (!s.ok()) {
      return s;
    }
  }

  dbimpl->DeleteAllRecoveredTransactions();

  // Compaction should start only after max_evicted_seq_ is set AND recovered
  // transactions are either added to PrepareHeap or rolled back.
  return EnableAutoCompaction(compaction_enabled_cf_handles);
}

// Starts a transaction on this DB. With a null `old_txn` a new
// WriteUnpreparedTxn is allocated and returned; otherwise `old_txn` is
// reinitialized with the supplied options and handed back.
Transaction* WriteUnpreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn == nullptr) {
    return new WriteUnpreparedTxn(this, write_options, txn_options);
  }

  ReinitializeTransaction(old_txn, write_options, txn_options);
  return old_txn;
}

// Owns the read callback and the (possibly null) managed snapshot that belong
// to one iterator, so that both are destroyed together during iterator
// cleanup.
struct WriteUnpreparedTxnDB::IteratorState {
  // Builds the read callback from the snapshot sequence, the min-uncommitted
  // bound and the transaction's unprep_seqs_, and keeps a reference to
  // `managed_snapshot`.
  IteratorState(WritePreparedTxnDB* db, SequenceNumber snapshot_seq,
                std::shared_ptr<ManagedSnapshot> managed_snapshot,
                SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
      : callback(db, snapshot_seq, min_uncommitted, txn->unprep_seqs_,
                 kBackedByDBSnapshot),
        snapshot(managed_snapshot) {}

  // Forwards to the callback's max_visible_seq().
  SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }

  WriteUnpreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

// Creates an iterator over `column_family` on behalf of `txn`.
//
// The iterator reads through a WriteUnpreparedTxnReadCallback built from the
// snapshot sequence number, the snapshot's min_uncommitted_ and the
// transaction's unprep_seqs_. When `options.snapshot` is null a new snapshot
// is taken and released together with the iterator; otherwise the caller's
// snapshot is used as is.
//
// Returns nullptr (after logging an error) when the transaction has
// unprepared sequence numbers and its largest_validated_seq_ is greater than
// the snapshot sequence number.
Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
                                            ColumnFamilyHandle* column_family,
                                            WriteUnpreparedTxn* txn) {
  // TODO(lth): Refactor so that this logic is shared with WritePrepared.
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;

  // Currently, the Prev() iterator logic does not work well without snapshot
  // validation. The logic simply iterates through values of a key in
  // ascending seqno order, stopping at the first non-visible value and
  // returning the last visible value.
  //
  // For example, if snapshot sequence is 3, and we have the following keys:
  // foo: v1 1
  // foo: v2 2
  // foo: v3 3
  // foo: v4 4
  // foo: v5 5
  //
  // Then 1, 2, 3 will be visible, but 4 will be non-visible, so we return v3,
  // which is the last visible value.
  //
  // For unprepared transactions, if we have snap_seq = 3, but the current
  // transaction has unprep_seq 5, then returning the first non-visible value
  // would be incorrect, as we should return v5, and not v3. The problem is that
  // there are committed values at snapshot_seq < commit_seq < unprep_seq.
  //
  // Snapshot validation can prevent this problem by ensuring that no committed
  // values exist at snapshot_seq < commit_seq, and thus any value with a
  // sequence number greater than snapshot_seq must be unprepared values. For
  // example, if the transaction had a snapshot at 3, then snapshot validation
  // would be performed during the Put(v5) call. It would find v4, and the Put
  // would fail with snapshot validation failure.
  //
  // TODO(lth): Improve Prev() logic to continue iterating until
  // max_visible_seq, and then return the last visible value, so that this
  // restriction can be lifted.
  std::shared_ptr<ManagedSnapshot> own_snapshot;
  const Snapshot* snapshot = options.snapshot;
  if (snapshot == nullptr) {
    // No snapshot supplied: take one and tie its lifetime to the iterator
    // state through own_snapshot.
    snapshot = GetSnapshot();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }

  const SequenceNumber snapshot_seq = snapshot->GetSequenceNumber();
  assert(snapshot_seq != kMaxSequenceNumber);
  // Iteration is safe as long as largest_validated_seq <= snapshot_seq. We are
  // guaranteed that for keys that were modified by this transaction (and thus
  // might have unprepared values), no committed values exist at
  // largest_validated_seq < commit_seq (or the contrapositive: any committed
  // value must exist at commit_seq <= largest_validated_seq). This implies
  // that commit_seq <= largest_validated_seq <= snapshot_seq or commit_seq <=
  // snapshot_seq. As explained above, the problem with Prev() only happens when
  // snapshot_seq < commit_seq.
  //
  // For keys that were not modified by this transaction, largest_validated_seq_
  // is meaningless, and Prev() should just work with the existing visibility
  // logic.
  if (txn->largest_validated_seq_ > snapshot_seq &&
      !txn->unprep_seqs_.empty()) {
    ROCKS_LOG_ERROR(info_log_,
                    "WriteUnprepared iterator creation failed since the "
                    "transaction has performed unvalidated writes");
    return nullptr;
  }
  const SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
          ->min_uncommitted_;

  // Hierarchy downcast: use the same checked cast as for the snapshot above
  // rather than reinterpret_cast.
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl, ColumnFamilyHandle>(
          column_family)
          ->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
  auto* db_iter =
      db_impl_->NewIteratorImpl(options, cfd, state->MaxVisibleSeq(),
                                &state->callback, !ALLOW_BLOB, !ALLOW_REFRESH);
  // The iterator deletes `state` (callback and owned snapshot) on cleanup.
  db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE