summaryrefslogtreecommitdiffstats
path: root/src/lib/util/thread_pool.h
blob: fdfce0f71bb3538a1acebfe620548243d79a841d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
// Copyright (C) 2018-2021 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <exceptions/exceptions.h>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>

#include <atomic>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <list>
#include <mutex>
#include <queue>
#include <thread>

#include <signal.h>

namespace isc {
namespace util {

/// @brief Defines a thread pool which uses a thread pool queue for managing
/// work items. Each work item is a 'functor' object.
///
/// @tparam WorkItem a functor
/// @tparam Container a 'queue like' container
template <typename WorkItem, typename Container = std::deque<boost::shared_ptr<WorkItem>>>
struct ThreadPool {
    /// @brief Rounding value for 10 packet statistic.
    static const double CEXP10;

    /// @brief Rounding value for 100 packet statistic.
    static const double CEXP100;

    /// @brief Rounding value for 1000 packet statistic.
    static const double CEXP1000;

    /// @brief Type of shared pointers to work items.
    typedef typename boost::shared_ptr<WorkItem> WorkItemPtr;

    /// @brief Constructor
    ThreadPool() {
    }

    /// @brief Destructor
    ~ThreadPool() {
        reset();
    }

    /// @brief reset the thread pool stopping threads and clearing the internal
    /// queue
    ///
    /// It can be called several times even when the thread pool is stopped
    void reset() {
        stopInternal();
        queue_.clear();
    }

    /// @brief start all the threads
    ///
    /// @param thread_count specifies the number of threads to be created and
    /// started
    ///
    /// @throw InvalidOperation if thread pool already started
    /// @throw InvalidParameter if thread count is 0
    void start(uint32_t thread_count) {
        if (!thread_count) {
            isc_throw(InvalidParameter, "thread count is 0");
        }
        if (queue_.enabled()) {
            isc_throw(InvalidOperation, "thread pool already started");
        }
        startInternal(thread_count);
    }

    /// @brief stop all the threads
    ///
    /// @throw InvalidOperation if thread pool already stopped
    void stop() {
        if (!queue_.enabled()) {
            isc_throw(InvalidOperation, "thread pool already stopped");
        }
        stopInternal();
    }

    /// @brief add a work item to the thread pool
    ///
    /// @param item the 'functor' object to be added to the queue
    /// @return false if the queue was full and oldest item(s) was dropped,
    /// true otherwise.
    bool add(const WorkItemPtr& item) {
        return (queue_.pushBack(item));
    }

    /// @brief add a work item to the thread pool at front
    ///
    /// @param item the 'functor' object to be added to the queue
    /// @return false if the queue was full, true otherwise.
    bool addFront(const WorkItemPtr& item) {
        return (queue_.pushFront(item));
    }

    /// @brief count number of work items in the queue
    ///
    /// @return the number of work items in the queue
    size_t count() {
        return (queue_.count());
    }

    /// @brief wait for current items to be processed
    ///
    /// Used to block the calling thread until all items in the queue have
    /// been processed
    void wait() {
        auto id = std::this_thread::get_id();
        if (checkThreadId(id)) {
            isc_throw(MultiThreadingInvalidOperation, "thread pool wait called by worker thread");
        }
        queue_.wait();
    }

    /// @brief wait for items to be processed or return after timeout
    ///
    /// Used to block the calling thread until all items in the queue have
    /// been processed or return after timeout
    ///
    /// @param seconds the time in seconds to wait for tasks to finish
    /// @return true if all tasks finished, false on timeout
    bool wait(uint32_t seconds) {
        auto id = std::this_thread::get_id();
        if (checkThreadId(id)) {
            isc_throw(MultiThreadingInvalidOperation, "thread pool wait with timeout called by worker thread");
        }
        return (queue_.wait(seconds));
    }

    /// @brief set maximum number of work items in the queue
    ///
    /// @param max_queue_size the maximum size (0 means unlimited)
    void setMaxQueueSize(size_t max_queue_size) {
        queue_.setMaxQueueSize(max_queue_size);
    }

    /// @brief get maximum number of work items in the queue
    ///
    /// @return the maximum size (0 means unlimited)
    size_t getMaxQueueSize() {
        return (queue_.getMaxQueueSize());
    }

    /// @brief size number of thread pool threads
    ///
    /// @return the number of threads
    size_t size() {
        return (threads_.size());
    }

    /// @brief get queue length statistic
    ///
    /// @param which select the statistic (10, 100 or 1000)
    /// @return the queue length statistic
    /// @throw InvalidParameter if which is not 10 and 100 and 1000.
    double getQueueStat(size_t which) {
        return (queue_.getQueueStat(which));
    }

private:
    /// @brief start all the threads
    ///
    /// @param thread_count specifies the number of threads to be created and
    /// started
    void startInternal(uint32_t thread_count) {
        // Protect us against signals
        sigset_t sset;
        sigset_t osset;
        sigemptyset(&sset);
        sigaddset(&sset, SIGCHLD);
        sigaddset(&sset, SIGINT);
        sigaddset(&sset, SIGHUP);
        sigaddset(&sset, SIGTERM);
        pthread_sigmask(SIG_BLOCK, &sset, &osset);
        queue_.enable(thread_count);
        try {
            for (uint32_t i = 0; i < thread_count; ++i) {
                threads_.push_back(boost::make_shared<std::thread>(&ThreadPool::run, this));
            }
        } catch (...) {
            // Restore signal mask.
            pthread_sigmask(SIG_SETMASK, &osset, 0);
            throw;
        }
        // Restore signal mask.
        pthread_sigmask(SIG_SETMASK, &osset, 0);
    }

    /// @brief stop all the threads
    void stopInternal() {
        auto id = std::this_thread::get_id();
        if (checkThreadId(id)) {
            isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by worker thread");
        }
        queue_.disable();
        for (auto thread : threads_) {
            thread->join();
        }
        threads_.clear();
    }

    /// @brief check specified thread id against own threads
    ///
    /// @return true if thread is owned, false otherwise
    bool checkThreadId(std::thread::id id) {
        for (auto thread : threads_) {
            if (id == thread->get_id()) {
                return (true);
            }
        }
        return (false);
    }

    /// @brief Defines a generic thread pool queue.
    ///
    /// The main purpose is to safely manage thread pool tasks.
    /// The thread pool queue can be 'disabled', which means that no items can be
    /// removed from the queue, or 'enabled', which guarantees that inserting or
    /// removing items are thread safe.
    /// In 'disabled' state, all threads waiting on the queue are unlocked and all
    /// operations are non blocking.
    ///
    /// @tparam Item a 'smart pointer' to a functor
    /// @tparam QueueContainer a 'queue like' container
    template <typename Item, typename QueueContainer = std::queue<Item>>
    struct ThreadPoolQueue {
        /// @brief Constructor
        ///
        /// Creates the thread pool queue in 'disabled' state
        ThreadPoolQueue()
            : enabled_(false), max_queue_size_(0), working_(0),
              stat10(0.), stat100(0.), stat1000(0.) {
        }

        /// @brief Destructor
        ///
        /// Destroys the thread pool queue
        ~ThreadPoolQueue() {
            disable();
            clear();
        }

        /// @brief set maximum number of work items in the queue
        ///
        /// @return the maximum size (0 means unlimited)
        void setMaxQueueSize(size_t max_queue_size) {
            std::lock_guard<std::mutex> lock(mutex_);
            max_queue_size_ = max_queue_size;
        }

        /// @brief get maximum number of work items in the queue
        ///
        /// @return the maximum size (0 means unlimited)
        size_t getMaxQueueSize() {
            std::lock_guard<std::mutex> lock(mutex_);
            return (max_queue_size_);
        }

        /// @brief push work item to the queue
        ///
        /// Used to add work items to the queue.
        /// When the queue is full oldest items are removed and false is
        /// returned.
        /// This function adds an item to the queue and wakes up at least one
        /// thread waiting on the queue.
        ///
        /// @param item the new item to be added to the queue
        /// @return false if the queue was full and oldest item(s) dropped,
        /// true otherwise
        bool pushBack(const Item& item) {
            bool ret = true;
            if (!item) {
                return (ret);
            }
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (max_queue_size_ != 0) {
                    while (queue_.size() >= max_queue_size_) {
                        queue_.pop_front();
                        ret = false;
                    }
                }
                queue_.push_back(item);
            }
            // Notify pop function so that it can effectively remove a work item.
            cv_.notify_one();
            return (ret);
        }

        /// @brief push work item to the queue at front.
        ///
        /// Used to add work items to the queue at front.
        /// When the queue is full the item is not added.
        ///
        /// @param item the new item to be added to the queue
        /// @return false if the queue was full, true otherwise
        bool pushFront(const Item& item) {
            if (!item) {
                return (true);
            }
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if ((max_queue_size_ != 0) &&
                    (queue_.size() >= max_queue_size_)) {
                    return (false);
                }
                queue_.push_front(item);
            }
            // Notify pop function so that it can effectively remove a work item.
            cv_.notify_one();
            return (true);
        }

        /// @brief pop work item from the queue or block waiting
        ///
        /// Used to retrieve and remove a work item from the queue
        /// If the queue is 'disabled', this function returns immediately an empty
        /// element.
        /// If the queue is 'enabled', this function returns the first element in
        /// the queue or blocks the calling thread if there are no work items
        /// available.
        /// Before a work item is returned statistics are updated.
        ///
        /// @return the first work item from the queue or an empty element.
        Item pop() {
            std::unique_lock<std::mutex> lock(mutex_);
            --working_;
            // Wait for push or disable functions.
            if (working_ == 0 && queue_.empty()) {
                wait_cv_.notify_all();
            }
            cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());});
            if (!enabled_) {
                return (Item());
            }
            ++working_;
            size_t length = queue_.size();
            stat10 = stat10 * CEXP10 + (1 - CEXP10) * length;
            stat100 = stat100 * CEXP100 + (1 - CEXP100) * length;
            stat1000 = stat1000 * CEXP1000 + (1 - CEXP1000) * length;
            Item item = queue_.front();
            queue_.pop_front();
            return (item);
        }

        /// @brief count number of work items in the queue
        ///
        /// Returns the number of work items in the queue
        ///
        /// @return the number of work items
        size_t count() {
            std::lock_guard<std::mutex> lock(mutex_);
            return (queue_.size());
        }

        /// @brief wait for current items to be processed
        ///
        /// Used to block the calling thread until all items in the queue have
        /// been processed
        void wait() {
            std::unique_lock<std::mutex> lock(mutex_);
            // Wait for any item or for working threads to finish.
            wait_cv_.wait(lock, [&]() {return (working_ == 0 && queue_.empty());});
        }

        /// @brief wait for items to be processed or return after timeout
        ///
        /// Used to block the calling thread until all items in the queue have
        /// been processed or return after timeout
        ///
        /// @param seconds the time in seconds to wait for tasks to finish
        /// @return true if all tasks finished, false on timeout
        bool wait(uint32_t seconds) {
            std::unique_lock<std::mutex> lock(mutex_);
            // Wait for any item or for working threads to finish.
            bool ret = wait_cv_.wait_for(lock, std::chrono::seconds(seconds),
                                         [&]() {return (working_ == 0 && queue_.empty());});
            return (ret);
        }

        /// @brief get queue length statistic
        ///
        /// @param which select the statistic (10, 100 or 1000)
        /// @return the queue length statistic
        /// @throw InvalidParameter if which is not 10 and 100 and 1000.
        double getQueueStat(size_t which) {
            std::lock_guard<std::mutex> lock(mutex_);
            switch (which) {
            case 10:
                return (stat10);
            case 100:
                return (stat100);
            case 1000:
                return (stat1000);
            default:
                isc_throw(InvalidParameter, "supported statistic for "
                          << "10/100/1000 only, not " << which);
            }
        }

        /// @brief clear remove all work items
        ///
        /// Removes all queued work items
        void clear() {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_ = QueueContainer();
            working_ = 0;
            wait_cv_.notify_all();
        }

        /// @brief enable the queue
        ///
        /// Sets the queue state to 'enabled'
        ///
        /// @param number of working threads
        void enable(uint32_t thread_count) {
            std::lock_guard<std::mutex> lock(mutex_);
            enabled_ = true;
            working_ = thread_count;
        }

        /// @brief disable the queue
        ///
        /// Sets the queue state to 'disabled'
        void disable() {
            {
                std::lock_guard<std::mutex> lock(mutex_);
                enabled_ = false;
            }
            // Notify pop so that it can exit.
            cv_.notify_all();
        }

        /// @brief return the state of the queue
        ///
        /// Returns the state of the queue
        ///
        /// @return the state
        bool enabled() {
            return (enabled_);
        }

    private:
        /// @brief underlying queue container
        QueueContainer queue_;

        /// @brief mutex used for critical sections
        std::mutex mutex_;

        /// @brief condition variable used to signal waiting threads
        std::condition_variable cv_;

        /// @brief condition variable used to wait for all items to be processed
        std::condition_variable wait_cv_;

        /// @brief the sate of the queue
        /// The 'enabled' state corresponds to true value
        /// The 'disabled' state corresponds to false value
        std::atomic<bool> enabled_;

        /// @brief maximum number of work items in the queue
        /// (0 means unlimited)
        size_t max_queue_size_;

        /// @brief number of threads currently doing work
        uint32_t working_;

        /// @brief queue length statistic for 10 packets
        double stat10;

        /// @brief queue length statistic for 100 packets
        double stat100;

        /// @brief queue length statistic for 1000 packets
        double stat1000;
    };

    /// @brief run function of each thread
    void run() {
        while (queue_.enabled()) {
            WorkItemPtr item = queue_.pop();
            if (item) {
                try {
                    (*item)();
                } catch (...) {
                    // catch all exceptions
                }
            }
        }
    }

    /// @brief list of worker threads
    std::vector<boost::shared_ptr<std::thread>> threads_;

    /// @brief underlying work items queue
    ThreadPoolQueue<WorkItemPtr, Container> queue_;
};

/// Initialize the 10 packet rounding to exp(-.1)
template <typename W, typename C>
const double ThreadPool<W, C>::CEXP10 = std::exp(-.1);

/// Initialize the 100 packet rounding to exp(-.01)
template <typename W, typename C>
const double ThreadPool<W, C>::CEXP100 = std::exp(-.01);

/// Initialize the 1000 packet rounding to exp(-.001)
template <typename W, typename C>
const double ThreadPool<W, C>::CEXP1000 = std::exp(-.001);

}  // namespace util
}  // namespace isc

#endif  // THREAD_POOL_H