summaryrefslogtreecommitdiffstats
path: root/src/test/ObjectMap/test_keyvaluedb_atomicity.cc
blob: f93e68c4aafeb743b23d9603f248a21093d33e34 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
#include <pthread.h>
#include "include/buffer.h"
#include "kv/KeyValueDB.h"
#include <sys/types.h>
#include <dirent.h>
#include <string>
#include <vector>
#include <boost/scoped_ptr.hpp>
#include <iostream>
#include <sstream>
#include "stdlib.h"
#include "global/global_context.h"

using namespace std;

const string CONTROL_PREFIX = "CONTROL";
const string PRIMARY_PREFIX = "PREFIX";
const int NUM_COPIES = 100;
const int NUM_THREADS = 30;

string prefix_gen(int i) {
  stringstream ss;
  ss << PRIMARY_PREFIX << "_" << i << std::endl;
  return ss.str();
}

int verify(KeyValueDB *db) {
  // Verify
  {
    map<int, KeyValueDB::Iterator> iterators;
    for (int i = 0; i < NUM_COPIES; ++i) {
      iterators[i] = db->get_iterator(prefix_gen(i));
      iterators[i]->seek_to_first();
    }
    while (iterators.rbegin()->second->valid()) {
      for (map<int, KeyValueDB::Iterator>::iterator i = iterators.begin();
	   i != iterators.end();
	   ++i) {
	ceph_assert(i->second->valid());
	ceph_assert(i->second->key() == iterators.rbegin()->second->key());
	bufferlist r = i->second->value();
	bufferlist l = iterators.rbegin()->second->value();
	i->second->next();
      }
    }
    for (map<int, KeyValueDB::Iterator>::iterator i = iterators.begin();
	 i != iterators.end();
	 ++i) {
      ceph_assert(!i->second->valid());
    }
  }
  return 0;
}

void *write(void *_db) {
  KeyValueDB *db = static_cast<KeyValueDB*>(_db);
  std::cout << "Writing..." << std::endl;
  for (int i = 0; i < 12000; ++i) {
    if (!(i % 10)) {
      std::cout << "Iteration: " << i << std::endl;
    }
    int key_num = rand();
    stringstream key;
    key << key_num << std::endl;
    map<string, bufferlist> to_set;
    stringstream val;
    val << i << std::endl;
    bufferptr bp(val.str().c_str(), val.str().size() + 1);
    to_set[key.str()].push_back(bp);
    
    KeyValueDB::Transaction t = db->get_transaction();
    for (int j = 0; j < NUM_COPIES; ++j) {
      t->set(prefix_gen(j), to_set);
    }
    ceph_assert(!db->submit_transaction(t));
  }
  return 0;
}

int main() {
  char *path = getenv("OBJECT_MAP_PATH");
  boost::scoped_ptr< KeyValueDB > db;
  if (!path) {
    std::cerr << "No path found, OBJECT_MAP_PATH undefined" << std::endl;
    return 0;
  }
  string strpath(path);
  std::cerr << "Using path: " << strpath << std::endl;
  KeyValueDB *store = KeyValueDB::create(g_ceph_context, "leveldb", strpath);
  ceph_assert(!store->create_and_open(std::cerr));
  db.reset(store);

  verify(db.get());

  vector<pthread_t> threads(NUM_THREADS);
  for (vector<pthread_t>::iterator i = threads.begin();
       i != threads.end();
       ++i) {
    pthread_create(&*i, 0, &write, static_cast<void *>(db.get()));
  }
  for (vector<pthread_t>::iterator i = threads.begin();
       i != threads.end();
       ++i) {
    void *tmp;
    pthread_join(*i, &tmp);
  }
  verify(db.get());
}