summaryrefslogtreecommitdiffstats
path: root/ml/Queue.h
blob: 37a74bd077de60c8616c5ccf71d460534f649c41 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#ifndef QUEUE_H
#define QUEUE_H

#include "ml-private.h"
#include "Mutex.h"
#include <queue>
#include <mutex>
#include <condition_variable>

template<typename T>
class Queue {
public:
    Queue(void) : Q(), M() {
        pthread_cond_init(&CV, nullptr);
        Exit = false;
    }

    ~Queue() {
        pthread_cond_destroy(&CV);
    }

    void push(T t) {
        std::lock_guard<Mutex> L(M);

        Q.push(t);
        pthread_cond_signal(&CV);
    }

    std::pair<T, size_t> pop(void) {
        std::lock_guard<Mutex> L(M);

        while (Q.empty()) {
            pthread_cond_wait(&CV, M.inner());

            if (Exit) {
                // This should happen only when we are destroying a host.
                // Callers should use a flag dedicated to checking if we
                // are about to delete the host or exit the agent. The original
                // implementation would call pthread_exit which would cause
                // the queue's mutex to be destroyed twice (and fail on the
                // 2nd time)
                return { T(), 0 };
            }
        }

        T V = Q.front();
        size_t Size = Q.size();
        Q.pop();

        return { V, Size };
    }

    void signal() {
        std::lock_guard<Mutex> L(M);
        Exit = true;
        pthread_cond_signal(&CV);
    }

private:
    std::queue<T> Q;
    Mutex M;
    pthread_cond_t CV;
    std::atomic<bool> Exit;
};

#endif /* QUEUE_H */