diff options
Diffstat (limited to 'src/client/barrier.cc')
-rw-r--r-- | src/client/barrier.cc | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/src/client/barrier.cc b/src/client/barrier.cc new file mode 100644 index 000000000..7a0c08868 --- /dev/null +++ b/src/client/barrier.cc @@ -0,0 +1,196 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * + * Copyright (C) 2012 CohortFS, LLC. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#if defined(__FreeBSD__) +#include <sys/param.h> +#endif + +#include "include/Context.h" +#include "Client.h" +#include "barrier.h" +#include "include/ceph_assert.h" + +#undef dout_prefix +#define dout_prefix *_dout << "client." << whoami << " " + +#define dout_subsys ceph_subsys_client + +#define cldout(cl, v) dout_impl((cl)->cct, dout_subsys, v) \ + *_dout << "client." << cl->whoami << " " + +/* C_Block_Sync */ +class C_Block_Sync : public Context { +private: + Client *cl; + uint64_t ino; + barrier_interval iv; + enum CBlockSync_State state; + Barrier *barrier; + int *rval; /* see Cond.h */ + +public: + boost::intrusive::list_member_hook<> intervals_hook; + C_Block_Sync(Client *c, uint64_t i, barrier_interval iv, int *r); + void finish(int rval); + + friend class Barrier; + friend class BarrierContext; +}; + +C_Block_Sync::C_Block_Sync(Client *c, uint64_t i, barrier_interval iv, + int *r=0) : + cl(c), ino(i), iv(iv), rval(r) +{ + state = CBlockSync_State_None; + barrier = NULL; + + cldout(cl, 1) << "C_Block_Sync for " << ino << dendl; + + if (!cl->barriers[ino]) { + cl->barriers[ino] = new BarrierContext(cl, ino); + } + /* XXX current semantics aren't commit-ordered */ + cl->barriers[ino]->write_nobarrier(*this); +} + +void C_Block_Sync::finish(int r) { + cldout(cl, 1) << "C_Block_Sync::finish() for " << ino << " " + << iv << " r==" << r << dendl; + if (rval) + *rval = r; + cl->barriers[ino]->complete(*this); +} + +/* Barrier */ +Barrier::Barrier() +{ } + +Barrier::~Barrier() +{ } + +/* BarrierContext */ +BarrierContext::BarrierContext(Client *c, uint64_t ino) : + cl(c), ino(ino) +{ }; + +void BarrierContext::write_nobarrier(C_Block_Sync &cbs) +{ + std::lock_guard locker(lock); + cbs.state = CBlockSync_State_Unclaimed; + outstanding_writes.push_back(cbs); +} + +void BarrierContext::write_barrier(C_Block_Sync &cbs) +{ + std::unique_lock locker(lock); + barrier_interval &iv = cbs.iv; + + { /* find blocking commit--intrusive no help here */ + BarrierList::iterator iter; + bool done = false; + for (iter = active_commits.begin(); + !done && (iter != active_commits.end()); + ++iter) { + Barrier &barrier = *iter; + while (boost::icl::intersects(barrier.span, iv)) { + /* wait on this */ + barrier.cond.wait(locker); + done = true; + } + } + } + + cbs.state = CBlockSync_State_Unclaimed; + outstanding_writes.push_back(cbs); + +} /* write_barrier */ + +void BarrierContext::commit_barrier(barrier_interval &civ) +{ + std::unique_lock locker(lock); + + /* we commit outstanding writes--if none exist, we don't care */ + if (outstanding_writes.size() == 0) + return; + + boost::icl::interval_set<uint64_t> cvs; + cvs.insert(civ); + + Barrier *barrier = NULL; + BlockSyncList::iterator iter, iter2; + + iter = outstanding_writes.begin(); + while (iter != outstanding_writes.end()) { + barrier_interval &iv = iter->iv; + if (boost::icl::intersects(cvs, iv)) { + C_Block_Sync &a_write = *iter; + if (! barrier) + barrier = new Barrier(); + /* mark the callback */ + a_write.state = CBlockSync_State_Committing; + a_write.barrier = barrier; + iter2 = iter++; + outstanding_writes.erase(iter2); + barrier->write_list.push_back(a_write); + barrier->span.insert(iv); + /* avoid iter invalidate */ + } else { + ++iter; + } + } + + if (barrier) { + active_commits.push_back(*barrier); + /* and wait on this */ + barrier->cond.wait(locker); + } + +} /* commit_barrier */ + +void BarrierContext::complete(C_Block_Sync &cbs) +{ + std::lock_guard locker(lock); + BlockSyncList::iterator iter = + BlockSyncList::s_iterator_to(cbs); + + switch (cbs.state) { + case CBlockSync_State_Unclaimed: + /* cool, no waiting */ + outstanding_writes.erase(iter); + break; + case CBlockSync_State_Committing: + { + Barrier *barrier = iter->barrier; + barrier->write_list.erase(iter); + /* signal waiters */ + barrier->cond.notify_all(); + /* dispose cleared barrier */ + if (barrier->write_list.size() == 0) { + BarrierList::iterator iter2 = + BarrierList::s_iterator_to(*barrier); + active_commits.erase(iter2); + delete barrier; + } + } + break; + default: + ceph_abort(); + break; + } + + cbs.state = CBlockSync_State_Completed; + +} /* complete */ + +BarrierContext::~BarrierContext() +{ } |