summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.h
blob: 95be4ec3cc6697393d15a15d48d30c1238d3f936 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
#pragma once

#ifndef ROCKSDB_LITE

#include <list>
#include <memory>
#include <string>
#include <vector>

#include "file/random_access_file_reader.h"

#include "rocksdb/comparator.h"
#include "rocksdb/env.h"

#include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
#include "utilities/persistent_cache/lrulist.h"
#include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/persistent_cache_util.h"

#include "port/port.h"
#include "util/crc32c.h"
#include "util/mutexlock.h"

// The io code path of persistent cache uses pipelined architecture
//
// client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel
//
// This would enable the system to scale for GB/s of throughput which is
// expected with modern devies like NVM.
//
// The file level operations are encapsulated in the following abstractions
//
// BlockCacheFile
//       ^
//       |
//       |
// RandomAccessCacheFile (For reading)
//       ^
//       |
//       |
// WriteableCacheFile (For writing)
//
// Write IO code path :
//
namespace ROCKSDB_NAMESPACE {

class WriteableCacheFile;
struct BlockInfo;

// Represents a logical record on device
//
// (L)ogical (B)lock (Address = { cache-file-id, offset, size }
struct LogicalBlockAddress {
  LogicalBlockAddress() {}
  explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off,
                               const uint16_t size)
      : cache_id_(cache_id), off_(off), size_(size) {}

  uint32_t cache_id_ = 0;
  uint32_t off_ = 0;
  uint32_t size_ = 0;
};

typedef LogicalBlockAddress LBA;

// class Writer
//
// Writer is the abstraction used for writing data to file. The component can be
// multithreaded. It is the last step of write pipeline
class Writer {
 public:
  explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {}
  virtual ~Writer() {}

  // write buffer to file at the given offset
  virtual void Write(WritableFile* const file, CacheWriteBuffer* buf,
                     const uint64_t file_off,
                     const std::function<void()> callback) = 0;
  // stop the writer
  virtual void Stop() = 0;

  PersistentCacheTier* const cache_;
};

// class BlockCacheFile
//
// Generic interface to support building file specialized for read/writing
class BlockCacheFile : public LRUElement<BlockCacheFile> {
 public:
  explicit BlockCacheFile(const uint32_t cache_id)
      : LRUElement<BlockCacheFile>(), cache_id_(cache_id) {}

  explicit BlockCacheFile(Env* const env, const std::string& dir,
                          const uint32_t cache_id)
      : LRUElement<BlockCacheFile>(),
        env_(env),
        dir_(dir),
        cache_id_(cache_id) {}

  virtual ~BlockCacheFile() {}

  // append key/value to file and return LBA locator to user
  virtual bool Append(const Slice& /*key*/, const Slice& /*val*/,
                      LBA* const /*lba*/) {
    assert(!"not implemented");
    return false;
  }

  // read from the record locator (LBA) and return key, value and status
  virtual bool Read(const LBA& /*lba*/, Slice* /*key*/, Slice* /*block*/,
                    char* /*scratch*/) {
    assert(!"not implemented");
    return false;
  }

  // get file path
  std::string Path() const {
    return dir_ + "/" + std::to_string(cache_id_) + ".rc";
  }
  // get cache ID
  uint32_t cacheid() const { return cache_id_; }
  // Add block information to file data
  // Block information is the list of index reference for this file
  virtual void Add(BlockInfo* binfo) {
    WriteLock _(&rwlock_);
    block_infos_.push_back(binfo);
  }
  // get block information
  std::list<BlockInfo*>& block_infos() { return block_infos_; }
  // delete file and return the size of the file
  virtual Status Delete(uint64_t* size);

 protected:
  port::RWMutex rwlock_;               // synchronization mutex
  Env* const env_ = nullptr;           // Env for OS
  const std::string dir_;              // Directory name
  const uint32_t cache_id_;            // Cache id for the file
  std::list<BlockInfo*> block_infos_;  // List of index entries mapping to the
                                       // file content
};

// class RandomAccessFile
//
// Thread safe implementation for reading random data from file
class RandomAccessCacheFile : public BlockCacheFile {
 public:
  explicit RandomAccessCacheFile(Env* const env, const std::string& dir,
                                 const uint32_t cache_id,
                                 const std::shared_ptr<Logger>& log)
      : BlockCacheFile(env, dir, cache_id), log_(log) {}

  virtual ~RandomAccessCacheFile() {}

  // open file for reading
  bool Open(const bool enable_direct_reads);
  // read data from the disk
  bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override;

 private:
  std::unique_ptr<RandomAccessFileReader> freader_;

 protected:
  bool OpenImpl(const bool enable_direct_reads);
  bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch);

  std::shared_ptr<Logger> log_;  // log file
};

// class WriteableCacheFile
//
// All writes to the files are cached in buffers. The buffers are flushed to
// disk as they get filled up. When file size reaches a certain size, a new file
// will be created provided there is free space
class WriteableCacheFile : public RandomAccessCacheFile {
 public:
  explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc,
                              Writer* writer, const std::string& dir,
                              const uint32_t cache_id, const uint32_t max_size,
                              const std::shared_ptr<Logger>& log)
      : RandomAccessCacheFile(env, dir, cache_id, log),
        alloc_(alloc),
        writer_(writer),
        max_size_(max_size) {}

  virtual ~WriteableCacheFile();

  // create file on disk
  bool Create(const bool enable_direct_writes, const bool enable_direct_reads);

  // read data from logical file
  bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override {
    ReadLock _(&rwlock_);
    const bool closed = eof_ && bufs_.empty();
    if (closed) {
      // the file is closed, read from disk
      return RandomAccessCacheFile::Read(lba, key, block, scratch);
    }
    // file is still being written, read from buffers
    return ReadBuffer(lba, key, block, scratch);
  }

  // append data to end of file
  bool Append(const Slice&, const Slice&, LBA* const) override;
  // End-of-file
  bool Eof() const { return eof_; }

 private:
  friend class ThreadedWriter;

  static const size_t kFileAlignmentSize = 4 * 1024;  // align file size

  bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch);
  bool ReadBuffer(const LBA& lba, char* data);
  bool ExpandBuffer(const size_t size);
  void DispatchBuffer();
  void BufferWriteDone();
  void CloseAndOpenForReading();
  void ClearBuffers();
  void Close();

  // File layout in memory
  //
  // +------+------+------+------+------+------+
  // | b0   | b1   | b2   | b3   | b4   | b5   |
  // +------+------+------+------+------+------+
  //        ^                           ^
  //        |                           |
  //      buf_doff_                   buf_woff_
  //   (next buffer to           (next buffer to fill)
  //   flush to disk)
  //
  //  The buffers are flushed to disk serially for a given file

  CacheWriteBufferAllocator* const alloc_ = nullptr;  // Buffer provider
  Writer* const writer_ = nullptr;                    // File writer thread
  std::unique_ptr<WritableFile> file_;   // RocksDB Env file abstraction
  std::vector<CacheWriteBuffer*> bufs_;  // Written buffers
  uint32_t size_ = 0;                    // Size of the file
  const uint32_t max_size_;              // Max size of the file
  bool eof_ = false;                     // End of file
  uint32_t disk_woff_ = 0;               // Offset to write on disk
  size_t buf_woff_ = 0;                  // off into bufs_ to write
  size_t buf_doff_ = 0;                  // off into bufs_ to dispatch
  size_t pending_ios_ = 0;               // Number of ios to disk in-progress
  bool enable_direct_reads_ = false;     // Should we enable direct reads
                                         // when reading from disk
};

//
// Abstraction to do writing to device. It is part of pipelined architecture.
//
class ThreadedWriter : public Writer {
 public:
  // Representation of IO to device
  struct IO {
    explicit IO(const bool signal) : signal_(signal) {}
    explicit IO(WritableFile* const file, CacheWriteBuffer* const buf,
                const uint64_t file_off, const std::function<void()> callback)
        : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {}

    IO(const IO&) = default;
    IO& operator=(const IO&) = default;
    size_t Size() const { return sizeof(IO); }

    WritableFile* file_ = nullptr;     // File to write to
    CacheWriteBuffer* buf_ = nullptr;  // buffer to write
    uint64_t file_off_ = 0;            // file offset
    bool signal_ = false;              // signal to exit thread loop
    std::function<void()> callback_;   // Callback on completion
  };

  explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth,
                          const size_t io_size);
  virtual ~ThreadedWriter() { assert(threads_.empty()); }

  void Stop() override;
  void Write(WritableFile* const file, CacheWriteBuffer* buf,
             const uint64_t file_off,
             const std::function<void()> callback) override;

 private:
  void ThreadMain();
  void DispatchIO(const IO& io);

  const size_t io_size_ = 0;
  BoundedQueue<IO> q_;
  std::vector<port::Thread> threads_;
};

}  // namespace ROCKSDB_NAMESPACE

#endif