summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/Socket.h
blob: c1a2ed59a4ce34d103c36aff7799f5fe1790c673 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <seastar/core/reactor.hh>
#include <seastar/net/packet.hh>

#include "include/buffer.h"

namespace ceph::net {

/// Wrapper around a seastar::connected_socket together with the input and
/// output streams obtained from it.
///
/// The shard id observed at construction time is recorded in `sid`, and
/// close() is dispatched to that shard through seastar::smp::submit_to().
class Socket
{
  /// shard id reported by the engine when this Socket was constructed
  const seastar::shard_id sid;
  // NOTE: members are initialized in declaration order, so `socket` must stay
  // declared before `in` and `out`, which are built from it.
  seastar::connected_socket socket;
  seastar::input_stream<char> in;
  seastar::output_stream<char> out;

  /// buffer state for read()
  struct {
    bufferlist buffer;
    // Default-initialized to zero so the counter never holds an indeterminate
    // value before it is first assigned.
    size_t remaining = 0;
  } r;

 public:
  /// Take ownership of an already connected socket and open its input and
  /// output streams.
  explicit Socket(seastar::connected_socket&& _socket)
    : sid{seastar::engine().cpu_id()},
      socket(std::move(_socket)),
      in(socket.input()),
      out(socket.output()) {}
  /// Not movable (and, as a consequence of this declaration, not copyable).
  Socket(Socket&& o) = delete;

  /// read the requested number of bytes into a bufferlist
  seastar::future<bufferlist> read(size_t bytes);
  using tmp_buf = seastar::temporary_buffer<char>;
  using packet = seastar::net::packet;
  /// Defined out of line; presumably resolves to a single temporary buffer
  /// holding exactly `bytes` bytes -- TODO confirm against the implementation.
  seastar::future<tmp_buf> read_exactly(size_t bytes);

  /// Hand the packet to the output stream. No flush is requested here.
  seastar::future<> write(packet&& buf) {
    return out.write(std::move(buf));
  }
  /// Flush the output stream.
  seastar::future<> flush() {
    return out.flush();
  }
  /// Hand the packet to the output stream and, once that write has resolved,
  /// flush the stream.
  seastar::future<> write_flush(packet&& buf) {
    return out.write(std::move(buf)).then([this] { return out.flush(); });
  }

  /// Socket can only be closed once.
  ///
  /// Closes both the input and the output stream on shard `sid`. The
  /// individual results of the two close operations are discarded.
  seastar::future<> close() {
    return seastar::smp::submit_to(sid, [this] {
        return seastar::when_all(
          in.close(), out.close()).discard_result();
      });
  }
};

} // namespace ceph::net