diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/msg/simple/Pipe.h | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/msg/simple/Pipe.h | 315 |
1 files changed, 315 insertions, 0 deletions
diff --git a/src/msg/simple/Pipe.h b/src/msg/simple/Pipe.h new file mode 100644 index 00000000..81245198 --- /dev/null +++ b/src/msg/simple/Pipe.h @@ -0,0 +1,315 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSGR_PIPE_H +#define CEPH_MSGR_PIPE_H + +#include "auth/AuthSessionHandler.h" + +#include "msg/msg_types.h" +#include "msg/Messenger.h" +#include "PipeConnection.h" + + +class SimpleMessenger; +class DispatchQueue; + +static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX); + + /** + * The Pipe is the most complex SimpleMessenger component. It gets + * two threads, one each for reading and writing on a socket it's handed + * at creation time, and is responsible for everything that happens on + * that socket. Besides message transmission, it's responsible for + * propagating socket errors to the SimpleMessenger and then sticking + * around in a state where it can provide enough data for the SimpleMessenger + * to provide reliable Message delivery when it manages to reconnect. + */ + class Pipe : public RefCountedObject { + /** + * The Reader thread handles all reads off the socket -- not just + * Messages, but also acks and other protocol bits (excepting startup, + * when the Writer does a couple of reads). + * All the work is implemented in Pipe itself, of course. + */ + class Reader : public Thread { + Pipe *pipe; + public: + explicit Reader(Pipe *p) : pipe(p) {} + void *entry() override { pipe->reader(); return 0; } + } reader_thread; + + /** + * The Writer thread handles all writes to the socket (after startup). + * All the work is implemented in Pipe itself, of course. + */ + class Writer : public Thread { + Pipe *pipe; + public: + explicit Writer(Pipe *p) : pipe(p) {} + void *entry() override { pipe->writer(); return 0; } + } writer_thread; + + class DelayedDelivery; + DelayedDelivery *delay_thread; + public: + Pipe(SimpleMessenger *r, int st, PipeConnection *con); + ~Pipe() override; + + SimpleMessenger *msgr; + uint64_t conn_id; + ostream& _pipe_prefix(std::ostream &out) const; + + Pipe* get() { + return static_cast<Pipe*>(RefCountedObject::get()); + } + + bool is_connected() { + Mutex::Locker l(pipe_lock); + return state == STATE_OPEN; + } + + char *recv_buf; + size_t recv_max_prefetch; + size_t recv_ofs; + size_t recv_len; + + enum { + STATE_ACCEPTING, + STATE_CONNECTING, + STATE_OPEN, + STATE_STANDBY, + STATE_CLOSED, + STATE_CLOSING, + STATE_WAIT // just wait for racing connection + }; + + static const char *get_state_name(int s) { + switch (s) { + case STATE_ACCEPTING: return "accepting"; + case STATE_CONNECTING: return "connecting"; + case STATE_OPEN: return "open"; + case STATE_STANDBY: return "standby"; + case STATE_CLOSED: return "closed"; + case STATE_CLOSING: return "closing"; + case STATE_WAIT: return "wait"; + default: return "UNKNOWN"; + } + } + const char *get_state_name() { + return get_state_name(state); + } + + private: + int sd; + struct iovec msgvec[SM_IOV_MAX]; + + public: + int port; + int peer_type; + entity_addr_t peer_addr; + Messenger::Policy policy; + + Mutex pipe_lock; + int state; + std::atomic<bool> state_closed = { false }; // true iff state = STATE_CLOSED + + // session_security handles any signatures or encryptions required for this pipe's msgs. PLR + + std::shared_ptr<AuthSessionHandler> session_security; + + protected: + friend class SimpleMessenger; + PipeConnectionRef connection_state; + + utime_t backoff; // backoff time + + bool reader_running, reader_needs_join; + bool reader_dispatching; /// reader thread is dispatching without pipe_lock + bool notify_on_dispatch_done; /// something wants a signal when dispatch done + bool writer_running; + + map<int, list<Message*> > out_q; // priority queue for outbound msgs + DispatchQueue *in_q; + list<Message*> sent; + Cond cond; + bool send_keepalive; + bool send_keepalive_ack; + utime_t keepalive_ack_stamp; + bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it + + __u32 connect_seq, peer_global_seq; + uint64_t out_seq; + uint64_t in_seq, in_seq_acked; + + void set_socket_options(); + + int accept(); // server handshake + int connect(); // client handshake + void reader(); + void writer(); + void unlock_maybe_reap(); + + void randomize_out_seq(); + + int read_message(Message **pm, + AuthSessionHandler *session_security_copy); + int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body); + /** + * Write the given data (of length len) to the Pipe's socket. This function + * will loop until all passed data has been written out. + * If more is set, the function will optimize socket writes + * for additional data (by passing the MSG_MORE flag, aka TCP_CORK). + * + * @param msg The msghdr to write out + * @param len The length of the data in msg + * @param more Should be set true if this is one part of a larger message + * @return 0, or -1 on failure (unrecoverable -- close the socket). + */ + int do_sendmsg(struct msghdr *msg, unsigned len, bool more=false); + int write_ack(uint64_t s); + int write_keepalive(); + int write_keepalive2(char tag, const utime_t &t); + + void fault(bool reader=false); + + void was_session_reset(); + + /* Clean up sent list */ + void handle_ack(uint64_t seq); + + public: + Pipe(const Pipe& other); + const Pipe& operator=(const Pipe& other); + + void start_reader(); + void start_writer(); + void maybe_start_delay_thread(); + void join_reader(); + + // public constructors + static const Pipe& Server(int s); + static const Pipe& Client(const entity_addr_t& pi); + + uint64_t get_out_seq() { return out_seq; } + + bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; } + + entity_addr_t& get_peer_addr() { return peer_addr; } + + void set_peer_addr(const entity_addr_t& a) { + if (&peer_addr != &a) // shut up valgrind + peer_addr = a; + connection_state->set_peer_addr(a); + } + void set_peer_type(int t) { + peer_type = t; + connection_state->set_peer_type(t); + } + + void register_pipe(); + void unregister_pipe(); + void join(); + /// stop a Pipe by closing its socket and setting it to STATE_CLOSED + void stop(); + /// stop() a Pipe if not already done, and wait for it to finish any + /// fast_dispatch in progress. + void stop_and_wait(); + + void _send(Message *m) { + ceph_assert(pipe_lock.is_locked()); + out_q[m->get_priority()].push_back(m); + cond.Signal(); + } + void _send_keepalive() { + ceph_assert(pipe_lock.is_locked()); + send_keepalive = true; + cond.Signal(); + } + Message *_get_next_outgoing() { + ceph_assert(pipe_lock.is_locked()); + Message *m = 0; + while (!m && !out_q.empty()) { + map<int, list<Message*> >::reverse_iterator p = out_q.rbegin(); + if (!p->second.empty()) { + m = p->second.front(); + p->second.pop_front(); + } + if (p->second.empty()) + out_q.erase(p->first); + } + return m; + } + + /// move all messages in the sent list back into the queue at the highest priority. + void requeue_sent(); + /// discard messages requeued by requeued_sent() up to a given seq + void discard_requeued_up_to(uint64_t seq); + void discard_out_queue(); + + void shutdown_socket() { + recv_reset(); + if (sd >= 0) + ::shutdown(sd, SHUT_RDWR); + } + + void recv_reset() { + recv_len = 0; + recv_ofs = 0; + } + ssize_t do_recv(char *buf, size_t len, int flags); + ssize_t buffered_recv(char *buf, size_t len, int flags); + bool has_pending_data() { return recv_len > recv_ofs; } + + /** + * do a blocking read of len bytes from socket + * + * @param buf buffer to read into + * @param len exact number of bytes to read + * @return 0 for success, or -1 on error + */ + int tcp_read(char *buf, unsigned len); + + /** + * wait for bytes to become available on the socket + * + * @return 0 for success, or -1 on error + */ + int tcp_read_wait(); + + /** + * non-blocking read of available bytes on socket + * + * This is expected to be used after tcp_read_wait(), and will return + * an error if there is no data on the socket to consume. + * + * @param buf buffer to read into + * @param len maximum number of bytes to read + * @return bytes read, or -1 on error or when there is no data + */ + ssize_t tcp_read_nonblocking(char *buf, unsigned len); + + /** + * blocking write of bytes to socket + * + * @param buf buffer + * @param len number of bytes to write + * @return 0 for success, or -1 on error + */ + int tcp_write(const char *buf, unsigned len); + + }; + + +#endif |