summaryrefslogtreecommitdiffstats
path: root/src/msg/simple/Pipe.h
blob: 81245198460ca71740746cb669ad428aa6d86276 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software 
 * Foundation.  See file COPYING.
 * 
 */

#ifndef CEPH_MSGR_PIPE_H
#define CEPH_MSGR_PIPE_H

#include "auth/AuthSessionHandler.h"

#include "msg/msg_types.h"
#include "msg/Messenger.h"
#include "PipeConnection.h"


class SimpleMessenger;
class DispatchQueue;

/// Number of iovec slots in Pipe::msgvec, the scatter/gather array used when
/// building a msghdr for do_sendmsg(). Platforms advertising a large IOV_MAX
/// (1024 or more) get a quarter of that limit; smaller limits are used as-is.
static constexpr int SM_IOV_MAX = (IOV_MAX >= 1024) ? (IOV_MAX / 4) : IOV_MAX;

  /**
   * The Pipe is the most complex SimpleMessenger component. It gets
   * two threads, one each for reading and writing on a socket it's handed
   * at creation time, and is responsible for everything that happens on
   * that socket. Besides message transmission, it's responsible for
   * propagating socket errors to the SimpleMessenger and then sticking
   * around in a state where it can provide enough data for the SimpleMessenger
   * to provide reliable Message delivery when it manages to reconnect.
   */
  class Pipe : public RefCountedObject {
    /**
     * The Reader thread handles all reads off the socket -- not just
     * Messages, but also acks and other protocol bits (excepting startup,
     * when the Writer does a couple of reads).
     * All the work is implemented in Pipe itself, of course.
     */
    class Reader : public Thread {
      Pipe *pipe;
    public:
      explicit Reader(Pipe *p) : pipe(p) {}
      void *entry() override { pipe->reader(); return 0; }
    } reader_thread;

    /**
     * The Writer thread handles all writes to the socket (after startup).
     * All the work is implemented in Pipe itself, of course.
     */
    class Writer : public Thread {
      Pipe *pipe;
    public:
      explicit Writer(Pipe *p) : pipe(p) {}
      void *entry() override { pipe->writer(); return 0; }
    } writer_thread;

    class DelayedDelivery;
    DelayedDelivery *delay_thread;
  public:
    Pipe(SimpleMessenger *r, int st, PipeConnection *con);
    ~Pipe() override;

    SimpleMessenger *msgr;
    uint64_t conn_id;
    ostream& _pipe_prefix(std::ostream &out) const;

    Pipe* get() {
      return static_cast<Pipe*>(RefCountedObject::get());
    }

    bool is_connected() {
      Mutex::Locker l(pipe_lock);
      return state == STATE_OPEN;
    }

    char *recv_buf;
    size_t recv_max_prefetch;
    size_t recv_ofs;
    size_t recv_len;

    enum {
      STATE_ACCEPTING,
      STATE_CONNECTING,
      STATE_OPEN,
      STATE_STANDBY,
      STATE_CLOSED,
      STATE_CLOSING,
      STATE_WAIT       // just wait for racing connection
    };

    static const char *get_state_name(int s) {
      switch (s) {
      case STATE_ACCEPTING: return "accepting";
      case STATE_CONNECTING: return "connecting";
      case STATE_OPEN: return "open";
      case STATE_STANDBY: return "standby";
      case STATE_CLOSED: return "closed";
      case STATE_CLOSING: return "closing";
      case STATE_WAIT: return "wait";
      default: return "UNKNOWN";
      }
    }
    const char *get_state_name() {
      return get_state_name(state);
    }

  private:
    int sd;
    struct iovec msgvec[SM_IOV_MAX];

  public:
    int port;
    int peer_type;
    entity_addr_t peer_addr;
    Messenger::Policy policy;
    
    Mutex pipe_lock;
    int state;
    std::atomic<bool> state_closed = { false }; // true iff state = STATE_CLOSED

    // session_security handles any signatures or encryptions required for this pipe's msgs. PLR

    std::shared_ptr<AuthSessionHandler> session_security;

  protected:
    friend class SimpleMessenger;
    PipeConnectionRef connection_state;

    utime_t backoff;         // backoff time

    bool reader_running, reader_needs_join;
    bool reader_dispatching; /// reader thread is dispatching without pipe_lock
    bool notify_on_dispatch_done; /// something wants a signal when dispatch done
    bool writer_running;

    map<int, list<Message*> > out_q;  // priority queue for outbound msgs
    DispatchQueue *in_q;
    list<Message*> sent;
    Cond cond;
    bool send_keepalive;
    bool send_keepalive_ack;
    utime_t keepalive_ack_stamp;
    bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
    
    __u32 connect_seq, peer_global_seq;
    uint64_t out_seq;
    uint64_t in_seq, in_seq_acked;
    
    void set_socket_options();

    int accept();   // server handshake
    int connect();  // client handshake
    void reader();
    void writer();
    void unlock_maybe_reap();

    void randomize_out_seq();

    int read_message(Message **pm,
		     AuthSessionHandler *session_security_copy);
    int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body);
    /**
     * Write the given data (of length len) to the Pipe's socket. This function
     * will loop until all passed data has been written out.
     * If more is set, the function will optimize socket writes
     * for additional data (by passing the MSG_MORE flag, aka TCP_CORK).
     *
     * @param msg The msghdr to write out
     * @param len The length of the data in msg
     * @param more Should be set true if this is one part of a larger message
     * @return 0, or -1 on failure (unrecoverable -- close the socket).
     */
    int do_sendmsg(struct msghdr *msg, unsigned len, bool more=false);
    int write_ack(uint64_t s);
    int write_keepalive();
    int write_keepalive2(char tag, const utime_t &t);

    void fault(bool reader=false);

    void was_session_reset();

    /* Clean up sent list */
    void handle_ack(uint64_t seq);

    public:
    Pipe(const Pipe& other);
    const Pipe& operator=(const Pipe& other);

    void start_reader();
    void start_writer();
    void maybe_start_delay_thread();
    void join_reader();

    // public constructors
    static const Pipe& Server(int s);
    static const Pipe& Client(const entity_addr_t& pi);

    uint64_t get_out_seq() { return out_seq; }

    bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; }

    entity_addr_t& get_peer_addr() { return peer_addr; }

    void set_peer_addr(const entity_addr_t& a) {
      if (&peer_addr != &a)  // shut up valgrind
        peer_addr = a;
      connection_state->set_peer_addr(a);
    }
    void set_peer_type(int t) {
      peer_type = t;
      connection_state->set_peer_type(t);
    }

    void register_pipe();
    void unregister_pipe();
    void join();
    /// stop a Pipe by closing its socket and setting it to STATE_CLOSED
    void stop();
    /// stop() a Pipe if not already done, and wait for it to finish any
    /// fast_dispatch in progress.
    void stop_and_wait();

    void _send(Message *m) {
      ceph_assert(pipe_lock.is_locked());
      out_q[m->get_priority()].push_back(m);
      cond.Signal();
    }
    void _send_keepalive() {
      ceph_assert(pipe_lock.is_locked());
      send_keepalive = true;
      cond.Signal();
    }
    Message *_get_next_outgoing() {
      ceph_assert(pipe_lock.is_locked());
      Message *m = 0;
      while (!m && !out_q.empty()) {
        map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
        if (!p->second.empty()) {
          m = p->second.front();
          p->second.pop_front();
        }
        if (p->second.empty())
          out_q.erase(p->first);
      }
      return m;
    }

    /// move all messages in the sent list back into the queue at the highest priority.
    void requeue_sent();
    /// discard messages requeued by requeued_sent() up to a given seq
    void discard_requeued_up_to(uint64_t seq);
    void discard_out_queue();

    void shutdown_socket() {
      recv_reset();
      if (sd >= 0)
        ::shutdown(sd, SHUT_RDWR);
    }

    void recv_reset() {
      recv_len = 0;
      recv_ofs = 0;
    }
    ssize_t do_recv(char *buf, size_t len, int flags);
    ssize_t buffered_recv(char *buf, size_t len, int flags);
    bool has_pending_data() { return recv_len > recv_ofs; }

    /**
     * do a blocking read of len bytes from socket
     *
     * @param buf buffer to read into
     * @param len exact number of bytes to read
     * @return 0 for success, or -1 on error
     */
    int tcp_read(char *buf, unsigned len);

    /**
     * wait for bytes to become available on the socket
     *
     * @return 0 for success, or -1 on error
     */
    int tcp_read_wait();

    /**
     * non-blocking read of available bytes on socket
     *
     * This is expected to be used after tcp_read_wait(), and will return
     * an error if there is no data on the socket to consume.
     *
     * @param buf buffer to read into
     * @param len maximum number of bytes to read
     * @return bytes read, or -1 on error or when there is no data
     */
    ssize_t tcp_read_nonblocking(char *buf, unsigned len);

    /**
     * blocking write of bytes to socket
     *
     * @param buf buffer
     * @param len number of bytes to write
     * @return 0 for success, or -1 on error
     */
    int tcp_write(const char *buf, unsigned len);

  };


#endif