summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/transaction_log_impl.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rocksdb/db/transaction_log_impl.h
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/rocksdb/db/transaction_log_impl.h126
1 files changed, 126 insertions, 0 deletions
diff --git a/src/rocksdb/db/transaction_log_impl.h b/src/rocksdb/db/transaction_log_impl.h
new file mode 100644
index 00000000..6382b61a
--- /dev/null
+++ b/src/rocksdb/db/transaction_log_impl.h
@@ -0,0 +1,126 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#ifndef ROCKSDB_LITE
+#include <vector>
+
+#include "db/log_reader.h"
+#include "db/version_set.h"
+#include "options/db_options.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "rocksdb/transaction_log.h"
+#include "rocksdb/types.h"
+#include "util/filename.h"
+
+namespace rocksdb {
+
+class LogFileImpl : public LogFile {
+ public:
+ LogFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq,
+ uint64_t sizeBytes) :
+ logNumber_(logNum),
+ type_(logType),
+ startSequence_(startSeq),
+ sizeFileBytes_(sizeBytes) {
+ }
+
+ std::string PathName() const override {
+ if (type_ == kArchivedLogFile) {
+ return ArchivedLogFileName("", logNumber_);
+ }
+ return LogFileName("", logNumber_);
+ }
+
+ uint64_t LogNumber() const override { return logNumber_; }
+
+ WalFileType Type() const override { return type_; }
+
+ SequenceNumber StartSequence() const override { return startSequence_; }
+
+ uint64_t SizeFileBytes() const override { return sizeFileBytes_; }
+
+ bool operator < (const LogFile& that) const {
+ return LogNumber() < that.LogNumber();
+ }
+
+ private:
+ uint64_t logNumber_;
+ WalFileType type_;
+ SequenceNumber startSequence_;
+ uint64_t sizeFileBytes_;
+
+};
+
+class TransactionLogIteratorImpl : public TransactionLogIterator {
+ public:
+ TransactionLogIteratorImpl(
+ const std::string& dir, const ImmutableDBOptions* options,
+ const TransactionLogIterator::ReadOptions& read_options,
+ const EnvOptions& soptions, const SequenceNumber seqNum,
+ std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
+ const bool seq_per_batch);
+
+ virtual bool Valid() override;
+
+ virtual void Next() override;
+
+ virtual Status status() override;
+
+ virtual BatchResult GetBatch() override;
+
+ private:
+ const std::string& dir_;
+ const ImmutableDBOptions* options_;
+ const TransactionLogIterator::ReadOptions read_options_;
+ const EnvOptions& soptions_;
+ SequenceNumber starting_sequence_number_;
+ std::unique_ptr<VectorLogPtr> files_;
+ bool started_;
+ bool is_valid_; // not valid when it starts of.
+ Status current_status_;
+ size_t current_file_index_;
+ std::unique_ptr<WriteBatch> current_batch_;
+ std::unique_ptr<log::Reader> current_log_reader_;
+ Status OpenLogFile(const LogFile* log_file,
+ std::unique_ptr<SequentialFileReader>* file);
+
+ struct LogReporter : public log::Reader::Reporter {
+ Env* env;
+ Logger* info_log;
+ virtual void Corruption(size_t bytes, const Status& s) override {
+ ROCKS_LOG_ERROR(info_log, "dropping %" ROCKSDB_PRIszt " bytes; %s", bytes,
+ s.ToString().c_str());
+ }
+ virtual void Info(const char* s) { ROCKS_LOG_INFO(info_log, "%s", s); }
+ } reporter_;
+
+ SequenceNumber
+ current_batch_seq_; // sequence number at start of current batch
+ SequenceNumber current_last_seq_; // last sequence in the current batch
+ // Used only to get latest seq. num
+ // TODO(icanadi) can this be just a callback?
+ VersionSet const* const versions_;
+ const bool seq_per_batch_;
+ // Reads from transaction log only if the writebatch record has been written
+ bool RestrictedRead(Slice* record, std::string* scratch);
+ // Seeks to startingSequenceNumber reading from startFileIndex in files_.
+ // If strict is set,then must get a batch starting with startingSequenceNumber
+ void SeekToStartSequence(uint64_t start_file_index = 0, bool strict = false);
+ // Implementation of Next. SeekToStartSequence calls it internally with
+ // internal=true to let it find next entry even if it has to jump gaps because
+ // the iterator may start off from the first available entry but promises to
+ // be continuous after that
+ void NextImpl(bool internal = false);
+ // Check if batch is expected, else return false
+ bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expected_seq);
+ // Update current batch if a continuous batch is found, else return false
+ void UpdateCurrentWriteBatch(const Slice& record);
+ Status OpenLogReader(const LogFile* file);
+};
+} // namespace rocksdb
+#endif // ROCKSDB_LITE