summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/util/compression_context_cache.cc
blob: 6fb5c4fcd976423762d633a2f77324bdd7a98845 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//

#include "util/compression_context_cache.h"

#include "util/compression.h"
#include "util/core_local.h"

#include <atomic>

namespace rocksdb {
namespace compression_cache {

// Marker stored in ZSTDCachedData::zstd_uncomp_sentinel_ while the cached
// entry is checked out by a caller.
constexpr void* SentinelValue = nullptr;
// Cache ZSTD uncompression contexts for reads.
// If needed, we can add ZSTD compression context caching,
// which is currently not done since BlockBasedTableBuilder
// simply creates one compression context per new SST file.
struct ZSTDCachedData {
  // The context is held by value rather than through a pointer so that
  // a) native types do not leak out and b) use of the cache stays
  // transparent for the user.
  ZSTDUncompressCachedData uncomp_cached_data_;
  // Availability marker: holds &uncomp_cached_data_ while the cached entry is
  // free, and SentinelValue while a caller has it checked out.
  std::atomic<void*> zstd_uncomp_sentinel_;

  // Unused filler intended to round the struct size up to a multiple of
  // CACHE_LINE_SIZE.
  char padding[CACHE_LINE_SIZE -
               (sizeof(ZSTDUncompressCachedData) + sizeof(std::atomic<void*>)) %
                   CACHE_LINE_SIZE];

  // A fresh slot starts out with its cached entry available.
  ZSTDCachedData() : zstd_uncomp_sentinel_(&uncomp_cached_data_) {}

  // Not copyable.
  ZSTDCachedData(const ZSTDCachedData&) = delete;
  ZSTDCachedData& operator=(const ZSTDCachedData&) = delete;

  // Hands out uncompression data. If the cached entry is free it is claimed
  // and the result is initialized from it (idx is forwarded to
  // InitFromCache); otherwise the result is created for one-time use.
  ZSTDUncompressCachedData GetUncompressData(int64_t idx) {
    ZSTDUncompressCachedData result;
    void* free_marker = &uncomp_cached_data_;
    const bool claimed = zstd_uncomp_sentinel_.compare_exchange_strong(
        free_marker, SentinelValue);
    if (!claimed) {
      // Cached entry is already in use: build one-time-use data instead.
      result.CreateIfNeeded();
      return result;
    }
    uncomp_cached_data_.CreateIfNeeded();
    result.InitFromCache(uncomp_cached_data_, idx);
    return result;
  }

  // Puts the cached entry back into circulation. Must only be called by
  // whoever successfully claimed it in GetUncompressData().
  void ReturnUncompressData() {
    void* const previous = zstd_uncomp_sentinel_.exchange(&uncomp_cached_data_);
    if (previous != SentinelValue) {
      // The entry was returned without having been acquired.
      assert(false);
    }
  }
};
// Compile-time guard: the size of ZSTDCachedData must be a whole number of
// cache lines, otherwise the padding computation inside it is off.
static_assert((sizeof(ZSTDCachedData) % CACHE_LINE_SIZE) == 0,
              "Expected CACHE_LINE_SIZE alignment");
}  // namespace compression_cache

// Lets the code below name ZSTDCachedData without the compression_cache::
// qualifier.
using namespace compression_cache;

// Implementation object behind CompressionContextCache: owns an array of
// ZSTDCachedData slots and routes get/return requests to them.
class CompressionContextCache::Rep {
 public:
  Rep() {}

  // Obtains a slot together with its index from the array and asks that slot
  // for uncompression data, passing the index along.
  ZSTDUncompressCachedData GetZSTDUncompressData() {
    const auto slot_and_index = per_core_uncompr_.AccessElementAndIndex();
    return slot_and_index.first->GetUncompressData(
        static_cast<int64_t>(slot_and_index.second));
  }

  // Gives the cached entry of slot `idx` back. `idx` must be non-negative.
  void ReturnZSTDUncompressData(int64_t idx) {
    assert(idx >= 0);
    per_core_uncompr_.AccessAtCore(static_cast<size_t>(idx))
        ->ReturnUncompressData();
  }

 private:
  // Slots of cached uncompression data, addressed by index.
  CoreLocalArray<ZSTDCachedData> per_core_uncompr_;
};

// Allocates the implementation object; it is released in the destructor.
CompressionContextCache::CompressionContextCache()
    : rep_(new Rep()) {
}

// Returns the process-wide cache. The object is a function-local static, so
// it is constructed the first time this function is called.
CompressionContextCache* CompressionContextCache::Instance() {
  static CompressionContextCache singleton;
  return &singleton;
}

// Forces construction of the singleton by calling Instance(); the returned
// pointer is deliberately discarded.
void CompressionContextCache::InitSingleton() {
  static_cast<void>(Instance());
}

// Forwards to the implementation object to obtain uncompression data.
ZSTDUncompressCachedData
CompressionContextCache::GetCachedZSTDUncompressData() {
  Rep& impl = *rep_;
  return impl.GetZSTDUncompressData();
}

void CompressionContextCache::ReturnCachedZSTDUncompressData(int64_t idx) {
  rep_->ReturnZSTDUncompressData(idx);
}

// Releases the implementation object allocated in the constructor.
CompressionContextCache::~CompressionContextCache() {
  delete rep_;
}

}  // namespace rocksdb