summaryrefslogtreecommitdiffstats
path: root/lib/base/stream.hpp
blob: 6bc8fed72c95b6c8819f900af9dc3219413cb4ec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */

#ifndef STREAM_H
#define STREAM_H

#include "base/i2-base.hpp"
#include "base/object.hpp"
#include <boost/signals2.hpp>
#include <condition_variable>
#include <mutex>

namespace icinga
{

class String;
class Stream;

enum ConnectionRole
{
	RoleClient,
	RoleServer
};

struct StreamReadContext
{
	~StreamReadContext()
	{
		free(Buffer);
	}

	bool FillFromStream(const intrusive_ptr<Stream>& stream, bool may_wait);
	void DropData(size_t count);

	char *Buffer{nullptr};
	size_t Size{0};
	bool MustRead{true};
	bool Eof{false};
};

enum StreamReadStatus
{
	StatusNewItem,
	StatusNeedData,
	StatusEof
};

/**
 * A stream.
 *
 * @ingroup base
 */
class Stream : public Object
{
public:
	DECLARE_PTR_TYPEDEFS(Stream);

	/**
	 * Reads data from the stream without removing it from the stream buffer.
	 *
	 * @param buffer The buffer where data should be stored. May be nullptr if you're
	 *		 not actually interested in the data.
	 * @param count The number of bytes to read from the queue.
	 * @param allow_partial Whether to allow partial reads.
	 * @returns The number of bytes actually read.
	 */
	virtual size_t Peek(void *buffer, size_t count, bool allow_partial = false);

	/**
	 * Reads data from the stream.
	 *
	 * @param buffer The buffer where data should be stored. May be nullptr if you're
	 *		 not actually interested in the data.
	 * @param count The number of bytes to read from the queue.
	 * @param allow_partial Whether to allow partial reads.
	 * @returns The number of bytes actually read.
	 */
	virtual size_t Read(void *buffer, size_t count, bool allow_partial = false) = 0;

	/**
	 * Writes data to the stream.
	 *
	 * @param buffer The data that is to be written.
	 * @param count The number of bytes to write.
	 * @returns The number of bytes written
	 */
	virtual void Write(const void *buffer, size_t count) = 0;

	/**
	 * Causes the stream to be closed (via Close()) once all pending data has been
	 * written.
	 */
	virtual void Shutdown();

	/**
	 * Closes the stream and releases resources.
	 */
	virtual void Close();

	/**
	 * Checks whether we've reached the end-of-file condition.
	 *
	 * @returns true if EOF.
	 */
	virtual bool IsEof() const = 0;

	/**
	 * Waits until data can be read from the stream.
	 * Optionally with a timeout.
	 */
	bool WaitForData();
	bool WaitForData(int timeout);

	virtual bool SupportsWaiting() const;

	virtual bool IsDataAvailable() const;

	void RegisterDataHandler(const std::function<void(const Stream::Ptr&)>& handler);

	StreamReadStatus ReadLine(String *line, StreamReadContext& context, bool may_wait = false);

protected:
	void SignalDataAvailable();

private:
	boost::signals2::signal<void(const Stream::Ptr&)> OnDataAvailable;

	std::mutex m_Mutex;
	std::condition_variable m_CV;
};

}

#endif /* STREAM_H */