summaryrefslogtreecommitdiffstats
path: root/src/client/barrier.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/barrier.cc')
-rw-r--r--src/client/barrier.cc196
1 files changed, 196 insertions, 0 deletions
diff --git a/src/client/barrier.cc b/src/client/barrier.cc
new file mode 100644
index 000000000..7a0c08868
--- /dev/null
+++ b/src/client/barrier.cc
@@ -0,0 +1,196 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ *
+ * Copyright (C) 2012 CohortFS, LLC.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#if defined(__FreeBSD__)
+#include <sys/param.h>
+#endif
+
+#include "include/Context.h"
+#include "Client.h"
+#include "barrier.h"
+#include "include/ceph_assert.h"
+
+#undef dout_prefix
+#define dout_prefix *_dout << "client." << whoami << " "
+
+#define dout_subsys ceph_subsys_client
+
+#define cldout(cl, v) dout_impl((cl)->cct, dout_subsys, v) \
+ *_dout << "client." << cl->whoami << " "
+
+/* C_Block_Sync */
+class C_Block_Sync : public Context {
+private:
+ Client *cl;
+ uint64_t ino;
+ barrier_interval iv;
+ enum CBlockSync_State state;
+ Barrier *barrier;
+ int *rval; /* see Cond.h */
+
+public:
+ boost::intrusive::list_member_hook<> intervals_hook;
+ C_Block_Sync(Client *c, uint64_t i, barrier_interval iv, int *r);
+ void finish(int rval);
+
+ friend class Barrier;
+ friend class BarrierContext;
+};
+
+C_Block_Sync::C_Block_Sync(Client *c, uint64_t i, barrier_interval iv,
+ int *r=0) :
+ cl(c), ino(i), iv(iv), rval(r)
+{
+ state = CBlockSync_State_None;
+ barrier = NULL;
+
+ cldout(cl, 1) << "C_Block_Sync for " << ino << dendl;
+
+ if (!cl->barriers[ino]) {
+ cl->barriers[ino] = new BarrierContext(cl, ino);
+ }
+ /* XXX current semantics aren't commit-ordered */
+ cl->barriers[ino]->write_nobarrier(*this);
+}
+
+void C_Block_Sync::finish(int r) {
+ cldout(cl, 1) << "C_Block_Sync::finish() for " << ino << " "
+ << iv << " r==" << r << dendl;
+ if (rval)
+ *rval = r;
+ cl->barriers[ino]->complete(*this);
+}
+
+/* Barrier */
+Barrier::Barrier()
+{ }
+
+Barrier::~Barrier()
+{ }
+
+/* BarrierContext */
+BarrierContext::BarrierContext(Client *c, uint64_t ino) :
+ cl(c), ino(ino)
+{ };
+
+void BarrierContext::write_nobarrier(C_Block_Sync &cbs)
+{
+ std::lock_guard locker(lock);
+ cbs.state = CBlockSync_State_Unclaimed;
+ outstanding_writes.push_back(cbs);
+}
+
+void BarrierContext::write_barrier(C_Block_Sync &cbs)
+{
+ std::unique_lock locker(lock);
+ barrier_interval &iv = cbs.iv;
+
+ { /* find blocking commit--intrusive no help here */
+ BarrierList::iterator iter;
+ bool done = false;
+ for (iter = active_commits.begin();
+ !done && (iter != active_commits.end());
+ ++iter) {
+ Barrier &barrier = *iter;
+ while (boost::icl::intersects(barrier.span, iv)) {
+ /* wait on this */
+ barrier.cond.wait(locker);
+ done = true;
+ }
+ }
+ }
+
+ cbs.state = CBlockSync_State_Unclaimed;
+ outstanding_writes.push_back(cbs);
+
+} /* write_barrier */
+
+void BarrierContext::commit_barrier(barrier_interval &civ)
+{
+ std::unique_lock locker(lock);
+
+ /* we commit outstanding writes--if none exist, we don't care */
+ if (outstanding_writes.size() == 0)
+ return;
+
+ boost::icl::interval_set<uint64_t> cvs;
+ cvs.insert(civ);
+
+ Barrier *barrier = NULL;
+ BlockSyncList::iterator iter, iter2;
+
+ iter = outstanding_writes.begin();
+ while (iter != outstanding_writes.end()) {
+ barrier_interval &iv = iter->iv;
+ if (boost::icl::intersects(cvs, iv)) {
+ C_Block_Sync &a_write = *iter;
+ if (! barrier)
+ barrier = new Barrier();
+ /* mark the callback */
+ a_write.state = CBlockSync_State_Committing;
+ a_write.barrier = barrier;
+ iter2 = iter++;
+ outstanding_writes.erase(iter2);
+ barrier->write_list.push_back(a_write);
+ barrier->span.insert(iv);
+ /* avoid iter invalidate */
+ } else {
+ ++iter;
+ }
+ }
+
+ if (barrier) {
+ active_commits.push_back(*barrier);
+ /* and wait on this */
+ barrier->cond.wait(locker);
+ }
+
+} /* commit_barrier */
+
+void BarrierContext::complete(C_Block_Sync &cbs)
+{
+ std::lock_guard locker(lock);
+ BlockSyncList::iterator iter =
+ BlockSyncList::s_iterator_to(cbs);
+
+ switch (cbs.state) {
+ case CBlockSync_State_Unclaimed:
+ /* cool, no waiting */
+ outstanding_writes.erase(iter);
+ break;
+ case CBlockSync_State_Committing:
+ {
+ Barrier *barrier = iter->barrier;
+ barrier->write_list.erase(iter);
+ /* signal waiters */
+ barrier->cond.notify_all();
+ /* dispose cleared barrier */
+ if (barrier->write_list.size() == 0) {
+ BarrierList::iterator iter2 =
+ BarrierList::s_iterator_to(*barrier);
+ active_commits.erase(iter2);
+ delete barrier;
+ }
+ }
+ break;
+ default:
+ ceph_abort();
+ break;
+ }
+
+ cbs.state = CBlockSync_State_Completed;
+
+} /* complete */
+
+BarrierContext::~BarrierContext()
+{ }