summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/Socket.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/crimson/net/Socket.cc81
1 files changed, 81 insertions, 0 deletions
diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc
new file mode 100644
index 00000000..a22e9b2e
--- /dev/null
+++ b/src/crimson/net/Socket.cc
@@ -0,0 +1,81 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Socket.h"
+
+#include "Errors.h"
+
+namespace ceph::net {
+
+namespace {
+
+// an input_stream consumer that reads buffer segments into a bufferlist up to
+// the given number of remaining bytes
+struct bufferlist_consumer {
+ bufferlist& bl;
+ size_t& remaining;
+
+ bufferlist_consumer(bufferlist& bl, size_t& remaining)
+ : bl(bl), remaining(remaining) {}
+
+ using tmp_buf = seastar::temporary_buffer<char>;
+ using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
+
+ // consume some or all of a buffer segment
+ seastar::future<consumption_result_type> operator()(tmp_buf&& data) {
+ if (remaining >= data.size()) {
+ // consume the whole buffer
+ remaining -= data.size();
+ bl.append(buffer::create_foreign(std::move(data)));
+ if (remaining > 0) {
+ // return none to request more segments
+ return seastar::make_ready_future<consumption_result_type>(
+ seastar::continue_consuming{});
+ } else {
+ // return an empty buffer to singal that we're done
+ return seastar::make_ready_future<consumption_result_type>(
+ consumption_result_type::stop_consuming_type({}));
+ }
+ }
+ if (remaining > 0) {
+ // consume the front
+ bl.append(buffer::create_foreign(data.share(0, remaining)));
+ data.trim_front(remaining);
+ remaining = 0;
+ }
+ // give the rest back to signal that we're done
+ return seastar::make_ready_future<consumption_result_type>(
+ consumption_result_type::stop_consuming_type{std::move(data)});
+ };
+};
+
+} // anonymous namespace
+
+seastar::future<bufferlist> Socket::read(size_t bytes)
+{
+ if (bytes == 0) {
+ return seastar::make_ready_future<bufferlist>();
+ }
+ r.buffer.clear();
+ r.remaining = bytes;
+ return in.consume(bufferlist_consumer{r.buffer, r.remaining})
+ .then([this] {
+ if (r.remaining) { // throw on short reads
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+ });
+}
+
+seastar::future<seastar::temporary_buffer<char>>
+Socket::read_exactly(size_t bytes) {
+ return in.read_exactly(bytes)
+ .then([this](auto buf) {
+ if (buf.empty()) {
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ return seastar::make_ready_future<tmp_buf>(std::move(buf));
+ });
+}
+
+} // namespace ceph::net