summaryrefslogtreecommitdiffstats
path: root/src/test/objectstore/test_deferred.cc
blob: f2fd6054f1d241fbd88ee8d2deedd8942e835815 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <semaphore.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

#include <cerrno>
#include <iostream>
#include <memory>
#include <utility>

#include "os/ObjectStore.h"
#include "os/bluestore/BlueStore.h"
#include "include/Context.h"
#include "common/ceph_argparse.h"
#include "global/global_init.h"
#include "common/ceph_mutex.h"
#include "common/Cond.h"
#include "common/errno.h"
#include "common/options.h" // for the size literals



/// Context adapter that runs an arbitrary callable when it is finished.
///
/// Allows a lambda to be handed to APIs that expect a Context*, as done
/// in this file with Transaction::register_on_commit().
class C_do_action : public Context {
public:
  /// Callable invoked by finish().
  std::function<void()> action;

  /// @param action callable to run on completion. It is taken by value and
  ///        moved into the member so the std::function state is not copied
  ///        a second time.
  explicit C_do_action(std::function<void()> action)
    : action(std::move(action)) {}

  /// Runs the stored callable.
  /// @param r completion code passed in by the caller; ignored here.
  void finish(int r) override {
    action();
  }
};

void create_deferred_and_terminate() {
  std::unique_ptr<ObjectStore> store;

  g_ceph_context->_conf._clear_safe_to_start_threads();
  g_ceph_context->_conf.set_val_or_die("bluestore_prefer_deferred_size", "4096");
  g_ceph_context->_conf.set_val_or_die("bluestore_allocator", "bitmap");
  g_ceph_context->_conf.set_val_or_die("bluestore_block_size", "10240000000");
  g_ceph_context->_conf.apply_changes(nullptr);

  int64_t poolid;
  coll_t cid;
  ghobject_t hoid;
  ObjectStore::CollectionHandle ch;
  ceph_assert(::mkdir("bluestore.test_temp_dir", 0777) == 0);
  store.reset(ObjectStore::create(g_ceph_context,
				  "bluestore",
				  "bluestore.test_temp_dir",
				  "store_test_temp_journal"));
  ceph_assert(store->mkfs() == 0);
  ceph_assert(store->mount() == 0);

  poolid = 11;
  cid = coll_t(spg_t(pg_t(1, poolid), shard_id_t::NO_SHARD));
  ch = store->create_new_collection(cid);
  int r;
  {
    ObjectStore::Transaction t;
    t.create_collection(cid, 0);
    r = store->queue_transaction(ch, std::move(t));
    ceph_assert(r == 0);
  }

  {
    ObjectStore::Transaction t;
    std::string oid = "zapchajdziura";
    ghobject_t hoid(hobject_t(oid, "", CEPH_NOSNAP, 1, poolid, ""));
    bufferlist bl;
    bl.append(std::string(0xe000, '-'));
    t.write(cid, hoid, 0, 0xe000, bl);
    r = store->queue_transaction(ch, std::move(t));
    ceph_assert(r == 0);
  }

  size_t object_count = 10;

  // initial fill
  bufferlist bl_64K;
  bl_64K.append(std::string(64 * 1024, '-'));

  std::atomic<size_t> prefill_counter{0};
  sem_t prefill_mutex;
  sem_init(&prefill_mutex, 0, 0);

  for (size_t o = 0; o < object_count; o++) {
    ObjectStore::Transaction t;
    std::string oid = "object-" + std::to_string(o);
    ghobject_t hoid(hobject_t(oid, "", CEPH_NOSNAP, 1, poolid, ""));

    t.write(cid, hoid, 0, bl_64K.length(), bl_64K);
    t.register_on_commit(new C_do_action([&] {
      if (++prefill_counter == object_count) {
	sem_post(&prefill_mutex);
      }
    }));

    r = store->queue_transaction(ch, std::move(t));
    ceph_assert(r == 0);
  }
  sem_wait(&prefill_mutex);

  // small deferred writes over object
  // and complete overwrite of previous one
  bufferlist bl_8_bytes;
  bl_8_bytes.append("abcdefgh");
  std::atomic<size_t> deferred_counter{0};
  for (size_t o = 0; o < object_count - 1; o++) {
    ObjectStore::Transaction t;

    // sprinkle deferred writes
    std::string oid_d = "object-" + std::to_string(o + 1);
    ghobject_t hoid_d(hobject_t(oid_d, "", CEPH_NOSNAP, 1, poolid, ""));

    for(int i = 0; i < 16; i++) {
      t.write(cid, hoid_d, 4096 * i, bl_8_bytes.length(), bl_8_bytes);
    }

    // overwrite previous object
    std::string oid_m = "object-" + std::to_string(o);
    ghobject_t hoid_m(hobject_t(oid_m, "", CEPH_NOSNAP, 1, poolid, ""));
    t.write(cid, hoid_m, 0, bl_64K.length(), bl_64K);

    t.register_on_commit(new C_do_action([&] {
      if (++deferred_counter == object_count - 1) {
        exit(0);
      }
    }));
    r = store->queue_transaction(ch, std::move(t));
    ceph_assert(r == 0);
  }
  sleep(10);
  ceph_assert(0 && "should not reach here");
}

/// Program entry point: initialise the global Ceph context from the
/// command line, then run the scenario, which ends the process itself.
int main(int argc, const char **argv) {
  std::vector<const char*> cmdline;
  argv_to_vec(argc, argv, cmdline);

  // The value returned by global_init() is held in a local for the whole
  // of main (presumably it owns the context; confirm in global_init.h).
  auto cct = global_init(nullptr,
                         cmdline,
                         CEPH_ENTITY_TYPE_CLIENT,
                         CODE_ENVIRONMENT_UTILITY,
                         CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
  common_init_finish(g_ceph_context);

  create_deferred_and_terminate();
  return 0;
}