diff options
Diffstat (limited to 'src/test/objectstore/FileStoreTracker.cc')
-rw-r--r-- | src/test/objectstore/FileStoreTracker.cc | 451 |
1 files changed, 451 insertions, 0 deletions
diff --git a/src/test/objectstore/FileStoreTracker.cc b/src/test/objectstore/FileStoreTracker.cc new file mode 100644 index 00000000..dfaa88d1 --- /dev/null +++ b/src/test/objectstore/FileStoreTracker.cc @@ -0,0 +1,451 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +#include "FileStoreTracker.h" +#include <stdlib.h> +#include <iostream> +#include <boost/scoped_ptr.hpp> +#include "include/Context.h" +#include "common/Mutex.h" + +class OnApplied : public Context { + FileStoreTracker *tracker; + list<pair<pair<coll_t, string>, uint64_t> > in_flight; +public: + OnApplied(FileStoreTracker *tracker, + list<pair<pair<coll_t, string>, uint64_t> > in_flight) + : tracker(tracker), in_flight(in_flight) {} + + void finish(int r) override { + for (list<pair<pair<coll_t, string>, uint64_t> >::iterator i = + in_flight.begin(); + i != in_flight.end(); + ++i) { + tracker->applied(i->first, i->second); + } + } +}; + +class OnCommitted : public Context { + FileStoreTracker *tracker; + list<pair<pair<coll_t, string>, uint64_t> > in_flight; +public: + OnCommitted(FileStoreTracker *tracker, + list<pair<pair<coll_t, string>, uint64_t> > in_flight) + : tracker(tracker), in_flight(in_flight) {} + + void finish(int r) override { + for (list<pair<pair<coll_t, string>, uint64_t> >::iterator i = + in_flight.begin(); + i != in_flight.end(); + ++i) { + tracker->committed(i->first, i->second); + } + } +}; + +int FileStoreTracker::init() +{ + set<string> to_get; + to_get.insert("STATUS"); + map<string, bufferlist> got; + db->get("STATUS", to_get, &got); + restart_seq = 0; + if (!got.empty()) { + auto bp = got.begin()->second.cbegin(); + decode(restart_seq, bp); + } + ++restart_seq; + KeyValueDB::Transaction t = db->get_transaction(); + got.clear(); + encode(restart_seq, got["STATUS"]); + t->set("STATUS", got); + db->submit_transaction(t); + return 0; +} + +void FileStoreTracker::submit_transaction(Transaction &t) +{ + list<pair<pair<coll_t, string>, uint64_t> > in_flight; + OutTransaction out; + out.t = new ObjectStore::Transaction; + out.in_flight = &in_flight; + for (list<Transaction::Op*>::iterator i = t.ops.begin(); + i != t.ops.end(); + ++i) { + (**i)(this, &out); + } + out.t->register_on_applied(new OnApplied(this, in_flight)); + out.t->register_on_commit(new OnCommitted(this, in_flight)); + auto ch = store->open_collection(coll_t()); + store->queue_transaction(ch, std::move(*out.t), nullptr); + delete out.t; +} + +void FileStoreTracker::write(const pair<coll_t, string> &obj, + OutTransaction *out) +{ + Mutex::Locker l(lock); + std::cerr << "Writing " << obj << std::endl; + ObjectContents contents = get_current_content(obj); + + uint64_t offset = rand() % (SIZE/2); + uint64_t len = rand() % (SIZE/2); + if (!len) len = 10; + contents.write(rand(), offset, len); + + bufferlist to_write; + ObjectContents::Iterator iter = contents.get_iterator(); + iter.seek_to(offset); + for (uint64_t i = offset; + i < offset + len; + ++i, ++iter) { + ceph_assert(iter.valid()); + to_write.append(*iter); + } + out->t->write(coll_t(obj.first), + ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP))), + offset, + len, + to_write); + out->in_flight->push_back(make_pair(obj, set_content(obj, contents))); +} + +void FileStoreTracker::remove(const pair<coll_t, string> &obj, + OutTransaction *out) +{ + std::cerr << "Deleting " << obj << std::endl; + Mutex::Locker l(lock); + ObjectContents old_contents = get_current_content(obj); + if (!old_contents.exists()) + return; + out->t->remove(coll_t(obj.first), + ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP)))); + ObjectContents contents; + out->in_flight->push_back(make_pair(obj, set_content(obj, contents))); +} + +void FileStoreTracker::clone_range(const pair<coll_t, string> &from, + const pair<coll_t, string> &to, + OutTransaction *out) { + Mutex::Locker l(lock); + std::cerr << "CloningRange " << from << " to " << to << std::endl; + ceph_assert(from.first == to.first); + ObjectContents from_contents = get_current_content(from); + ObjectContents to_contents = get_current_content(to); + if (!from_contents.exists()) { + return; + } + if (from.second == to.second) { + return; + } + + uint64_t new_size = from_contents.size(); + interval_set<uint64_t> interval_to_clone; + uint64_t offset = rand() % (new_size/2); + uint64_t len = rand() % (new_size/2); + if (!len) len = 10; + interval_to_clone.insert(offset, len); + to_contents.clone_range(from_contents, interval_to_clone); + out->t->clone_range(coll_t(from.first), + ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))), + ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))), + offset, + len, + offset); + out->in_flight->push_back(make_pair(to, set_content(to, to_contents))); +} + +void FileStoreTracker::clone(const pair<coll_t, string> &from, + const pair<coll_t, string> &to, + OutTransaction *out) { + Mutex::Locker l(lock); + std::cerr << "Cloning " << from << " to " << to << std::endl; + ceph_assert(from.first == to.first); + if (from.second == to.second) { + return; + } + ObjectContents from_contents = get_current_content(from); + ObjectContents to_contents = get_current_content(to); + if (!from_contents.exists()) { + return; + } + + if (to_contents.exists()) + out->t->remove(coll_t(to.first), + ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP)))); + out->t->clone(coll_t(from.first), + ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))), + ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP)))); + out->in_flight->push_back(make_pair(to, set_content(to, from_contents))); +} + + +string obj_to_prefix(const pair<coll_t, string> &obj) { + string sep; + sep.push_back('^'); + return obj.first.to_str() + sep + obj.second + "_CONTENTS_"; +} + +string obj_to_meta_prefix(const pair<coll_t, string> &obj) { + string sep; + sep.push_back('^'); + return obj.first.to_str() + sep + obj.second; +} + +string seq_to_key(uint64_t seq) { + char buf[50]; + snprintf(buf, sizeof(buf), "%*llu", 20, (unsigned long long int)seq); + return string(buf); +} + +struct ObjStatus { + uint64_t last_applied; + uint64_t last_committed; + uint64_t restart_seq; + ObjStatus() : last_applied(0), last_committed(0), restart_seq(0) {} + + uint64_t get_last_applied(uint64_t seq) const { + if (seq > restart_seq) + return last_committed; + else + return last_applied; + } + void set_last_applied(uint64_t _last_applied, uint64_t seq) { + last_applied = _last_applied; + restart_seq = seq; + } + uint64_t trim_to() const { + return last_applied < last_committed ? + last_applied : last_committed; + } +}; +void encode(const ObjStatus &obj, bufferlist &bl) { + encode(obj.last_applied, bl); + encode(obj.last_committed, bl); + encode(obj.restart_seq, bl); +} +void decode(ObjStatus &obj, bufferlist::const_iterator &bl) { + decode(obj.last_applied, bl); + decode(obj.last_committed, bl); + decode(obj.restart_seq, bl); +} + + +ObjStatus get_obj_status(const pair<coll_t, string> &obj, + KeyValueDB *db) +{ + set<string> to_get; + to_get.insert("META"); + map<string, bufferlist> got; + db->get(obj_to_meta_prefix(obj), to_get, &got); + ObjStatus retval; + if (!got.empty()) { + auto bp = got.begin()->second.cbegin(); + decode(retval, bp); + } + return retval; +} + +void set_obj_status(const pair<coll_t, string> &obj, + const ObjStatus &status, + KeyValueDB::Transaction t) +{ + map<string, bufferlist> to_set; + encode(status, to_set["META"]); + t->set(obj_to_meta_prefix(obj), to_set); +} + +void _clean_forward(const pair<coll_t, string> &obj, + uint64_t last_valid, + KeyValueDB *db) +{ + KeyValueDB::Transaction t = db->get_transaction(); + KeyValueDB::Iterator i = db->get_iterator(obj_to_prefix(obj)); + set<string> to_remove; + i->upper_bound(seq_to_key(last_valid)); + for (; i->valid(); i->next()) { + to_remove.insert(i->key()); + } + t->rmkeys(obj_to_prefix(obj), to_remove); + db->submit_transaction(t); +} + + +void FileStoreTracker::verify(const coll_t &coll, const string &obj, + bool on_start) { + Mutex::Locker l(lock); + std::cerr << "Verifying " << make_pair(coll, obj) << std::endl; + + pair<uint64_t, uint64_t> valid_reads = get_valid_reads(make_pair(coll, obj)); + std::cerr << "valid_reads is " << valid_reads << std::endl; + bufferlist contents; + auto ch = store->open_collection(coll_t(coll)); + int r = store->read(ch, + ghobject_t(hobject_t(sobject_t(obj, CEPH_NOSNAP))), + 0, + 2*SIZE, + contents); + std::cerr << "exists: " << r << std::endl; + + + for (uint64_t i = valid_reads.first; + i < valid_reads.second; + ++i) { + ObjectContents old_contents = get_content(make_pair(coll, obj), i); + + std::cerr << "old_contents exists " << old_contents.exists() << std::endl; + if (!old_contents.exists() && (r == -ENOENT)) + return; + + if (old_contents.exists() && (r == -ENOENT)) + continue; + + if (!old_contents.exists() && (r != -ENOENT)) + continue; + + if (contents.length() != old_contents.size()) { + std::cerr << "old_contents.size() is " + << old_contents.size() << std::endl; + continue; + } + + bufferlist::iterator bp = contents.begin(); + ObjectContents::Iterator iter = old_contents.get_iterator(); + iter.seek_to_first(); + bool matches = true; + uint64_t pos = 0; + for (; !bp.end() && iter.valid(); + ++iter, ++bp, ++pos) { + if (*iter != *bp) { + std::cerr << "does not match at pos " << pos << std::endl; + matches = false; + break; + } + } + if (matches) { + if (on_start) + _clean_forward(make_pair(coll, obj), i, db); + return; + } + } + std::cerr << "Verifying " << make_pair(coll, obj) << " failed " << std::endl; + ceph_abort(); +} + +ObjectContents FileStoreTracker::get_current_content( + const pair<coll_t, string> &obj) +{ + KeyValueDB::Iterator iter = db->get_iterator( + obj_to_prefix(obj)); + iter->seek_to_last(); + if (iter->valid()) { + bufferlist bl = iter->value(); + auto bp = bl.cbegin(); + pair<uint64_t, bufferlist> val; + decode(val, bp); + ceph_assert(seq_to_key(val.first) == iter->key()); + bp = val.second.begin(); + return ObjectContents(bp); + } + return ObjectContents(); +} + +ObjectContents FileStoreTracker::get_content( + const pair<coll_t, string> &obj, uint64_t version) +{ + set<string> to_get; + map<string, bufferlist> got; + to_get.insert(seq_to_key(version)); + db->get(obj_to_prefix(obj), to_get, &got); + if (got.empty()) + return ObjectContents(); + pair<uint64_t, bufferlist> val; + auto bp = got.begin()->second.cbegin(); + decode(val, bp); + bp = val.second.begin(); + ceph_assert(val.first == version); + return ObjectContents(bp); +} + +pair<uint64_t, uint64_t> FileStoreTracker::get_valid_reads( + const pair<coll_t, string> &obj) +{ + pair<uint64_t, uint64_t> bounds = make_pair(0,1); + KeyValueDB::Iterator iter = db->get_iterator( + obj_to_prefix(obj)); + iter->seek_to_last(); + if (iter->valid()) { + pair<uint64_t, bufferlist> val; + bufferlist bl = iter->value(); + auto bp = bl.cbegin(); + decode(val, bp); + bounds.second = val.first + 1; + } + + ObjStatus obj_status = get_obj_status(obj, db); + bounds.first = obj_status.get_last_applied(restart_seq); + return bounds; +} + +void clear_obsolete(const pair<coll_t, string> &obj, + const ObjStatus &status, + KeyValueDB *db, + KeyValueDB::Transaction t) +{ + KeyValueDB::Iterator iter = db->get_iterator(obj_to_prefix(obj)); + set<string> to_remove; + iter->seek_to_first(); + for (; iter->valid() && iter->key() < seq_to_key(status.trim_to()); + iter->next()) + to_remove.insert(iter->key()); + t->rmkeys(obj_to_prefix(obj), to_remove); +} + +void FileStoreTracker::committed(const pair<coll_t, string> &obj, + uint64_t seq) { + Mutex::Locker l(lock); + ObjStatus status = get_obj_status(obj, db); + ceph_assert(status.last_committed < seq); + status.last_committed = seq; + KeyValueDB::Transaction t = db->get_transaction(); + clear_obsolete(obj, status, db, t); + set_obj_status(obj, status, t); + db->submit_transaction(t); +} + +void FileStoreTracker::applied(const pair<coll_t, string> &obj, + uint64_t seq) { + Mutex::Locker l(lock); + std::cerr << "Applied " << obj << " version " << seq << std::endl; + ObjStatus status = get_obj_status(obj, db); + ceph_assert(status.last_applied < seq); + status.set_last_applied(seq, restart_seq); + KeyValueDB::Transaction t = db->get_transaction(); + clear_obsolete(obj, status, db, t); + set_obj_status(obj, status, t); + db->submit_transaction(t); +} + + +uint64_t FileStoreTracker::set_content(const pair<coll_t, string> &obj, + ObjectContents &content) { + KeyValueDB::Transaction t = db->get_transaction(); + KeyValueDB::Iterator iter = db->get_iterator( + obj_to_prefix(obj)); + iter->seek_to_last(); + uint64_t most_recent = 0; + if (iter->valid()) { + pair<uint64_t, bufferlist> val; + bufferlist bl = iter->value(); + auto bp = bl.cbegin(); + decode(val, bp); + most_recent = val.first; + } + bufferlist buf_content; + content.encode(buf_content); + map<string, bufferlist> to_set; + encode(make_pair(most_recent + 1, buf_content), + to_set[seq_to_key(most_recent + 1)]); + t->set(obj_to_prefix(obj), to_set); + db->submit_transaction(t); + return most_recent + 1; +} |