//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/wal_manager.h"

#include <algorithm>
#include <cinttypes>
#include <memory>
#include <vector>

#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/transaction_log_impl.h"
#include "db/write_batch_internal.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/sequence_file_reader.h"
#include "logging/logging.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/write_batch.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

#ifndef ROCKSDB_LITE

Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
  auto s = env_->DeleteFile(wal_dir_ + "/" + fname);
  if (s.ok()) {
    MutexLock l(&read_first_record_cache_mutex_);
    read_first_record_cache_.erase(number);
  }
  return s;
}

Status WalManager::GetSortedWalFiles(VectorLogPtr& files) {
  // First get sorted files in db dir, then get sorted files from archived
  // dir, to avoid a race condition where a log file is moved to archived
  // dir in between.
  Status s;
  // list wal files in main db dir.
  VectorLogPtr logs;
  s = GetSortedWalsOfType(wal_dir_, logs, kAliveLogFile);
  if (!s.ok()) {
    return s;
  }

  // Reproduce the race condition where a log file is moved
  // to archived dir, between these two sync points, used in
  // (DBTest,TransactionLogIteratorRace)
  TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1");
  TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2");

  files.clear();
  // list wal files in archive dir.
  std::string archivedir = ArchivalDirectory(wal_dir_);
  Status exists = env_->FileExists(archivedir);
  if (exists.ok()) {
    s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
    if (!s.ok()) {
      return s;
    }
  } else if (!exists.IsNotFound()) {
    assert(s.IsIOError());
    return s;
  }

  uint64_t latest_archived_log_number = 0;
  if (!files.empty()) {
    latest_archived_log_number = files.back()->LogNumber();
    ROCKS_LOG_INFO(db_options_.info_log, "Latest Archived log: %" PRIu64,
                   latest_archived_log_number);
  }

  files.reserve(files.size() + logs.size());
  for (auto& log : logs) {
    if (log->LogNumber() > latest_archived_log_number) {
      files.push_back(std::move(log));
    } else {
      // When the race condition happens, we could see the
      // same log in both db dir and archived dir. Simply
      // ignore the one in db dir. Note that, if we read
      // archived dir first, we would have missed the log file.
      ROCKS_LOG_WARN(db_options_.info_log, "%s already moved to archive",
                     log->PathName().c_str());
    }
  }

  return s;
}

Status WalManager::GetUpdatesSince(
    SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
    const TransactionLogIterator::ReadOptions& read_options,
    VersionSet* version_set) {
  if (seq_per_batch_) {
    return Status::NotSupported();
  }

  assert(!seq_per_batch_);

  //  Get all sorted Wal Files.
  //  Do binary search and open files and find the seq number.

  std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
  Status s = GetSortedWalFiles(*wal_files);
  if (!s.ok()) {
    return s;
  }

  s = RetainProbableWalFiles(*wal_files, seq);
  if (!s.ok()) {
    return s;
  }
  iter->reset(new TransactionLogIteratorImpl(
      wal_dir_, &db_options_, read_options, file_options_, seq,
      std::move(wal_files), version_set, seq_per_batch_, io_tracer_));
  return (*iter)->status();
}

// 1. Go through all archived files and
//    a. if ttl is enabled, delete outdated files
//    b. if archive size limit is enabled, delete empty files,
//        compute file number and size.
// 2. If size limit is enabled:
//    a. compute how many files should be deleted
//    b. get sorted non-empty archived logs
//    c. delete what should be deleted
void WalManager::PurgeObsoleteWALFiles() {
  bool const ttl_enabled = db_options_.WAL_ttl_seconds > 0;
  bool const size_limit_enabled = db_options_.WAL_size_limit_MB > 0;
  if (!ttl_enabled && !size_limit_enabled) {
    return;
  }

  int64_t current_time = 0;
  Status s = db_options_.clock->GetCurrentTime(&current_time);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s",
                    s.ToString().c_str());
    assert(false);
    return;
  }
  uint64_t const now_seconds = static_cast<uint64_t>(current_time);
  uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled)
                                     ? db_options_.WAL_ttl_seconds / 2
                                     : kDefaultIntervalToDeleteObsoleteWAL;

  if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
    return;
  }

  purge_wal_files_last_run_ = now_seconds;

  std::string archival_dir = ArchivalDirectory(wal_dir_);
  std::vector<std::string> files;
  s = env_->GetChildren(archival_dir, &files);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s",
                    s.ToString().c_str());
    assert(false);
    return;
  }

  size_t log_files_num = 0;
  uint64_t log_file_size = 0;
  for (auto& f : files) {
    uint64_t number;
    FileType type;
    if (ParseFileName(f, &number, &type) && type == kWalFile) {
      std::string const file_path = archival_dir + "/" + f;
      if (ttl_enabled) {
        uint64_t file_m_time;
        s = env_->GetFileModificationTime(file_path, &file_m_time);
        if (!s.ok()) {
          ROCKS_LOG_WARN(db_options_.info_log,
                         "Can't get file mod time: %s: %s", file_path.c_str(),
                         s.ToString().c_str());
          continue;
        }
        if (now_seconds - file_m_time > db_options_.WAL_ttl_seconds) {
          s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
                           /*force_fg=*/!wal_in_db_path_);
          if (!s.ok()) {
            ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
                           file_path.c_str(), s.ToString().c_str());
            continue;
          } else {
            MutexLock l(&read_first_record_cache_mutex_);
            read_first_record_cache_.erase(number);
          }
          continue;
        }
      }

      if (size_limit_enabled) {
        uint64_t file_size;
        s = env_->GetFileSize(file_path, &file_size);
        if (!s.ok()) {
          ROCKS_LOG_ERROR(db_options_.info_log,
                          "Unable to get file size: %s: %s", file_path.c_str(),
                          s.ToString().c_str());
          return;
        } else {
          if (file_size > 0) {
            log_file_size = std::max(log_file_size, file_size);
            ++log_files_num;
          } else {
            s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
                             /*force_fg=*/!wal_in_db_path_);
            if (!s.ok()) {
              ROCKS_LOG_WARN(db_options_.info_log,
                             "Unable to delete file: %s: %s", file_path.c_str(),
                             s.ToString().c_str());
              continue;
            } else {
              MutexLock l(&read_first_record_cache_mutex_);
              read_first_record_cache_.erase(number);
            }
          }
        }
      }
    }
  }

  if (0 == log_files_num || !size_limit_enabled) {
    return;
  }

  size_t const files_keep_num = static_cast<size_t>(
      db_options_.WAL_size_limit_MB * 1024 * 1024 / log_file_size);
  if (log_files_num <= files_keep_num) {
    return;
  }

  size_t files_del_num = log_files_num - files_keep_num;
  VectorLogPtr archived_logs;
  s = GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
  if (!s.ok()) {
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Unable to get archived WALs from: %s: %s",
                   archival_dir.c_str(), s.ToString().c_str());
    files_del_num = 0;
  } else if (files_del_num > archived_logs.size()) {
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Trying to delete more archived log files than "
                   "exist. Deleting all");
    files_del_num = archived_logs.size();
  }

  for (size_t i = 0; i < files_del_num; ++i) {
    std::string const file_path = archived_logs[i]->PathName();
    s = DeleteDBFile(&db_options_, wal_dir_ + "/" + file_path, wal_dir_, false,
                     /*force_fg=*/!wal_in_db_path_);
    if (!s.ok()) {
      ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
                     file_path.c_str(), s.ToString().c_str());
      continue;
    } else {
      MutexLock l(&read_first_record_cache_mutex_);
      read_first_record_cache_.erase(archived_logs[i]->LogNumber());
    }
  }
}

void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
  auto archived_log_name = ArchivedLogFileName(wal_dir_, number);
  // The sync point below is used in (DBTest,TransactionLogIteratorRace)
  TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
  Status s = env_->RenameFile(fname, archived_log_name);
  // The sync point below is used in (DBTest,TransactionLogIteratorRace)
  TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
  ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n",
                 fname.c_str(), archived_log_name.c_str(),
                 s.ToString().c_str());
}

Status WalManager::GetSortedWalsOfType(const std::string& path,
                                       VectorLogPtr& log_files,
                                       WalFileType log_type) {
  std::vector<std::string> all_files;
  const Status status = env_->GetChildren(path, &all_files);
  if (!status.ok()) {
    return status;
  }
  log_files.reserve(all_files.size());
  for (const auto& f : all_files) {
    uint64_t number;
    FileType type;
    if (ParseFileName(f, &number, &type) && type == kWalFile) {
      SequenceNumber sequence;
      Status s = ReadFirstRecord(log_type, number, &sequence);
      if (!s.ok()) {
        return s;
      }
      if (sequence == 0) {
        // empty file
        continue;
      }

      // Reproduce the race condition where a log file is moved
      // to archived dir, between these two sync points, used in
      // (DBTest,TransactionLogIteratorRace)
      TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1");
      TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2");

      uint64_t size_bytes;
      s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
      // re-try in case the alive log file has been moved to archive.
      if (!s.ok() && log_type == kAliveLogFile) {
        std::string archived_file = ArchivedLogFileName(path, number);
        if (env_->FileExists(archived_file).ok()) {
          s = env_->GetFileSize(archived_file, &size_bytes);
          if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
            // oops, the file just got deleted from archived dir! move on
            s = Status::OK();
            continue;
          }
        }
      }
      if (!s.ok()) {
        return s;
      }

      log_files.push_back(std::unique_ptr<LogFile>(
          new LogFileImpl(number, log_type, sequence, size_bytes)));
    }
  }
  std::sort(
      log_files.begin(), log_files.end(),
      [](const std::unique_ptr<LogFile>& a, const std::unique_ptr<LogFile>& b) {
        LogFileImpl* a_impl = static_cast_with_check<LogFileImpl>(a.get());
        LogFileImpl* b_impl = static_cast_with_check<LogFileImpl>(b.get());
        return *a_impl < *b_impl;
      });
  return status;
}

Status WalManager::RetainProbableWalFiles(VectorLogPtr& all_logs,
                                          const SequenceNumber target) {
  int64_t start = 0;  // signed to avoid overflow when target is < first file.
  int64_t end = static_cast<int64_t>(all_logs.size()) - 1;
  // Binary Search. avoid opening all files.
  while (end >= start) {
    int64_t mid = start + (end - start) / 2;  // Avoid overflow.
    SequenceNumber current_seq_num =
        all_logs.at(static_cast<size_t>(mid))->StartSequence();
    if (current_seq_num == target) {
      end = mid;
      break;
    } else if (current_seq_num < target) {
      start = mid + 1;
    } else {
      end = mid - 1;
    }
  }
  // end could be -ve.
  size_t start_index =
      static_cast<size_t>(std::max(static_cast<int64_t>(0), end));
  // The last wal file is always included
  all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
  return Status::OK();
}

Status WalManager::ReadFirstRecord(const WalFileType type,
                                   const uint64_t number,
                                   SequenceNumber* sequence) {
  *sequence = 0;
  if (type != kAliveLogFile && type != kArchivedLogFile) {
    ROCKS_LOG_ERROR(db_options_.info_log, "[WalManger] Unknown file type %s",
                    std::to_string(type).c_str());
    return Status::NotSupported("File Type Not Known " + std::to_string(type));
  }
  {
    MutexLock l(&read_first_record_cache_mutex_);
    auto itr = read_first_record_cache_.find(number);
    if (itr != read_first_record_cache_.end()) {
      *sequence = itr->second;
      return Status::OK();
    }
  }
  Status s;
  if (type == kAliveLogFile) {
    std::string fname = LogFileName(wal_dir_, number);
    s = ReadFirstLine(fname, number, sequence);
    if (!s.ok() && env_->FileExists(fname).ok()) {
      // return any error that is not caused by non-existing file
      return s;
    }
  }

  if (type == kArchivedLogFile || !s.ok()) {
    //  check if the file got moved to archive.
    std::string archived_file = ArchivedLogFileName(wal_dir_, number);
    s = ReadFirstLine(archived_file, number, sequence);
    // maybe the file was deleted from archive dir. If that's the case, return
    // Status::OK(). The caller with identify this as empty file because
    // *sequence == 0
    if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
      return Status::OK();
    }
  }

  if (s.ok() && *sequence != 0) {
    MutexLock l(&read_first_record_cache_mutex_);
    read_first_record_cache_.insert({number, *sequence});
  }
  return s;
}

Status WalManager::GetLiveWalFile(uint64_t number,
                                  std::unique_ptr<LogFile>* log_file) {
  if (!log_file) {
    return Status::InvalidArgument("log_file not preallocated.");
  }

  if (!number) {
    return Status::PathNotFound("log file not available");
  }

  Status s;

  uint64_t size_bytes;
  s = env_->GetFileSize(LogFileName(wal_dir_, number), &size_bytes);

  if (!s.ok()) {
    return s;
  }

  log_file->reset(new LogFileImpl(number, kAliveLogFile,
                                  0,  // SequenceNumber
                                  size_bytes));

  return Status::OK();
}

// the function returns status.ok() and sequence == 0 if the file exists, but is
// empty
Status WalManager::ReadFirstLine(const std::string& fname,
                                 const uint64_t number,
                                 SequenceNumber* sequence) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;

    Status* status;
    bool ignore_error;  // true if db_options_.paranoid_checks==false
    void Corruption(size_t bytes, const Status& s) override {
      ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
                     (this->ignore_error ? "(ignoring error) " : ""), fname,
                     static_cast<int>(bytes), s.ToString().c_str());
      if (this->status->ok()) {
        // only keep the first error
        *this->status = s;
      }
    }
  };

  std::unique_ptr<FSSequentialFile> file;
  Status status = fs_->NewSequentialFile(
      fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
  std::unique_ptr<SequentialFileReader> file_reader(
      new SequentialFileReader(std::move(file), fname, io_tracer_));

  if (!status.ok()) {
    return status;
  }

  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = db_options_.info_log.get();
  reporter.fname = fname.c_str();
  reporter.status = &status;
  reporter.ignore_error = !db_options_.paranoid_checks;
  log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
                     true /*checksum*/, number);
  std::string scratch;
  Slice record;

  if (reader.ReadRecord(&record, &scratch) &&
      (status.ok() || !db_options_.paranoid_checks)) {
    if (record.size() < WriteBatchInternal::kHeader) {
      reporter.Corruption(record.size(),
                          Status::Corruption("log record too small"));
      // TODO read record's till the first no corrupt entry?
    } else {
      WriteBatch batch;
      // We can overwrite an existing non-OK Status since it'd only reach here
      // with `paranoid_checks == false`.
      status = WriteBatchInternal::SetContents(&batch, record);
      if (status.ok()) {
        *sequence = WriteBatchInternal::Sequence(&batch);
        return status;
      }
    }
  }

  if (status.ok() && reader.IsCompressedAndEmptyFile()) {
    // In case of wal_compression, it writes a `kSetCompressionType` record
    // which is not associated with any sequence number. As result for an empty
    // file, GetSortedWalsOfType() will skip these WALs causing the operations
    // to fail.
    // Therefore, in order to avoid that failure, it sets sequence_number to 1
    // indicating those WALs should be included.
    *sequence = 1;
  } else {
    // ReadRecord might have returned false on EOF, which means that the log
    // file is empty. Or, a failure may have occurred while processing the first
    // entry. In any case, return status and set sequence number to 0.
    *sequence = 0;
  }
  return status;
}

#endif  // ROCKSDB_LITE
}  // namespace ROCKSDB_NAMESPACE