1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
|
// Copyright (C) 2017-2020 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <config.h>
#include <asiolink/asio_wrapper.h>
#include <asiolink/interval_timer.h>
#include <asiolink/unix_domain_socket.h>
#include <cc/json_feed.h>
#include <config/client_connection.h>
#include <boost/enable_shared_from_this.hpp>
#include <array>
#include <functional>
using namespace isc::asiolink;
namespace isc {
namespace config {
/// @brief Implementation of the @ref ClientConnection.
///
/// The object owns the unix domain socket, the feed collecting the response,
/// a copy of the command being transmitted, the receive buffer and the timer
/// guarding against timeouts. Completion handlers of the asynchronous
/// operations hold a shared pointer to this object, which is why the class
/// derives from @c boost::enable_shared_from_this.
class ClientConnectionImpl : public boost::enable_shared_from_this<ClientConnectionImpl> {
public:

    /// @brief Constructor.
    ///
    /// @param io_service Reference to the IO service.
    explicit ClientConnectionImpl(IOService& io_service);

    /// @brief Schedules the timeout timer or reschedules the existing one.
    ///
    /// No timer is scheduled when the stored timeout value is not greater
    /// than zero.
    ///
    /// @param handler Pointer to the user supplied callback function which
    /// should be invoked when transaction completes or when an error has
    /// occurred during the transaction.
    void scheduleTimer(ClientConnection::Handler handler);

    /// @brief Starts asynchronous transaction with a remote endpoint.
    ///
    /// See @ref ClientConnection::start documentation for the details.
    ///
    /// @param socket_path Path to the socket description that the server
    /// is bound to.
    /// @param command Control command to be sent to the server.
    /// @param handler Pointer to the user supplied callback function which
    /// should be invoked when transaction completes or when an error has
    /// occurred during the transaction.
    /// @param timeout Connection timeout in milliseconds.
    void start(const ClientConnection::SocketPath& socket_path,
               const ClientConnection::ControlCommand& command,
               ClientConnection::Handler handler,
               const ClientConnection::Timeout& timeout);

    /// @brief Closes the socket.
    void stop();

    /// @brief Starts asynchronous send.
    ///
    /// This method may be called multiple times internally when the command
    /// is large and can't be sent all at once.
    ///
    /// @param buffer Pointer to the buffer holding input data.
    /// @param length Length of the data in the input buffer.
    /// @param handler User supplied callback invoked when the transaction
    /// completes or fails.
    void doSend(const void* buffer, const size_t length,
                ClientConnection::Handler handler);

    /// @brief Starts asynchronous receive from the server.
    ///
    /// This method may be called multiple times internally if the response
    /// is large. The @ref JSONFeed instance is used to detect the boundaries
    /// of the response within the stream. Once the entire response has been
    /// received the user callback is invoked and the instance of the
    /// @ref JSONFeed is passed to it.
    ///
    /// @param handler User supplied callback.
    void doReceive(ClientConnection::Handler handler);

    /// @brief Terminates the connection and invokes the user callback with
    /// the supplied error code.
    ///
    /// The same method is used to report a successful completion, in which
    /// case the error code carries no error.
    ///
    /// @param ec Error code.
    /// @param handler User callback.
    void terminate(const boost::system::error_code& ec,
                   ClientConnection::Handler handler);

    /// @brief Callback invoked when the timeout occurs.
    ///
    /// It calls @ref terminate with the @c boost::asio::error::timed_out.
    ///
    /// @param handler User callback to be notified about the timeout.
    void timeoutCallback(ClientConnection::Handler handler);

private:

    /// @brief Unix domain socket used for communication with a server.
    UnixDomainSocket socket_;

    /// @brief Pointer to the @ref JSONFeed holding a response.
    ///
    /// It may be a null pointer until some part of a response has been
    /// received.
    JSONFeedPtr feed_;

    /// @brief Holds the entire command being transmitted over the unix
    /// socket.
    std::string current_command_;

    /// @brief Buffer into which chunks of the response are received.
    std::array<char, 32768> read_buf_;

    /// @brief Instance of the interval timer protecting against timeouts.
    IntervalTimer timer_;

    /// @brief Timeout value (in milliseconds) used for the timer.
    long timeout_;
};
// Binds the socket and the timer to the IO service. The response feed and
// the stored command are default constructed (null pointer and empty string)
// and the timeout starts at zero.
ClientConnectionImpl::ClientConnectionImpl(IOService& io_service)
    : socket_(io_service),
      timer_(io_service),
      timeout_(0) {
}
// Schedules (or reschedules) the one-shot timeout timer. Nothing is scheduled
// unless a positive timeout value has been stored.
void
ClientConnectionImpl::scheduleTimer(ClientConnection::Handler handler) {
    if (timeout_ <= 0) {
        return;
    }

    // When the timer fires the transaction is aborted via timeoutCallback,
    // which reports the timeout to the user supplied handler.
    timer_.setup([this, handler]() { timeoutCallback(handler); },
                 timeout_, IntervalTimer::ONE_SHOT);
}
// Begins a transaction: arms the timeout timer, stores the command and
// starts an asynchronous connect. The command is sent once the connection
// has been established.
void
ClientConnectionImpl::start(const ClientConnection::SocketPath& socket_path,
                            const ClientConnection::ControlCommand& command,
                            ClientConnection::Handler handler,
                            const ClientConnection::Timeout& timeout) {
    // Start the timer protecting against timeouts.
    timeout_ = timeout.timeout_;
    scheduleTimer(handler);

    // Store the command in the class member to make sure it is valid
    // the entire time.
    current_command_.assign(command.control_command_);

    // Pass self to lambda to make sure that the instance of this class
    // lives as long as the lambda is held for async connect.
    auto self(shared_from_this());

    // Start asynchronous connect. This will return immediately. The lambda
    // deliberately does not capture the command: the data to be sent is
    // taken from current_command_, so capturing it would only copy the
    // command string into the closure for no purpose.
    socket_.asyncConnect(socket_path.socket_path_,
        [this, self, handler](const boost::system::error_code& ec) {
        // We failed to connect so we can't proceed. Simply clean up
        // and invoke the user callback to signal an error.
        if (ec) {
            // This doesn't throw.
            terminate(ec, handler);

        } else {
            // Connection successful. Transmit the command to the remote
            // endpoint asynchronously.
            doSend(current_command_.c_str(), current_command_.length(),
                   handler);
        }
    });
}
// Sends the given chunk of the command. If only a part of it gets written
// the remainder is sent with another call; once everything has been sent
// the response is awaited.
void
ClientConnectionImpl::doSend(const void* buffer, const size_t length,
                             ClientConnection::Handler handler) {
    // The completion handler keeps a shared pointer to this object so that
    // the object stays alive for as long as the asynchronous send is pending.
    auto self(shared_from_this());

    auto on_sent = [this, self, buffer, length, handler]
        (const boost::system::error_code& ec, size_t bytes_transferred) {
        // Sending failed: close the connection and signal the error.
        // The terminate call doesn't throw.
        if (ec) {
            terminate(ec, handler);
            return;
        }

        // Sending is in progress, so push back the timeout.
        scheduleTimer(handler);

        // Everything has been sent. Start receiving a response.
        if (bytes_transferred >= length) {
            doReceive(handler);
            return;
        }

        // Only a part of the data has been sent, so schedule another send
        // to deliver the rest of it.
        const char* const unsent =
            static_cast<const char*>(buffer) + bytes_transferred;
        doSend(unsent, length - bytes_transferred, handler);
    };

    // Start asynchronous transmission of the command. This will return
    // immediately.
    socket_.asyncSend(buffer, length, on_sent);
}
// Receives a chunk of the response into read_buf_ and feeds it to the
// JSONFeed. Receiving is repeated until the feed no longer needs data, at
// which point the user callback is invoked via terminate.
void
ClientConnectionImpl::doReceive(ClientConnection::Handler handler) {
    // Pass self to lambda to make sure that the instance of this class
    // lives as long as the lambda is held for async receive.
    auto self(shared_from_this());
    socket_.asyncReceive(&read_buf_[0], read_buf_.size(),
        [this, self, handler]
        (const boost::system::error_code& ec, size_t length) {
        // An error has occurred while receiving the data. Close the connection
        // and signal an error.
        if (ec) {
            // This doesn't throw.
            terminate(ec, handler);

        } else {
            // Receiving is in progress, so push back the timeout.
            scheduleTimer(handler);

            // Lazy initialization of the JSONFeed. The feed will be "parsing"
            // received JSON stream and will detect when the whole response
            // has been received.
            if (!feed_) {
                feed_.reset(new JSONFeed());
                feed_->initModel();
            }

            // Hand the received chunk straight from the read buffer to the
            // feed and process the data. No intermediate copy is needed.
            feed_->postBuffer(&read_buf_[0], length);
            feed_->poll();

            // If the feed indicates that only a part of the response has been
            // received, schedule another receive to get more data.
            if (feed_->needData()) {
                doReceive(handler);

            } else {
                // We have received the entire response, let's call the handler
                // and indicate success. The error code is not set on this
                // path, so the handler sees no error.
                terminate(ec, handler);
            }
        }
    });
}
// Ends the transaction: cancels the timer, closes the socket, discards the
// stored command and passes the error code together with the response feed
// to the user callback. Any exception raised on the way is suppressed.
void
ClientConnectionImpl::terminate(const boost::system::error_code& ec,
                                ClientConnection::Handler handler) {
    try {
        // Release the resources first so that the callback is invoked with
        // the timer already cancelled and the socket already closed.
        // NOTE(review): if closing the socket completes a still pending
        // asynchronous operation with an error, its completion handler would
        // call terminate again and the user callback would be invoked a
        // second time - confirm against the socket's close semantics.
        timer_.cancel();
        socket_.close();
        current_command_.clear();

        handler(ec, feed_);

    } catch (...) {
        // Nothing above is expected to throw. The user callback in
        // particular should not throw, but in case it has been
        // misimplemented we make sure that no exception escapes from here.
    }
}
// Invoked by the timer when the timeout elapses before the transaction has
// completed. Closes the connection and reports the timeout to the user
// callback.
void
ClientConnectionImpl::timeoutCallback(ClientConnection::Handler handler) {
    const boost::system::error_code timeout_ec(boost::asio::error::timed_out);
    terminate(timeout_ec, handler);
}
// Creates the implementation object bound to the given IO service.
ClientConnection::ClientConnection(asiolink::IOService& io_service)
    : impl_(new ClientConnectionImpl(io_service)) {
}
// Forwards the request to the implementation, which performs the actual
// asynchronous transaction.
void
ClientConnection::start(const ClientConnection::SocketPath& socket_path,
                        const ClientConnection::ControlCommand& command,
                        ClientConnection::Handler handler,
                        const ClientConnection::Timeout& timeout) {
    impl_->start(socket_path, command, handler, timeout);
}
} // end of namespace config
} // end of namespace isc
|