summaryrefslogtreecommitdiffstats
path: root/src/libixion/cell_queue_manager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/libixion/cell_queue_manager.cpp')
-rw-r--r--src/libixion/cell_queue_manager.cpp149
1 files changed, 149 insertions, 0 deletions
diff --git a/src/libixion/cell_queue_manager.cpp b/src/libixion/cell_queue_manager.cpp
new file mode 100644
index 0000000..729c630
--- /dev/null
+++ b/src/libixion/cell_queue_manager.cpp
@@ -0,0 +1,149 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#include "cell_queue_manager.hpp"
+#include "queue_entry.hpp"
+#include <ixion/cell.hpp>
+#include <ixion/model_context.hpp>
+
+#include <cassert>
+#include <queue>
+#include <future>
+#include <algorithm>
+
+#if !IXION_THREADS
+#error "This file is not to be compiled when the threads are disabled."
+#endif
+
+namespace ixion {
+
+namespace {
+
+class scoped_guard
+{
+ std::thread m_thread;
+public:
+ scoped_guard(std::thread thread) : m_thread(std::move(thread)) {}
+ scoped_guard(scoped_guard&& other) : m_thread(std::move(other.m_thread)) {}
+
+ scoped_guard(const scoped_guard&) = delete;
+ scoped_guard& operator= (const scoped_guard&) = delete;
+
+ ~scoped_guard()
+ {
+ m_thread.join();
+ }
+};
+
+class interpreter_queue
+{
+ using future_type = std::future<void>;
+
+ model_context& m_context;
+
+ std::queue<future_type> m_futures;
+ std::mutex m_mtx;
+ std::condition_variable m_cond;
+
+ size_t m_max_queue;
+
+ void interpret(formula_cell* p, const abs_address_t& pos)
+ {
+ p->interpret(m_context, pos);
+ }
+
+public:
+ interpreter_queue(model_context& cxt, size_t max_queue) :
+ m_context(cxt), m_max_queue(max_queue) {}
+
+ /**
+ * Push one formula cell to the interpreter queue for future
+ * intepretation.
+ *
+ * @param p pointer to formula cell instance.
+ * @param pos position of the formual cell.
+ */
+ void push(formula_cell* p, const abs_address_t& pos)
+ {
+ std::unique_lock<std::mutex> lock(m_mtx);
+
+ while (m_futures.size() >= m_max_queue)
+ m_cond.wait(lock);
+
+ future_type f = std::async(
+ std::launch::async, &interpreter_queue::interpret, this, p, pos);
+ m_futures.push(std::move(f));
+ lock.unlock();
+
+ m_cond.notify_one();
+ }
+
+ /**
+ * Wait for one formula cell to finish its interpretation.
+ */
+ void wait_one()
+ {
+ std::unique_lock<std::mutex> lock(m_mtx);
+
+ while (m_futures.empty())
+ m_cond.wait(lock);
+
+ future_type ret = std::move(m_futures.front());
+ m_futures.pop();
+ lock.unlock();
+
+ ret.get(); // This may throw if an exception was thrown on the thread.
+
+ m_cond.notify_one();
+ }
+};
+
+}
+
+struct formula_cell_queue::impl
+{
+ model_context& m_context;
+ std::vector<queue_entry> m_cells;
+ size_t m_thread_count;
+
+ impl(model_context& cxt, std::vector<queue_entry>&& cells, size_t thread_count) :
+ m_context(cxt),
+ m_cells(cells),
+ m_thread_count(thread_count) {}
+
+ void thread_launch(interpreter_queue* queue)
+ {
+ for (queue_entry& e : m_cells)
+ queue->push(e.p, e.pos);
+ }
+
+ void run()
+ {
+ interpreter_queue queue(m_context, m_thread_count);
+
+ std::thread t(&formula_cell_queue::impl::thread_launch, this, &queue);
+ scoped_guard guard(std::move(t));
+
+ for (size_t i = 0, n = m_cells.size(); i < n; ++i)
+ queue.wait_one();
+ }
+};
+
+formula_cell_queue::formula_cell_queue(
+ model_context& cxt, std::vector<queue_entry>&& cells, size_t thread_count) :
+ mp_impl(std::make_unique<impl>(cxt, std::move(cells), thread_count)) {}
+
+formula_cell_queue::~formula_cell_queue() {}
+
+void formula_cell_queue::run()
+{
+ mp_impl->run();
+}
+
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */