summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/transactions/write_unprepared_txn_db.h
blob: ad8e40f9436e02349df35b4cddabb75435e7813c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once
#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn_db.h"
#include "utilities/transactions/write_unprepared_txn.h"

namespace ROCKSDB_NAMESPACE {

class WriteUnpreparedTxn;

class WriteUnpreparedTxnDB : public WritePreparedTxnDB {
 public:
  using WritePreparedTxnDB::WritePreparedTxnDB;

  Status Initialize(const std::vector<size_t>& compaction_enabled_cf_indices,
                    const std::vector<ColumnFamilyHandle*>& handles) override;

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override;

  // Struct to hold ownership of snapshot and read callback for cleanup.
  struct IteratorState;

  using WritePreparedTxnDB::NewIterator;
  Iterator* NewIterator(const ReadOptions& options,
                        ColumnFamilyHandle* column_family,
                        WriteUnpreparedTxn* txn);

 private:
  Status RollbackRecoveredTransaction(const DBImpl::RecoveredTransaction* rtxn);
};

class WriteUnpreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
  // TODO(lth): Reduce code duplication with
  // WritePreparedCommitEntryPreReleaseCallback
 public:
  // includes_data indicates that the commit also writes non-empty
  // CommitTimeWriteBatch to memtable, which needs to be committed separately.
  WriteUnpreparedCommitEntryPreReleaseCallback(
      WritePreparedTxnDB* db, DBImpl* db_impl,
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      size_t data_batch_cnt = 0, bool publish_seq = true)
      : db_(db),
        db_impl_(db_impl),
        unprep_seqs_(unprep_seqs),
        data_batch_cnt_(data_batch_cnt),
        includes_data_(data_batch_cnt_ > 0),
        publish_seq_(publish_seq) {
    assert(unprep_seqs.size() > 0);
  }

  virtual Status Callback(SequenceNumber commit_seq,
                          bool is_mem_disabled __attribute__((__unused__)),
                          uint64_t, size_t /*index*/,
                          size_t /*total*/) override {
    const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
                                         ? commit_seq
                                         : commit_seq + data_batch_cnt_ - 1;
    // Recall that unprep_seqs maps (un)prepared_seq => prepare_batch_cnt.
    for (const auto& s : unprep_seqs_) {
      for (size_t i = 0; i < s.second; i++) {
        db_->AddCommitted(s.first + i, last_commit_seq);
      }
    }

    if (includes_data_) {
      assert(data_batch_cnt_);
      // Commit the data that is accompanied with the commit request
      for (size_t i = 0; i < data_batch_cnt_; i++) {
        // For commit seq of each batch use the commit seq of the last batch.
        // This would make debugging easier by having all the batches having
        // the same sequence number.
        db_->AddCommitted(commit_seq + i, last_commit_seq);
      }
    }
    if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) {
      assert(is_mem_disabled);  // implies the 2nd queue
      // Publish the sequence number. We can do that here assuming the callback
      // is invoked only from one write queue, which would guarantee that the
      // publish sequence numbers will be in order, i.e., once a seq is
      // published all the seq prior to that are also publishable.
      db_impl_->SetLastPublishedSequence(last_commit_seq);
    }
    // else SequenceNumber that is updated as part of the write already does the
    // publishing
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  const std::map<SequenceNumber, size_t>& unprep_seqs_;
  size_t data_batch_cnt_;
  // Either because it is commit without prepare or it has a
  // CommitTimeWriteBatch
  bool includes_data_;
  // Should the callback also publishes the commit seq number
  bool publish_seq_;
};

class WriteUnpreparedRollbackPreReleaseCallback : public PreReleaseCallback {
  // TODO(lth): Reduce code duplication with
  // WritePreparedCommitEntryPreReleaseCallback
 public:
  WriteUnpreparedRollbackPreReleaseCallback(
      WritePreparedTxnDB* db, DBImpl* db_impl,
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SequenceNumber rollback_seq)
      : db_(db),
        db_impl_(db_impl),
        unprep_seqs_(unprep_seqs),
        rollback_seq_(rollback_seq) {
    assert(unprep_seqs.size() > 0);
    assert(db_impl_->immutable_db_options().two_write_queues);
  }

  virtual Status Callback(SequenceNumber commit_seq,
                          bool is_mem_disabled __attribute__((__unused__)),
                          uint64_t, size_t /*index*/,
                          size_t /*total*/) override {
    assert(is_mem_disabled);  // implies the 2nd queue
    const uint64_t last_commit_seq = commit_seq;
    db_->AddCommitted(rollback_seq_, last_commit_seq);
    // Recall that unprep_seqs maps (un)prepared_seq => prepare_batch_cnt.
    for (const auto& s : unprep_seqs_) {
      for (size_t i = 0; i < s.second; i++) {
        db_->AddCommitted(s.first + i, last_commit_seq);
      }
    }
    db_impl_->SetLastPublishedSequence(last_commit_seq);
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  const std::map<SequenceNumber, size_t>& unprep_seqs_;
  SequenceNumber rollback_seq_;
};

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE