summaryrefslogtreecommitdiffstats
path: root/src/msg/simple/Pipe.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/msg/simple/Pipe.h
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/msg/simple/Pipe.h315
1 files changed, 315 insertions, 0 deletions
diff --git a/src/msg/simple/Pipe.h b/src/msg/simple/Pipe.h
new file mode 100644
index 00000000..81245198
--- /dev/null
+++ b/src/msg/simple/Pipe.h
@@ -0,0 +1,315 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSGR_PIPE_H
+#define CEPH_MSGR_PIPE_H
+
+#include "auth/AuthSessionHandler.h"
+
+#include "msg/msg_types.h"
+#include "msg/Messenger.h"
+#include "PipeConnection.h"
+
+
+class SimpleMessenger;
+class DispatchQueue;
+
+static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
+
+ /**
+ * The Pipe is the most complex SimpleMessenger component. It gets
+ * two threads, one each for reading and writing on a socket it's handed
+ * at creation time, and is responsible for everything that happens on
+ * that socket. Besides message transmission, it's responsible for
+ * propagating socket errors to the SimpleMessenger and then sticking
+ * around in a state where it can provide enough data for the SimpleMessenger
+ * to provide reliable Message delivery when it manages to reconnect.
+ */
+ class Pipe : public RefCountedObject {
+ /**
+ * The Reader thread handles all reads off the socket -- not just
+ * Messages, but also acks and other protocol bits (excepting startup,
+ * when the Writer does a couple of reads).
+ * All the work is implemented in Pipe itself, of course.
+ */
+ class Reader : public Thread {
+ Pipe *pipe;
+ public:
+ explicit Reader(Pipe *p) : pipe(p) {}
+ void *entry() override { pipe->reader(); return 0; }
+ } reader_thread;
+
+ /**
+ * The Writer thread handles all writes to the socket (after startup).
+ * All the work is implemented in Pipe itself, of course.
+ */
+ class Writer : public Thread {
+ Pipe *pipe;
+ public:
+ explicit Writer(Pipe *p) : pipe(p) {}
+ void *entry() override { pipe->writer(); return 0; }
+ } writer_thread;
+
+ class DelayedDelivery;
+ DelayedDelivery *delay_thread;
+ public:
+ Pipe(SimpleMessenger *r, int st, PipeConnection *con);
+ ~Pipe() override;
+
+ SimpleMessenger *msgr;
+ uint64_t conn_id;
+ ostream& _pipe_prefix(std::ostream &out) const;
+
+ Pipe* get() {
+ return static_cast<Pipe*>(RefCountedObject::get());
+ }
+
+ bool is_connected() {
+ Mutex::Locker l(pipe_lock);
+ return state == STATE_OPEN;
+ }
+
+ char *recv_buf;
+ size_t recv_max_prefetch;
+ size_t recv_ofs;
+ size_t recv_len;
+
+ enum {
+ STATE_ACCEPTING,
+ STATE_CONNECTING,
+ STATE_OPEN,
+ STATE_STANDBY,
+ STATE_CLOSED,
+ STATE_CLOSING,
+ STATE_WAIT // just wait for racing connection
+ };
+
+ static const char *get_state_name(int s) {
+ switch (s) {
+ case STATE_ACCEPTING: return "accepting";
+ case STATE_CONNECTING: return "connecting";
+ case STATE_OPEN: return "open";
+ case STATE_STANDBY: return "standby";
+ case STATE_CLOSED: return "closed";
+ case STATE_CLOSING: return "closing";
+ case STATE_WAIT: return "wait";
+ default: return "UNKNOWN";
+ }
+ }
+ const char *get_state_name() {
+ return get_state_name(state);
+ }
+
+ private:
+ int sd;
+ struct iovec msgvec[SM_IOV_MAX];
+
+ public:
+ int port;
+ int peer_type;
+ entity_addr_t peer_addr;
+ Messenger::Policy policy;
+
+ Mutex pipe_lock;
+ int state;
+ std::atomic<bool> state_closed = { false }; // true iff state = STATE_CLOSED
+
+ // session_security handles any signatures or encryptions required for this pipe's msgs. PLR
+
+ std::shared_ptr<AuthSessionHandler> session_security;
+
+ protected:
+ friend class SimpleMessenger;
+ PipeConnectionRef connection_state;
+
+ utime_t backoff; // backoff time
+
+ bool reader_running, reader_needs_join;
+ bool reader_dispatching; /// reader thread is dispatching without pipe_lock
+ bool notify_on_dispatch_done; /// something wants a signal when dispatch done
+ bool writer_running;
+
+ map<int, list<Message*> > out_q; // priority queue for outbound msgs
+ DispatchQueue *in_q;
+ list<Message*> sent;
+ Cond cond;
+ bool send_keepalive;
+ bool send_keepalive_ack;
+ utime_t keepalive_ack_stamp;
+ bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
+
+ __u32 connect_seq, peer_global_seq;
+ uint64_t out_seq;
+ uint64_t in_seq, in_seq_acked;
+
+ void set_socket_options();
+
+ int accept(); // server handshake
+ int connect(); // client handshake
+ void reader();
+ void writer();
+ void unlock_maybe_reap();
+
+ void randomize_out_seq();
+
+ int read_message(Message **pm,
+ AuthSessionHandler *session_security_copy);
+ int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body);
+ /**
+ * Write the given data (of length len) to the Pipe's socket. This function
+ * will loop until all passed data has been written out.
+ * If more is set, the function will optimize socket writes
+ * for additional data (by passing the MSG_MORE flag, aka TCP_CORK).
+ *
+ * @param msg The msghdr to write out
+ * @param len The length of the data in msg
+ * @param more Should be set true if this is one part of a larger message
+ * @return 0, or -1 on failure (unrecoverable -- close the socket).
+ */
+ int do_sendmsg(struct msghdr *msg, unsigned len, bool more=false);
+ int write_ack(uint64_t s);
+ int write_keepalive();
+ int write_keepalive2(char tag, const utime_t &t);
+
+ void fault(bool reader=false);
+
+ void was_session_reset();
+
+ /* Clean up sent list */
+ void handle_ack(uint64_t seq);
+
+ public:
+ Pipe(const Pipe& other);
+ const Pipe& operator=(const Pipe& other);
+
+ void start_reader();
+ void start_writer();
+ void maybe_start_delay_thread();
+ void join_reader();
+
+ // public constructors
+ static const Pipe& Server(int s);
+ static const Pipe& Client(const entity_addr_t& pi);
+
+ uint64_t get_out_seq() { return out_seq; }
+
+ bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; }
+
+ entity_addr_t& get_peer_addr() { return peer_addr; }
+
+ void set_peer_addr(const entity_addr_t& a) {
+ if (&peer_addr != &a) // shut up valgrind
+ peer_addr = a;
+ connection_state->set_peer_addr(a);
+ }
+ void set_peer_type(int t) {
+ peer_type = t;
+ connection_state->set_peer_type(t);
+ }
+
+ void register_pipe();
+ void unregister_pipe();
+ void join();
+ /// stop a Pipe by closing its socket and setting it to STATE_CLOSED
+ void stop();
+ /// stop() a Pipe if not already done, and wait for it to finish any
+ /// fast_dispatch in progress.
+ void stop_and_wait();
+
+ void _send(Message *m) {
+ ceph_assert(pipe_lock.is_locked());
+ out_q[m->get_priority()].push_back(m);
+ cond.Signal();
+ }
+ void _send_keepalive() {
+ ceph_assert(pipe_lock.is_locked());
+ send_keepalive = true;
+ cond.Signal();
+ }
+ Message *_get_next_outgoing() {
+ ceph_assert(pipe_lock.is_locked());
+ Message *m = 0;
+ while (!m && !out_q.empty()) {
+ map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
+ if (!p->second.empty()) {
+ m = p->second.front();
+ p->second.pop_front();
+ }
+ if (p->second.empty())
+ out_q.erase(p->first);
+ }
+ return m;
+ }
+
+ /// move all messages in the sent list back into the queue at the highest priority.
+ void requeue_sent();
+ /// discard messages requeued by requeued_sent() up to a given seq
+ void discard_requeued_up_to(uint64_t seq);
+ void discard_out_queue();
+
+ void shutdown_socket() {
+ recv_reset();
+ if (sd >= 0)
+ ::shutdown(sd, SHUT_RDWR);
+ }
+
+ void recv_reset() {
+ recv_len = 0;
+ recv_ofs = 0;
+ }
+ ssize_t do_recv(char *buf, size_t len, int flags);
+ ssize_t buffered_recv(char *buf, size_t len, int flags);
+ bool has_pending_data() { return recv_len > recv_ofs; }
+
+ /**
+ * do a blocking read of len bytes from socket
+ *
+ * @param buf buffer to read into
+ * @param len exact number of bytes to read
+ * @return 0 for success, or -1 on error
+ */
+ int tcp_read(char *buf, unsigned len);
+
+ /**
+ * wait for bytes to become available on the socket
+ *
+ * @return 0 for success, or -1 on error
+ */
+ int tcp_read_wait();
+
+ /**
+ * non-blocking read of available bytes on socket
+ *
+ * This is expected to be used after tcp_read_wait(), and will return
+ * an error if there is no data on the socket to consume.
+ *
+ * @param buf buffer to read into
+ * @param len maximum number of bytes to read
+ * @return bytes read, or -1 on error or when there is no data
+ */
+ ssize_t tcp_read_nonblocking(char *buf, unsigned len);
+
+ /**
+ * blocking write of bytes to socket
+ *
+ * @param buf buffer
+ * @param len number of bytes to write
+ * @return 0 for success, or -1 on error
+ */
+ int tcp_write(const char *buf, unsigned len);
+
+ };
+
+
+#endif