summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/wal_edit.cc
blob: 2525be610b47102769b3d5a89243605d3c65b6e4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/wal_edit.h"

#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {

void WalAddition::EncodeTo(std::string* dst) const {
  PutVarint64(dst, number_);

  if (metadata_.HasSyncedSize()) {
    PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
    PutVarint64(dst, metadata_.GetSyncedSizeInBytes());
  }

  PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kTerminate));
}

Status WalAddition::DecodeFrom(Slice* src) {
  constexpr char class_name[] = "WalAddition";

  if (!GetVarint64(src, &number_)) {
    return Status::Corruption(class_name, "Error decoding WAL log number");
  }

  while (true) {
    uint32_t tag_value = 0;
    if (!GetVarint32(src, &tag_value)) {
      return Status::Corruption(class_name, "Error decoding tag");
    }
    WalAdditionTag tag = static_cast<WalAdditionTag>(tag_value);
    switch (tag) {
      case WalAdditionTag::kSyncedSize: {
        uint64_t size = 0;
        if (!GetVarint64(src, &size)) {
          return Status::Corruption(class_name, "Error decoding WAL file size");
        }
        metadata_.SetSyncedSizeInBytes(size);
        break;
      }
      // TODO: process future tags such as checksum.
      case WalAdditionTag::kTerminate:
        return Status::OK();
      default: {
        std::stringstream ss;
        ss << "Unknown tag " << tag_value;
        return Status::Corruption(class_name, ss.str());
      }
    }
  }
}

JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) {
  jw << "LogNumber" << wal.GetLogNumber() << "SyncedSizeInBytes"
     << wal.GetMetadata().GetSyncedSizeInBytes();
  return jw;
}

std::ostream& operator<<(std::ostream& os, const WalAddition& wal) {
  os << "log_number: " << wal.GetLogNumber()
     << " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes();
  return os;
}

std::string WalAddition::DebugString() const {
  std::ostringstream oss;
  oss << *this;
  return oss.str();
}

void WalDeletion::EncodeTo(std::string* dst) const {
  PutVarint64(dst, number_);
}

Status WalDeletion::DecodeFrom(Slice* src) {
  constexpr char class_name[] = "WalDeletion";

  if (!GetVarint64(src, &number_)) {
    return Status::Corruption(class_name, "Error decoding WAL log number");
  }

  return Status::OK();
}

JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal) {
  jw << "LogNumber" << wal.GetLogNumber();
  return jw;
}

std::ostream& operator<<(std::ostream& os, const WalDeletion& wal) {
  os << "log_number: " << wal.GetLogNumber();
  return os;
}

std::string WalDeletion::DebugString() const {
  std::ostringstream oss;
  oss << *this;
  return oss.str();
}

Status WalSet::AddWal(const WalAddition& wal) {
  if (wal.GetLogNumber() < min_wal_number_to_keep_) {
    // The WAL has been obsolete, ignore it.
    return Status::OK();
  }

  auto it = wals_.lower_bound(wal.GetLogNumber());
  bool existing = it != wals_.end() && it->first == wal.GetLogNumber();

  if (!existing) {
    wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()});
    return Status::OK();
  }

  assert(existing);
  if (!wal.GetMetadata().HasSyncedSize()) {
    std::stringstream ss;
    ss << "WAL " << wal.GetLogNumber() << " is created more than once";
    return Status::Corruption("WalSet::AddWal", ss.str());
  }

  assert(wal.GetMetadata().HasSyncedSize());
  if (it->second.HasSyncedSize() && wal.GetMetadata().GetSyncedSizeInBytes() <=
                                        it->second.GetSyncedSizeInBytes()) {
    // This is possible because version edits with different synced WAL sizes
    // for the same WAL can be committed out-of-order. For example, thread
    // 1 synces the first 10 bytes of 1.log, while thread 2 synces the first 20
    // bytes of 1.log. It's possible that thread 1 calls LogAndApply() after
    // thread 2.
    // In this case, just return ok.
    return Status::OK();
  }

  // Update synced size for the given WAL.
  it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes());
  return Status::OK();
}

Status WalSet::AddWals(const WalAdditions& wals) {
  Status s;
  for (const WalAddition& wal : wals) {
    s = AddWal(wal);
    if (!s.ok()) {
      break;
    }
  }
  return s;
}

Status WalSet::DeleteWalsBefore(WalNumber wal) {
  if (wal > min_wal_number_to_keep_) {
    min_wal_number_to_keep_ = wal;
    wals_.erase(wals_.begin(), wals_.lower_bound(wal));
  }
  return Status::OK();
}

void WalSet::Reset() {
  wals_.clear();
  min_wal_number_to_keep_ = 0;
}

Status WalSet::CheckWals(
    Env* env,
    const std::unordered_map<WalNumber, std::string>& logs_on_disk) const {
  assert(env != nullptr);

  Status s;
  for (const auto& wal : wals_) {
    const uint64_t log_number = wal.first;
    const WalMetadata& wal_meta = wal.second;

    if (!wal_meta.HasSyncedSize()) {
      // The WAL and WAL directory is not even synced,
      // so the WAL's inode may not be persisted,
      // then the WAL might not show up when listing WAL directory.
      continue;
    }

    if (logs_on_disk.find(log_number) == logs_on_disk.end()) {
      std::stringstream ss;
      ss << "Missing WAL with log number: " << log_number << ".";
      s = Status::Corruption(ss.str());
      break;
    }

    uint64_t log_file_size = 0;
    s = env->GetFileSize(logs_on_disk.at(log_number), &log_file_size);
    if (!s.ok()) {
      break;
    }
    if (log_file_size < wal_meta.GetSyncedSizeInBytes()) {
      std::stringstream ss;
      ss << "Size mismatch: WAL (log number: " << log_number
         << ") in MANIFEST is " << wal_meta.GetSyncedSizeInBytes()
         << " bytes , but actually is " << log_file_size << " bytes on disk.";
      s = Status::Corruption(ss.str());
      break;
    }
  }

  return s;
}

}  // namespace ROCKSDB_NAMESPACE