diff options
Diffstat (limited to '')
-rw-r--r-- | src/crimson/net/Socket.cc | 81 |
1 files changed, 81 insertions, 0 deletions
diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc new file mode 100644 index 00000000..a22e9b2e --- /dev/null +++ b/src/crimson/net/Socket.cc @@ -0,0 +1,81 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Socket.h" + +#include "Errors.h" + +namespace ceph::net { + +namespace { + +// an input_stream consumer that reads buffer segments into a bufferlist up to +// the given number of remaining bytes +struct bufferlist_consumer { + bufferlist& bl; + size_t& remaining; + + bufferlist_consumer(bufferlist& bl, size_t& remaining) + : bl(bl), remaining(remaining) {} + + using tmp_buf = seastar::temporary_buffer<char>; + using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type; + + // consume some or all of a buffer segment + seastar::future<consumption_result_type> operator()(tmp_buf&& data) { + if (remaining >= data.size()) { + // consume the whole buffer + remaining -= data.size(); + bl.append(buffer::create_foreign(std::move(data))); + if (remaining > 0) { + // return none to request more segments + return seastar::make_ready_future<consumption_result_type>( + seastar::continue_consuming{}); + } else { + // return an empty buffer to singal that we're done + return seastar::make_ready_future<consumption_result_type>( + consumption_result_type::stop_consuming_type({})); + } + } + if (remaining > 0) { + // consume the front + bl.append(buffer::create_foreign(data.share(0, remaining))); + data.trim_front(remaining); + remaining = 0; + } + // give the rest back to signal that we're done + return seastar::make_ready_future<consumption_result_type>( + consumption_result_type::stop_consuming_type{std::move(data)}); + }; +}; + +} // anonymous namespace + +seastar::future<bufferlist> Socket::read(size_t bytes) +{ + if (bytes == 0) { + return seastar::make_ready_future<bufferlist>(); + } + r.buffer.clear(); + r.remaining = bytes; + return in.consume(bufferlist_consumer{r.buffer, r.remaining}) + .then([this] { + if (r.remaining) { // throw on short reads + throw std::system_error(make_error_code(error::read_eof)); + } + return seastar::make_ready_future<bufferlist>(std::move(r.buffer)); + }); +} + +seastar::future<seastar::temporary_buffer<char>> +Socket::read_exactly(size_t bytes) { + return in.read_exactly(bytes) + .then([this](auto buf) { + if (buf.empty()) { + throw std::system_error(make_error_code(error::read_eof)); + } + return seastar::make_ready_future<tmp_buf>(std::move(buf)); + }); +} + +} // namespace ceph::net |