summaryrefslogtreecommitdiffstats
path: root/src/libixion/cell_queue_manager.cpp
blob: 729c630f56baa0a84d30238e003c3caef076cbdb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

#include "cell_queue_manager.hpp"
#include "queue_entry.hpp"
#include <ixion/cell.hpp>
#include <ixion/model_context.hpp>

#include <cassert>
#include <queue>
#include <future>
#include <algorithm>

#if !IXION_THREADS
#error "This file is not to be compiled when the threads are disabled."
#endif

namespace ixion {

namespace {

class scoped_guard
{
    std::thread m_thread;
public:
    scoped_guard(std::thread thread) : m_thread(std::move(thread)) {}
    scoped_guard(scoped_guard&& other) : m_thread(std::move(other.m_thread)) {}

    scoped_guard(const scoped_guard&) = delete;
    scoped_guard& operator= (const scoped_guard&) = delete;

    ~scoped_guard()
    {
        m_thread.join();
    }
};

class interpreter_queue
{
    using future_type = std::future<void>;

    model_context& m_context;

    std::queue<future_type> m_futures;
    std::mutex m_mtx;
    std::condition_variable m_cond;

    size_t m_max_queue;

    void interpret(formula_cell* p, const abs_address_t& pos)
    {
        p->interpret(m_context, pos);
    }

public:
    interpreter_queue(model_context& cxt, size_t max_queue) :
        m_context(cxt), m_max_queue(max_queue) {}

    /**
     * Push one formula cell to the interpreter queue for future
     * intepretation.
     *
     * @param p pointer to formula cell instance.
     * @param pos position of the formual cell.
     */
    void push(formula_cell* p, const abs_address_t& pos)
    {
        std::unique_lock<std::mutex> lock(m_mtx);

        while (m_futures.size() >= m_max_queue)
            m_cond.wait(lock);

        future_type f = std::async(
            std::launch::async, &interpreter_queue::interpret, this, p, pos);
        m_futures.push(std::move(f));
        lock.unlock();

        m_cond.notify_one();
    }

    /**
     * Wait for one formula cell to finish its interpretation.
     */
    void wait_one()
    {
        std::unique_lock<std::mutex> lock(m_mtx);

        while (m_futures.empty())
            m_cond.wait(lock);

        future_type ret = std::move(m_futures.front());
        m_futures.pop();
        lock.unlock();

        ret.get();  // This may throw if an exception was thrown on the thread.

        m_cond.notify_one();
    }
};

}

struct formula_cell_queue::impl
{
    model_context& m_context;
    std::vector<queue_entry> m_cells;
    size_t m_thread_count;

    impl(model_context& cxt, std::vector<queue_entry>&& cells, size_t thread_count) :
        m_context(cxt),
        m_cells(cells),
        m_thread_count(thread_count) {}

    void thread_launch(interpreter_queue* queue)
    {
        for (queue_entry& e : m_cells)
            queue->push(e.p, e.pos);
    }

    void run()
    {
        interpreter_queue queue(m_context, m_thread_count);

        std::thread t(&formula_cell_queue::impl::thread_launch, this, &queue);
        scoped_guard guard(std::move(t));

        for (size_t i = 0, n = m_cells.size(); i < n; ++i)
            queue.wait_one();
    }
};

formula_cell_queue::formula_cell_queue(
    model_context& cxt, std::vector<queue_entry>&& cells, size_t thread_count) :
    mp_impl(std::make_unique<impl>(cxt, std::move(cells), thread_count)) {}

formula_cell_queue::~formula_cell_queue() {}

void formula_cell_queue::run()
{
    mp_impl->run();
}

}

/* vim:set shiftwidth=4 softtabstop=4 expandtab: */