summaryrefslogtreecommitdiffstats
path: root/lib/base/stream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/base/stream.cpp')
-rw-r--r--lib/base/stream.cpp149
1 files changed, 149 insertions, 0 deletions
diff --git a/lib/base/stream.cpp b/lib/base/stream.cpp
new file mode 100644
index 0000000..e558385
--- /dev/null
+++ b/lib/base/stream.cpp
@@ -0,0 +1,149 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "base/stream.hpp"
+#include <boost/algorithm/string/trim.hpp>
+#include <chrono>
+
+using namespace icinga;
+
+void Stream::RegisterDataHandler(const std::function<void(const Stream::Ptr&)>& handler)
+{
+ if (SupportsWaiting())
+ OnDataAvailable.connect(handler);
+ else
+ BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
+}
+
+bool Stream::SupportsWaiting() const
+{
+ return false;
+}
+
+bool Stream::IsDataAvailable() const
+{
+ return false;
+}
+
+void Stream::Shutdown()
+{
+ BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support Shutdown()."));
+}
+
+size_t Stream::Peek(void *buffer, size_t count, bool allow_partial)
+{
+ BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support Peek()."));
+}
+
+void Stream::SignalDataAvailable()
+{
+ OnDataAvailable(this);
+
+ {
+ std::unique_lock<std::mutex> lock(m_Mutex);
+ m_CV.notify_all();
+ }
+}
+
+bool Stream::WaitForData()
+{
+ if (!SupportsWaiting())
+ BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
+
+ std::unique_lock<std::mutex> lock(m_Mutex);
+
+ while (!IsDataAvailable() && !IsEof())
+ m_CV.wait(lock);
+
+ return IsDataAvailable() || IsEof();
+}
+
+bool Stream::WaitForData(int timeout)
+{
+ namespace ch = std::chrono;
+
+ if (!SupportsWaiting())
+ BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
+
+ if (timeout < 0)
+ BOOST_THROW_EXCEPTION(std::runtime_error("Timeout can't be negative"));
+
+ std::unique_lock<std::mutex> lock(m_Mutex);
+
+ return m_CV.wait_for(lock, ch::duration<int>(timeout), [this]() { return IsDataAvailable() || IsEof(); });
+}
+
+void Stream::Close()
+{
+ OnDataAvailable.disconnect_all_slots();
+
+ /* Force signals2 to remove the slots, see https://stackoverflow.com/questions/2049291/force-deletion-of-slot-in-boostsignals2
+ * for details. */
+ OnDataAvailable.connect([](const Stream::Ptr&) { });
+}
+
+StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context, bool may_wait)
+{
+ if (context.Eof)
+ return StatusEof;
+
+ if (context.MustRead) {
+ if (!context.FillFromStream(this, may_wait)) {
+ context.Eof = true;
+
+ *line = String(context.Buffer, &(context.Buffer[context.Size]));
+ boost::algorithm::trim_right(*line);
+
+ return StatusNewItem;
+ }
+ }
+
+ for (size_t i = 0; i < context.Size; i++) {
+ if (context.Buffer[i] == '\n') {
+ *line = String(context.Buffer, context.Buffer + i);
+ boost::algorithm::trim_right(*line);
+
+ context.DropData(i + 1u);
+
+ context.MustRead = !context.Size;
+ return StatusNewItem;
+ }
+ }
+
+ context.MustRead = true;
+ return StatusNeedData;
+}
+
+bool StreamReadContext::FillFromStream(const Stream::Ptr& stream, bool may_wait)
+{
+ if (may_wait && stream->SupportsWaiting())
+ stream->WaitForData();
+
+ size_t count = 0;
+
+ do {
+ Buffer = (char *)realloc(Buffer, Size + 4096);
+
+ if (!Buffer)
+ throw std::bad_alloc();
+
+ if (stream->IsEof())
+ break;
+
+ size_t rc = stream->Read(Buffer + Size, 4096, true);
+
+ Size += rc;
+ count += rc;
+ } while (count < 64 * 1024 && stream->IsDataAvailable());
+
+ if (count == 0 && stream->IsEof())
+ return false;
+ else
+ return true;
+}
+
+void StreamReadContext::DropData(size_t count)
+{
+ ASSERT(count <= Size);
+ memmove(Buffer, Buffer + count, Size - count);
+ Size -= count;
+}