summaryrefslogtreecommitdiffstats
path: root/lib/base/io-engine.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/base/io-engine.hpp')
-rw-r--r--lib/base/io-engine.hpp216
1 files changed, 216 insertions, 0 deletions
diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp
new file mode 100644
index 0000000..684d3ac
--- /dev/null
+++ b/lib/base/io-engine.hpp
@@ -0,0 +1,216 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#ifndef IO_ENGINE_H
+#define IO_ENGINE_H
+
+#include "base/exception.hpp"
+#include "base/lazy-init.hpp"
+#include "base/logger.hpp"
+#include "base/shared-object.hpp"
+#include <atomic>
+#include <exception>
+#include <memory>
+#include <thread>
+#include <utility>
+#include <vector>
+#include <stdexcept>
+#include <boost/exception/all.hpp>
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
+
+namespace icinga
+{
+
+/**
+ * Scope lock for CPU-bound work done in an I/O thread
+ *
+ * @ingroup base
+ */
+class CpuBoundWork
+{
+public:
+ CpuBoundWork(boost::asio::yield_context yc);
+ CpuBoundWork(const CpuBoundWork&) = delete;
+ CpuBoundWork(CpuBoundWork&&) = delete;
+ CpuBoundWork& operator=(const CpuBoundWork&) = delete;
+ CpuBoundWork& operator=(CpuBoundWork&&) = delete;
+ ~CpuBoundWork();
+
+ void Done();
+
+private:
+ bool m_Done;
+};
+
+/**
+ * Scope break for CPU-bound work done in an I/O thread
+ *
+ * @ingroup base
+ */
+class IoBoundWorkSlot
+{
+public:
+ IoBoundWorkSlot(boost::asio::yield_context yc);
+ IoBoundWorkSlot(const IoBoundWorkSlot&) = delete;
+ IoBoundWorkSlot(IoBoundWorkSlot&&) = delete;
+ IoBoundWorkSlot& operator=(const IoBoundWorkSlot&) = delete;
+ IoBoundWorkSlot& operator=(IoBoundWorkSlot&&) = delete;
+ ~IoBoundWorkSlot();
+
+private:
+ boost::asio::yield_context yc;
+};
+
+/**
+ * Async I/O engine
+ *
+ * @ingroup base
+ */
+class IoEngine
+{
+ friend CpuBoundWork;
+ friend IoBoundWorkSlot;
+
+public:
+ IoEngine(const IoEngine&) = delete;
+ IoEngine(IoEngine&&) = delete;
+ IoEngine& operator=(const IoEngine&) = delete;
+ IoEngine& operator=(IoEngine&&) = delete;
+ ~IoEngine();
+
+ static IoEngine& Get();
+
+ boost::asio::io_context& GetIoContext();
+
+ static inline size_t GetCoroutineStackSize() {
+#ifdef _WIN32
+ // Increase the stack size for Windows coroutines to prevent exception corruption.
+ // Rationale: Low cost Windows agent only & https://github.com/Icinga/icinga2/issues/7431
+ return 8 * 1024 * 1024;
+#else /* _WIN32 */
+ // Increase the stack size for Linux/Unix coroutines for many JSON objects on the stack.
+ // This may help mitigate possible stack overflows. https://github.com/Icinga/icinga2/issues/7532
+ return 256 * 1024;
+ //return boost::coroutines::stack_allocator::traits_type::default_size(); // Default 64 KB
+#endif /* _WIN32 */
+ }
+
+ template <typename Handler, typename Function>
+ static void SpawnCoroutine(Handler& h, Function f) {
+
+ boost::asio::spawn(h,
+ [f](boost::asio::yield_context yc) {
+
+ try {
+ f(yc);
+ } catch (const boost::coroutines::detail::forced_unwind &) {
+ // Required for proper stack unwinding when coroutines are destroyed.
+ // https://github.com/boostorg/coroutine/issues/39
+ throw;
+ } catch (const std::exception& ex) {
+ Log(LogCritical, "IoEngine", "Exception in coroutine!");
+ Log(LogDebug, "IoEngine") << "Exception in coroutine: " << DiagnosticInformation(ex);
+ } catch (...) {
+ Log(LogCritical, "IoEngine", "Exception in coroutine!");
+ }
+ },
+ boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size.
+ );
+ }
+
+ static inline
+ void YieldCurrentCoroutine(boost::asio::yield_context yc)
+ {
+ Get().m_AlreadyExpiredTimer.async_wait(yc);
+ }
+
+private:
+ IoEngine();
+
+ void RunEventLoop();
+
+ static LazyInit<std::unique_ptr<IoEngine>> m_Instance;
+
+ boost::asio::io_context m_IoContext;
+ boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
+ std::vector<std::thread> m_Threads;
+ boost::asio::deadline_timer m_AlreadyExpiredTimer;
+ std::atomic_int_fast32_t m_CpuBoundSemaphore;
+};
+
+class TerminateIoThread : public std::exception
+{
+};
+
+/**
+ * Condition variable which doesn't block I/O threads
+ *
+ * @ingroup base
+ */
+class AsioConditionVariable
+{
+public:
+ AsioConditionVariable(boost::asio::io_context& io, bool init = false);
+
+ void Set();
+ void Clear();
+ void Wait(boost::asio::yield_context yc);
+
+private:
+ boost::asio::deadline_timer m_Timer;
+};
+
+/**
+ * I/O timeout emulator
+ *
+ * @ingroup base
+ */
+class Timeout : public SharedObject
+{
+public:
+ DECLARE_PTR_TYPEDEFS(Timeout);
+
+ template<class Executor, class TimeoutFromNow, class OnTimeout>
+ Timeout(boost::asio::io_context& io, Executor& executor, TimeoutFromNow timeoutFromNow, OnTimeout onTimeout)
+ : m_Timer(io)
+ {
+ Ptr keepAlive (this);
+
+ m_Cancelled.store(false);
+ m_Timer.expires_from_now(std::move(timeoutFromNow));
+
+ IoEngine::SpawnCoroutine(executor, [this, keepAlive, onTimeout](boost::asio::yield_context yc) {
+ if (m_Cancelled.load()) {
+ return;
+ }
+
+ {
+ boost::system::error_code ec;
+
+ m_Timer.async_wait(yc[ec]);
+
+ if (ec) {
+ return;
+ }
+ }
+
+ if (m_Cancelled.load()) {
+ return;
+ }
+
+ auto f (onTimeout);
+ f(std::move(yc));
+ });
+ }
+
+ void Cancel();
+
+private:
+ boost::asio::deadline_timer m_Timer;
+ std::atomic<bool> m_Cancelled;
+};
+
+}
+
+#endif /* IO_ENGINE_H */