diff options
Diffstat (limited to '')
-rw-r--r-- | src/msg/simple/Accepter.cc | 402 | ||||
-rw-r--r-- | src/msg/simple/Accepter.h | 50 | ||||
-rw-r--r-- | src/msg/simple/Pipe.cc | 2712 | ||||
-rw-r--r-- | src/msg/simple/Pipe.h | 315 | ||||
-rw-r--r-- | src/msg/simple/PipeConnection.cc | 96 | ||||
-rw-r--r-- | src/msg/simple/PipeConnection.h | 59 | ||||
-rw-r--r-- | src/msg/simple/SimpleMessenger.cc | 769 | ||||
-rw-r--r-- | src/msg/simple/SimpleMessenger.h | 414 |
8 files changed, 4817 insertions, 0 deletions
diff --git a/src/msg/simple/Accepter.cc b/src/msg/simple/Accepter.cc new file mode 100644 index 00000000..52f3df2b --- /dev/null +++ b/src/msg/simple/Accepter.cc @@ -0,0 +1,402 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "include/compat.h" +#include "include/sock_compat.h" +#include <iterator> +#include <sys/socket.h> +#include <netinet/tcp.h> +#include <sys/uio.h> +#include <limits.h> +#include <poll.h> + +#include "msg/msg_types.h" +#include "msg/Message.h" + +#include "Accepter.h" +#include "Pipe.h" +#include "SimpleMessenger.h" + +#include "common/debug.h" +#include "common/errno.h" +#include "common/safe_io.h" + +#define dout_subsys ceph_subsys_ms + +#undef dout_prefix +#define dout_prefix *_dout << "accepter." + + +/******************************************** + * Accepter + */ + +int Accepter::create_selfpipe(int *pipe_rd, int *pipe_wr) { + int selfpipe[2]; + if (pipe_cloexec(selfpipe) < 0) { + int e = errno; + lderr(msgr->cct) << __func__ << " unable to create the selfpipe: " + << cpp_strerror(e) << dendl; + return -e; + } + for (size_t i = 0; i < std::size(selfpipe); i++) { + int rc = fcntl(selfpipe[i], F_GETFL); + ceph_assert(rc != -1); + rc = fcntl(selfpipe[i], F_SETFL, rc | O_NONBLOCK); + ceph_assert(rc != -1); + } + *pipe_rd = selfpipe[0]; + *pipe_wr = selfpipe[1]; + return 0; +} + +int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports) +{ + const auto& conf = msgr->cct->_conf; + // bind to a socket + ldout(msgr->cct,10) << __func__ << dendl; + + int family; + switch (bind_addr.get_family()) { + case AF_INET: + case AF_INET6: + family = bind_addr.get_family(); + break; + + default: + // bind_addr is empty + family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET; + } + + /* socket creation */ + listen_sd = socket_cloexec(family, SOCK_STREAM, 0); + if (listen_sd < 0) { + int e = errno; + lderr(msgr->cct) << __func__ << " unable to create socket: " + << cpp_strerror(e) << dendl; + return -e; + } + ldout(msgr->cct,10) << __func__ << " socket sd: " << listen_sd << dendl; + + // use whatever user specified (if anything) + entity_addr_t listen_addr = bind_addr; + if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) { + listen_addr.set_type(entity_addr_t::TYPE_LEGACY); + } + listen_addr.set_family(family); + + /* bind to port */ + int rc = -1; + int r = -1; + + for (int i = 0; i < conf->ms_bind_retry_count; i++) { + + if (i > 0) { + lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in " + << conf->ms_bind_retry_delay << " seconds " << dendl; + sleep(conf->ms_bind_retry_delay); + } + + if (listen_addr.get_port()) { + // specific port + + // reuse addr+port when possible + int on = 1; + rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (rc < 0) { + lderr(msgr->cct) << __func__ << " unable to setsockopt: " + << cpp_strerror(errno) << dendl; + r = -errno; + continue; + } + + rc = ::bind(listen_sd, listen_addr.get_sockaddr(), + listen_addr.get_sockaddr_len()); + if (rc < 0) { + lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr + << ": " << cpp_strerror(errno) << dendl; + r = -errno; + continue; + } + } else { + // try a range of ports + for (int port = msgr->cct->_conf->ms_bind_port_min; + port <= msgr->cct->_conf->ms_bind_port_max; port++) { + if (avoid_ports.count(port)) + continue; + + listen_addr.set_port(port); + rc = ::bind(listen_sd, listen_addr.get_sockaddr(), + listen_addr.get_sockaddr_len()); + if (rc == 0) + break; + } + if (rc < 0) { + lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr + << " on any port in range " << msgr->cct->_conf->ms_bind_port_min + << "-" << msgr->cct->_conf->ms_bind_port_max + << ": " << cpp_strerror(errno) + << dendl; + r = -errno; + // Clear port before retry, otherwise we shall fail again. + listen_addr.set_port(0); + continue; + } + ldout(msgr->cct,10) << __func__ << " bound on random port " + << listen_addr << dendl; + } + + if (rc == 0) + break; + } + + // It seems that binding completely failed, return with that exit status + if (rc < 0) { + lderr(msgr->cct) << __func__ << " was unable to bind after " + << conf->ms_bind_retry_count << " attempts: " + << cpp_strerror(errno) << dendl; + ::close(listen_sd); + listen_sd = -1; + return r; + } + + // what port did we get? + sockaddr_storage ss; + socklen_t llen = sizeof(ss); + rc = getsockname(listen_sd, (sockaddr*)&ss, &llen); + if (rc < 0) { + rc = -errno; + lderr(msgr->cct) << __func__ << " failed getsockname: " + << cpp_strerror(rc) << dendl; + ::close(listen_sd); + listen_sd = -1; + return rc; + } + listen_addr.set_sockaddr((sockaddr*)&ss); + + if (msgr->cct->_conf->ms_tcp_rcvbuf) { + int size = msgr->cct->_conf->ms_tcp_rcvbuf; + rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF, + (void*)&size, sizeof(size)); + if (rc < 0) { + rc = -errno; + lderr(msgr->cct) << __func__ << " failed to set SO_RCVBUF to " + << size << ": " << cpp_strerror(rc) << dendl; + ::close(listen_sd); + listen_sd = -1; + return rc; + } + } + + ldout(msgr->cct,10) << __func__ << " bound to " << listen_addr << dendl; + + // listen! + rc = ::listen(listen_sd, msgr->cct->_conf->ms_tcp_listen_backlog); + if (rc < 0) { + rc = -errno; + lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr + << ": " << cpp_strerror(rc) << dendl; + ::close(listen_sd); + listen_sd = -1; + return rc; + } + + msgr->set_myaddrs(entity_addrvec_t(bind_addr)); + if (bind_addr != entity_addr_t() && + !bind_addr.is_blank_ip()) + msgr->learned_addr(bind_addr); + else + ceph_assert(msgr->get_need_addr()); // should still be true. + + if (msgr->get_myaddr_legacy().get_port() == 0) { + msgr->set_myaddrs(entity_addrvec_t(listen_addr)); + } + entity_addr_t addr = msgr->get_myaddr_legacy(); + addr.nonce = nonce; + msgr->set_myaddrs(entity_addrvec_t(addr)); + + msgr->init_local_connection(); + + rc = create_selfpipe(&shutdown_rd_fd, &shutdown_wr_fd); + if (rc < 0) { + lderr(msgr->cct) << __func__ << " unable to create signalling pipe " << listen_addr + << ": " << cpp_strerror(rc) << dendl; + return rc; + } + + ldout(msgr->cct,1) << __func__ << " my_addrs " << *msgr->my_addrs + << " my_addr " << msgr->my_addr + << " need_addr=" << msgr->get_need_addr() << dendl; + return 0; +} + +int Accepter::rebind(const set<int>& avoid_ports) +{ + ldout(msgr->cct,1) << __func__ << " avoid " << avoid_ports << dendl; + + entity_addr_t addr = msgr->get_myaddr_legacy(); + set<int> new_avoid = avoid_ports; + new_avoid.insert(addr.get_port()); + addr.set_port(0); + + // adjust the nonce; we want our entity_addr_t to be truly unique. + nonce += 1000000; + entity_addrvec_t newaddrs = *msgr->my_addrs; + newaddrs.v[0].nonce = nonce; + msgr->set_myaddrs(newaddrs); + ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and addr " + << msgr->my_addr << dendl; + + ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl; + int r = bind(addr, new_avoid); + if (r == 0) + start(); + return r; +} + +int Accepter::start() +{ + ldout(msgr->cct,1) << __func__ << dendl; + + // start thread + create("ms_accepter"); + + return 0; +} + +void *Accepter::entry() +{ + ldout(msgr->cct,1) << __func__ << " start" << dendl; + + int errors = 0; + + struct pollfd pfd[2]; + memset(pfd, 0, sizeof(pfd)); + + pfd[0].fd = listen_sd; + pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; + pfd[1].fd = shutdown_rd_fd; + pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; + while (!done) { + ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl; + int r = poll(pfd, 2, -1); + if (r < 0) { + if (errno == EINTR) { + continue; + } + ldout(msgr->cct,1) << __func__ << " poll got error" + << " errno " << errno << " " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + ldout(msgr->cct,10) << __func__ << " poll returned oke: " << r << dendl; + ldout(msgr->cct,20) << __func__ << " pfd.revents[0]=" << pfd[0].revents << dendl; + ldout(msgr->cct,20) << __func__ << " pfd.revents[1]=" << pfd[1].revents << dendl; + + if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) { + ldout(msgr->cct,1) << __func__ << " poll got errors in revents " + << pfd[0].revents << dendl; + ceph_abort(); + } + if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) { + // We got "signaled" to exit the poll + // clean the selfpipe + char ch; + if (::read(shutdown_rd_fd, &ch, sizeof(ch)) == -1) { + if (errno != EAGAIN) + ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: " + << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } + break; + } + if (done) break; + + // accept + sockaddr_storage ss; + socklen_t slen = sizeof(ss); + int sd = accept_cloexec(listen_sd, (sockaddr*)&ss, &slen); + if (sd >= 0) { + errors = 0; + ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl; + + msgr->add_accept_pipe(sd); + } else { + int e = errno; + ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd + << " errno " << e << " " << cpp_strerror(e) << dendl; + if (++errors > msgr->cct->_conf->ms_max_accept_failures) { + lderr(msgr->cct) << "accetper has encoutered enough errors, just do ceph_abort()." << dendl; + ceph_abort(); + } + } + } + + ldout(msgr->cct,20) << __func__ << " closing" << dendl; + // socket is closed right after the thread has joined. + // closing it here might race + if (shutdown_rd_fd >= 0) { + ::close(shutdown_rd_fd); + shutdown_rd_fd = -1; + } + + ldout(msgr->cct,10) << __func__ << " stopping" << dendl; + return 0; +} + +void Accepter::stop() +{ + done = true; + ldout(msgr->cct,10) << __func__ << " accept listening on: " << listen_sd << dendl; + + if (shutdown_wr_fd < 0) + return; + + // Send a byte to the shutdown pipe that the thread is listening to + char ch = 0x0; + int ret = safe_write(shutdown_wr_fd, &ch, sizeof(ch)); + if (ret < 0) { + ldout(msgr->cct,1) << __func__ << " close failed: " + << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } else { + ldout(msgr->cct,15) << __func__ << " signaled poll" << dendl; + } + VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd)); + shutdown_wr_fd = -1; + + // wait for thread to stop before closing the socket, to avoid + // racing against fd re-use. + if (is_started()) { + ldout(msgr->cct,5) << __func__ << " wait for thread to join." << dendl; + join(); + } + + if (listen_sd >= 0) { + if (::close(listen_sd) < 0) { + ldout(msgr->cct,1) << __func__ << " close listen_sd failed: " + << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } + listen_sd = -1; + } + if (shutdown_rd_fd >= 0) { + if (::close(shutdown_rd_fd) < 0) { + ldout(msgr->cct,1) << __func__ << " close shutdown_rd_fd failed: " + << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } + shutdown_rd_fd = -1; + } + done = false; +} + + + + diff --git a/src/msg/simple/Accepter.h b/src/msg/simple/Accepter.h new file mode 100644 index 00000000..7824c3a1 --- /dev/null +++ b/src/msg/simple/Accepter.h @@ -0,0 +1,50 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSG_ACCEPTER_H +#define CEPH_MSG_ACCEPTER_H + +#include "common/Thread.h" + +class SimpleMessenger; +struct entity_addr_t; + +/** + * If the SimpleMessenger binds to a specific address, the Accepter runs + * and listens for incoming connections. + */ +class Accepter : public Thread { + SimpleMessenger *msgr; + bool done; + int listen_sd; + uint64_t nonce; + int shutdown_rd_fd; + int shutdown_wr_fd; + int create_selfpipe(int *pipe_rd, int *pipe_wr); + +public: + Accepter(SimpleMessenger *r, uint64_t n) + : msgr(r), done(false), listen_sd(-1), nonce(n), + shutdown_rd_fd(-1), shutdown_wr_fd(-1) + {} + + void *entry() override; + void stop(); + int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports); + int rebind(const set<int>& avoid_port); + int start(); +}; + + +#endif diff --git a/src/msg/simple/Pipe.cc b/src/msg/simple/Pipe.cc new file mode 100644 index 00000000..fd44dc4e --- /dev/null +++ b/src/msg/simple/Pipe.cc @@ -0,0 +1,2712 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/ip.h> +#include <netinet/tcp.h> +#include <sys/uio.h> +#include <limits.h> +#include <poll.h> + +#include "msg/Message.h" +#include "Pipe.h" +#include "SimpleMessenger.h" + +#include "common/debug.h" +#include "common/errno.h" +#include "common/valgrind.h" + +// Below included to get encode_encrypt(); That probably should be in Crypto.h, instead + +#include "auth/cephx/CephxProtocol.h" +#include "auth/AuthSessionHandler.h" + +#include "include/compat.h" +#include "include/sock_compat.h" +#include "include/random.h" + +// Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR +#define SEQ_MASK 0x7fffffff +#define dout_subsys ceph_subsys_ms + +#undef dout_prefix +#define dout_prefix *_dout << *this +ostream& Pipe::_pipe_prefix(std::ostream &out) const { + return out << "-- " << msgr->get_myaddr_legacy() << " >> " << peer_addr + << " pipe(" << this + << " sd=" << sd << " :" << port + << " s=" << state + << " pgs=" << peer_global_seq + << " cs=" << connect_seq + << " l=" << policy.lossy + << " c=" << connection_state + << ")."; +} + +ostream& operator<<(ostream &out, const Pipe &pipe) { + return pipe._pipe_prefix(out); +} + +/** + * The DelayedDelivery is for injecting delays into Message delivery off + * the socket. It is only enabled if delays are requested, and if they + * are then it pulls Messages off the DelayQueue and puts them into the + * in_q (SimpleMessenger::dispatch_queue). + * Please note that this probably has issues with Pipe shutdown and + * replacement semantics. I've tried, but no guarantees. + */ +class Pipe::DelayedDelivery: public Thread { + Pipe *pipe; + std::deque< pair<utime_t,Message*> > delay_queue; + Mutex delay_lock; + Cond delay_cond; + int flush_count; + bool active_flush; + bool stop_delayed_delivery; + bool delay_dispatching; // we are in fast dispatch now + bool stop_fast_dispatching_flag; // we need to stop fast dispatching + +public: + explicit DelayedDelivery(Pipe *p) + : pipe(p), + delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0), + active_flush(false), + stop_delayed_delivery(false), + delay_dispatching(false), + stop_fast_dispatching_flag(false) { } + ~DelayedDelivery() override { + discard(); + } + void *entry() override; + void queue(utime_t release, Message *m) { + Mutex::Locker l(delay_lock); + delay_queue.push_back(make_pair(release, m)); + delay_cond.Signal(); + } + void discard(); + void flush(); + bool is_flushing() { + Mutex::Locker l(delay_lock); + return flush_count > 0 || active_flush; + } + void wait_for_flush() { + Mutex::Locker l(delay_lock); + while (flush_count > 0 || active_flush) + delay_cond.Wait(delay_lock); + } + void stop() { + delay_lock.Lock(); + stop_delayed_delivery = true; + delay_cond.Signal(); + delay_lock.Unlock(); + } + void steal_for_pipe(Pipe *new_owner) { + Mutex::Locker l(delay_lock); + pipe = new_owner; + } + /** + * We need to stop fast dispatching before we need to stop putting + * normal messages into the DispatchQueue. + */ + void stop_fast_dispatching(); +}; + +/************************************** + * Pipe + */ + +Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con) + : RefCountedObject(r->cct), + reader_thread(this), + writer_thread(this), + delay_thread(NULL), + msgr(r), + conn_id(r->dispatch_queue.get_id()), + recv_ofs(0), + recv_len(0), + sd(-1), port(0), + peer_type(-1), + pipe_lock("SimpleMessenger::Pipe::pipe_lock"), + state(st), + connection_state(NULL), + reader_running(false), reader_needs_join(false), + reader_dispatching(false), notify_on_dispatch_done(false), + writer_running(false), + in_q(&(r->dispatch_queue)), + send_keepalive(false), + send_keepalive_ack(false), + connect_seq(0), peer_global_seq(0), + out_seq(0), in_seq(0), in_seq_acked(0) { + ANNOTATE_BENIGN_RACE_SIZED(&sd, sizeof(sd), "Pipe socket"); + ANNOTATE_BENIGN_RACE_SIZED(&state, sizeof(state), "Pipe state"); + ANNOTATE_BENIGN_RACE_SIZED(&recv_len, sizeof(recv_len), "Pipe recv_len"); + ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs, sizeof(recv_ofs), "Pipe recv_ofs"); + if (con) { + connection_state = con; + connection_state->reset_pipe(this); + } else { + connection_state = new PipeConnection(msgr->cct, msgr); + connection_state->pipe = get(); + } + + randomize_out_seq(); + + msgr->timeout = msgr->cct->_conf->ms_connection_idle_timeout * 1000; //convert to ms + if (msgr->timeout == 0) + msgr->timeout = -1; + + recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size; + recv_buf = new char[recv_max_prefetch]; +} + +Pipe::~Pipe() +{ + ceph_assert(out_q.empty()); + ceph_assert(sent.empty()); + delete delay_thread; + delete[] recv_buf; +} + +void Pipe::handle_ack(uint64_t seq) +{ + lsubdout(msgr->cct, ms, 15) << "reader got ack seq " << seq << dendl; + // trim sent list + while (!sent.empty() && + sent.front()->get_seq() <= seq) { + Message *m = sent.front(); + sent.pop_front(); + lsubdout(msgr->cct, ms, 10) << "reader got ack seq " + << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl; + m->put(); + } +} + +void Pipe::start_reader() +{ + ceph_assert(pipe_lock.is_locked()); + ceph_assert(!reader_running); + if (reader_needs_join) { + reader_thread.join(); + reader_needs_join = false; + } + reader_running = true; + reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes); +} + +void Pipe::maybe_start_delay_thread() +{ + if (!delay_thread) { + auto pos = msgr->cct->_conf.get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state->peer_type)); + if (pos != string::npos) { + lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl; + delay_thread = new DelayedDelivery(this); + delay_thread->create("ms_pipe_delay"); + } + } +} + +void Pipe::start_writer() +{ + ceph_assert(pipe_lock.is_locked()); + ceph_assert(!writer_running); + writer_running = true; + writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes); +} + +void Pipe::join_reader() +{ + if (!reader_running) + return; + cond.Signal(); + pipe_lock.Unlock(); + reader_thread.join(); + pipe_lock.Lock(); + reader_needs_join = false; +} + +void Pipe::DelayedDelivery::discard() +{ + lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::discard" << dendl; + Mutex::Locker l(delay_lock); + while (!delay_queue.empty()) { + Message *m = delay_queue.front().second; + pipe->in_q->dispatch_throttle_release(m->get_dispatch_throttle_size()); + m->put(); + delay_queue.pop_front(); + } +} + +void Pipe::DelayedDelivery::flush() +{ + lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::flush" << dendl; + Mutex::Locker l(delay_lock); + flush_count = delay_queue.size(); + delay_cond.Signal(); +} + +void *Pipe::DelayedDelivery::entry() +{ + Mutex::Locker locker(delay_lock); + lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry start" << dendl; + + while (!stop_delayed_delivery) { + if (delay_queue.empty()) { + lgeneric_subdout(pipe->msgr->cct, ms, 30) << *pipe << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl; + delay_cond.Wait(delay_lock); + continue; + } + utime_t release = delay_queue.front().first; + Message *m = delay_queue.front().second; + string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type; + if (!flush_count && + (release > ceph_clock_now() && + (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) { + lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl; + delay_cond.WaitUntil(delay_lock, release); + continue; + } + lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl; + delay_queue.pop_front(); + if (flush_count > 0) { + --flush_count; + active_flush = true; + } + if (pipe->in_q->can_fast_dispatch(m)) { + if (!stop_fast_dispatching_flag) { + delay_dispatching = true; + delay_lock.Unlock(); + pipe->in_q->fast_dispatch(m); + delay_lock.Lock(); + delay_dispatching = false; + if (stop_fast_dispatching_flag) { + // we need to let the stopping thread proceed + delay_cond.Signal(); + delay_lock.Unlock(); + delay_lock.Lock(); + } + } + } else { + pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); + } + active_flush = false; + } + lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry stop" << dendl; + return NULL; +} + +void Pipe::DelayedDelivery::stop_fast_dispatching() { + Mutex::Locker l(delay_lock); + stop_fast_dispatching_flag = true; + while (delay_dispatching) + delay_cond.Wait(delay_lock); +} + + +int Pipe::accept() +{ + ldout(msgr->cct,10) << "accept" << dendl; + ceph_assert(pipe_lock.is_locked()); + ceph_assert(state == STATE_ACCEPTING); + + pipe_lock.Unlock(); + + // vars + bufferlist addrs; + entity_addr_t socket_addr; + socklen_t len; + int r; + char banner[strlen(CEPH_BANNER)+1]; + bufferlist addrbl; + ceph_msg_connect connect; + ceph_msg_connect_reply reply; + Pipe *existing = 0; + bufferptr bp; + bufferlist authorizer, authorizer_reply; + bool authorizer_valid; + uint64_t feat_missing; + bool replaced = false; + // this variable denotes if the connection attempt from peer is a hard + // reset or not, it is true if there is an existing connection and the + // connection sequence from peer is equal to zero + bool is_reset_from_peer = false; + CryptoKey session_key; + int removed; // single-use down below + + // this should roughly mirror pseudocode at + // http://ceph.com/wiki/Messaging_protocol + int reply_tag = 0; + uint64_t existing_seq = -1; + + // used for reading in the remote acked seq on connect + uint64_t newly_acked_seq = 0; + + bool need_challenge = false; + bool had_challenge = false; + std::unique_ptr<AuthAuthorizerChallenge> authorizer_challenge; + + recv_reset(); + + set_socket_options(); + + // announce myself. + r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER)); + if (r < 0) { + ldout(msgr->cct,10) << "accept couldn't write banner" << dendl; + goto fail_unlocked; + } + + // and my addr + encode(msgr->my_addr, addrs, 0); // legacy + + port = msgr->my_addr.get_port(); + + // and peer's socket addr (they might not know their ip) + sockaddr_storage ss; + len = sizeof(ss); + r = ::getpeername(sd, (sockaddr*)&ss, &len); + if (r < 0) { + ldout(msgr->cct,0) << "accept failed to getpeername " << cpp_strerror(errno) << dendl; + goto fail_unlocked; + } + socket_addr.set_sockaddr((sockaddr*)&ss); + encode(socket_addr, addrs, 0); // legacy + + r = tcp_write(addrs.c_str(), addrs.length()); + if (r < 0) { + ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl; + goto fail_unlocked; + } + + ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl; + + // identify peer + if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) { + ldout(msgr->cct,10) << "accept couldn't read banner" << dendl; + goto fail_unlocked; + } + if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { + banner[strlen(CEPH_BANNER)] = 0; + ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl; + goto fail_unlocked; + } + { + bufferptr tp(sizeof(ceph_entity_addr)); + addrbl.push_back(std::move(tp)); + } + if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) { + ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl; + goto fail_unlocked; + } + try { + auto ti = addrbl.cbegin(); + decode(peer_addr, ti); + } catch (const buffer::error& e) { + ldout(msgr->cct,2) << __func__ << " decode peer_addr failed: " << e.what() + << dendl; + goto fail_unlocked; + } + + ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl; + if (peer_addr.is_blank_ip()) { + // peer apparently doesn't know what ip they have; figure it out for them. + int port = peer_addr.get_port(); + peer_addr.u = socket_addr.u; + peer_addr.set_port(port); + ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr + << " (socket is " << socket_addr << ")" << dendl; + } + set_peer_addr(peer_addr); // so that connection_state gets set up + + while (1) { + if (tcp_read((char*)&connect, sizeof(connect)) < 0) { + ldout(msgr->cct,10) << "accept couldn't read connect" << dendl; + goto fail_unlocked; + } + + authorizer.clear(); + if (connect.authorizer_len) { + bp = buffer::create(connect.authorizer_len); + if (tcp_read(bp.c_str(), connect.authorizer_len) < 0) { + ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl; + goto fail_unlocked; + } + authorizer.push_back(std::move(bp)); + authorizer_reply.clear(); + } + + ldout(msgr->cct,20) << "accept got peer connect_seq " << connect.connect_seq + << " global_seq " << connect.global_seq + << dendl; + + msgr->lock.Lock(); // FIXME + pipe_lock.Lock(); + if (msgr->dispatch_queue.stop) + goto shutting_down; + if (state != STATE_ACCEPTING) { + goto shutting_down; + } + + // note peer's type, flags + set_peer_type(connect.host_type); + policy = msgr->get_policy(connect.host_type); + ldout(msgr->cct,10) << "accept of host_type " << connect.host_type + << ", policy.lossy=" << policy.lossy + << " policy.server=" << policy.server + << " policy.standby=" << policy.standby + << " policy.resetcheck=" << policy.resetcheck + << dendl; + + memset(&reply, 0, sizeof(reply)); + reply.protocol_version = msgr->get_proto_version(peer_type, false); + msgr->lock.Unlock(); + + // mismatch? + ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version + << ", their proto " << connect.protocol_version << dendl; + if (connect.protocol_version != reply.protocol_version) { + reply.tag = CEPH_MSGR_TAG_BADPROTOVER; + goto reply; + } + + // require signatures for cephx? + if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) { + if (peer_type == CEPH_ENTITY_TYPE_OSD || + peer_type == CEPH_ENTITY_TYPE_MDS || + peer_type == CEPH_ENTITY_TYPE_MGR) { + if (msgr->cct->_conf->cephx_require_signatures || + msgr->cct->_conf->cephx_cluster_require_signatures) { + ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl; + policy.features_required |= CEPH_FEATURE_MSG_AUTH; + } + if (msgr->cct->_conf->cephx_require_version >= 2 || + msgr->cct->_conf->cephx_cluster_require_version >= 2) { + ldout(msgr->cct,10) << "using cephx, requiring cephx v2 feature bit for cluster" << dendl; + policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2; + } + } else { + if (msgr->cct->_conf->cephx_require_signatures || + msgr->cct->_conf->cephx_service_require_signatures) { + ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl; + policy.features_required |= CEPH_FEATURE_MSG_AUTH; + } + if (msgr->cct->_conf->cephx_require_version >= 2 || + msgr->cct->_conf->cephx_service_require_version >= 2) { + ldout(msgr->cct,10) << "using cephx, requiring cephx v2 feature bit for cluster" << dendl; + policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2; + } + } + } + + feat_missing = policy.features_required & ~(uint64_t)connect.features; + if (feat_missing) { + ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl; + reply.tag = CEPH_MSGR_TAG_FEATURES; + goto reply; + } + + // Check the authorizer. If not good, bail out. + + pipe_lock.Unlock(); + + need_challenge = HAVE_FEATURE(connect.features, CEPHX_V2); + had_challenge = (bool)authorizer_challenge; + authorizer_reply.clear(); + if (!msgr->ms_deliver_verify_authorizer( + connection_state.get(), peer_type, connect.authorizer_protocol, + authorizer, + authorizer_reply, authorizer_valid, session_key, + nullptr /* connection_secret */, + need_challenge ? &authorizer_challenge : nullptr) || + !authorizer_valid) { + pipe_lock.Lock(); + if (state != STATE_ACCEPTING) + goto shutting_down_msgr_unlocked; + if (!had_challenge && need_challenge && authorizer_challenge) { + ldout(msgr->cct,10) << "accept: challenging authorizer " + << authorizer_reply.length() + << " bytes" << dendl; + ceph_assert(authorizer_reply.length()); + reply.tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER; + } else { + ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl; + reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER; + } + session_security.reset(); + goto reply; + } + + // We've verified the authorizer for this pipe, so set up the session security structure. PLR + + ldout(msgr->cct,10) << "accept: setting up session_security." << dendl; + + retry_existing_lookup: + msgr->lock.Lock(); + pipe_lock.Lock(); + if (msgr->dispatch_queue.stop) + goto shutting_down; + if (state != STATE_ACCEPTING) + goto shutting_down; + + // existing? + existing = msgr->_lookup_pipe(peer_addr); + if (existing) { + existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here) + if (existing->reader_dispatching) { + /** we need to wait, or we can deadlock if downstream + * fast_dispatchers are (naughtily!) waiting on resources + * held by somebody trying to make use of the SimpleMessenger lock. + * So drop locks, wait, and retry. It just looks like a slow network + * to everybody else. + * + * We take a ref to existing here since it might get reaped before we + * wake up (see bug #15870). We can be confident that it lived until + * locked it since we held the msgr lock from _lookup_pipe through to + * locking existing->lock and checking reader_dispatching. + */ + existing->get(); + pipe_lock.Unlock(); + msgr->lock.Unlock(); + existing->notify_on_dispatch_done = true; + while (existing->reader_dispatching) + existing->cond.Wait(existing->pipe_lock); + existing->pipe_lock.Unlock(); + existing->put(); + existing = nullptr; + goto retry_existing_lookup; + } + + if (connect.global_seq < existing->peer_global_seq) { + ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq + << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl; + reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL; + reply.global_seq = existing->peer_global_seq; // so we can send it below.. + existing->pipe_lock.Unlock(); + msgr->lock.Unlock(); + goto reply; + } else { + ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq + << " <= " << connect.global_seq << ", looks ok" << dendl; + } + + if (existing->policy.lossy) { + ldout(msgr->cct,0) << "accept replacing existing (lossy) channel (new one lossy=" + << policy.lossy << ")" << dendl; + existing->was_session_reset(); + goto replace; + } + + ldout(msgr->cct,0) << "accept connect_seq " << connect.connect_seq + << " vs existing " << existing->connect_seq + << " state " << existing->get_state_name() << dendl; + + if (connect.connect_seq == 0 && existing->connect_seq > 0) { + ldout(msgr->cct,0) << "accept peer reset, then tried to connect to us, replacing" << dendl; + // this is a hard reset from peer + is_reset_from_peer = true; + if (policy.resetcheck) + existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s + goto replace; + } + + if (connect.connect_seq < existing->connect_seq) { + // old attempt, or we sent READY but they didn't get it. + ldout(msgr->cct,10) << "accept existing " << existing << ".cseq " << existing->connect_seq + << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl; + goto retry_session; + } + + if (connect.connect_seq == existing->connect_seq) { + // if the existing connection successfully opened, and/or + // subsequently went to standby, then the peer should bump + // their connect_seq and retry: this is not a connection race + // we need to resolve here. + if (existing->state == STATE_OPEN || + existing->state == STATE_STANDBY) { + ldout(msgr->cct,10) << "accept connection race, existing " << existing + << ".cseq " << existing->connect_seq + << " == " << connect.connect_seq + << ", OPEN|STANDBY, RETRY_SESSION" << dendl; + goto retry_session; + } + + // connection race? + if (peer_addr < msgr->my_addr || + existing->policy.server) { + // incoming wins + ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq + << " == " << connect.connect_seq << ", or we are server, replacing my attempt" << dendl; + if (!(existing->state == STATE_CONNECTING || + existing->state == STATE_WAIT)) + lderr(msgr->cct) << "accept race bad state, would replace, existing=" + << existing->get_state_name() + << " " << existing << ".cseq=" << existing->connect_seq + << " == " << connect.connect_seq + << dendl; + ceph_assert(existing->state == STATE_CONNECTING || + existing->state == STATE_WAIT); + goto replace; + } else { + // our existing outgoing wins + ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq + << " == " << connect.connect_seq << ", sending WAIT" << dendl; + ceph_assert(peer_addr > msgr->my_addr); + if (!(existing->state == STATE_CONNECTING)) + lderr(msgr->cct) << "accept race bad state, would send wait, existing=" + << existing->get_state_name() + << " " << existing << ".cseq=" << existing->connect_seq + << " == " << connect.connect_seq + << dendl; + ceph_assert(existing->state == STATE_CONNECTING); + // make sure our outgoing connection will follow through + existing->_send_keepalive(); + reply.tag = CEPH_MSGR_TAG_WAIT; + existing->pipe_lock.Unlock(); + msgr->lock.Unlock(); + goto reply; + } + } + + ceph_assert(connect.connect_seq > existing->connect_seq); + ceph_assert(connect.global_seq >= existing->peer_global_seq); + if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other + existing->connect_seq == 0) { + ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq + << ", " << existing << ".cseq = " << existing->connect_seq + << "), sending RESETSESSION" << dendl; + reply.tag = CEPH_MSGR_TAG_RESETSESSION; + msgr->lock.Unlock(); + existing->pipe_lock.Unlock(); + goto reply; + } + + // reconnect + ldout(msgr->cct,10) << "accept peer sent cseq " << connect.connect_seq + << " > " << existing->connect_seq << dendl; + goto replace; + } // existing + else if (connect.connect_seq > 0) { + // we reset, and they are opening a new session + ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl; + msgr->lock.Unlock(); + reply.tag = CEPH_MSGR_TAG_RESETSESSION; + goto reply; + } else { + // new session + ldout(msgr->cct,10) << "accept new session" << dendl; + existing = NULL; + goto open; + } + ceph_abort(); + + retry_session: + ceph_assert(existing->pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); + reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; + reply.connect_seq = existing->connect_seq + 1; + existing->pipe_lock.Unlock(); + msgr->lock.Unlock(); + goto reply; + + reply: + ceph_assert(pipe_lock.is_locked()); + reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required; + reply.authorizer_len = authorizer_reply.length(); + pipe_lock.Unlock(); + r = tcp_write((char*)&reply, sizeof(reply)); + if (r < 0) + goto fail_unlocked; + if (reply.authorizer_len) { + r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); + if (r < 0) + goto fail_unlocked; + } + } + + replace: + ceph_assert(existing->pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); + // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence + if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) { + reply_tag = CEPH_MSGR_TAG_SEQ; + existing_seq = existing->in_seq; + } + ldout(msgr->cct,10) << "accept replacing " << existing << dendl; + existing->stop(); + existing->unregister_pipe(); + replaced = true; + + if (existing->policy.lossy) { + // disconnect from the Connection + ceph_assert(existing->connection_state); + if (existing->connection_state->clear_pipe(existing)) + msgr->dispatch_queue.queue_reset(existing->connection_state.get()); + } else { + // queue a reset on the new connection, which we're dumping for the old + msgr->dispatch_queue.queue_reset(connection_state.get()); + + // drop my Connection, and take a ref to the existing one. do not + // clear existing->connection_state, since read_message and + // write_message both dereference it without pipe_lock. + connection_state = existing->connection_state; + + // make existing Connection reference us + connection_state->reset_pipe(this); + + if (existing->delay_thread) { + existing->delay_thread->steal_for_pipe(this); + delay_thread = existing->delay_thread; + existing->delay_thread = NULL; + delay_thread->flush(); + } + + // steal incoming queue + uint64_t replaced_conn_id = conn_id; + conn_id = existing->conn_id; + existing->conn_id = replaced_conn_id; + + // reset the in_seq if this is a hard reset from peer, + // otherwise we respect our original connection's value + in_seq = is_reset_from_peer ? 0 : existing->in_seq; + in_seq_acked = in_seq; + + // steal outgoing queue and out_seq + existing->requeue_sent(); + out_seq = existing->out_seq; + ldout(msgr->cct,10) << "accept re-queuing on out_seq " << out_seq << " in_seq " << in_seq << dendl; + for (map<int, list<Message*> >::iterator p = existing->out_q.begin(); + p != existing->out_q.end(); + ++p) + out_q[p->first].splice(out_q[p->first].begin(), p->second); + } + existing->stop_and_wait(); + existing->pipe_lock.Unlock(); + + open: + // open + ceph_assert(pipe_lock.is_locked()); + connect_seq = connect.connect_seq + 1; + peer_global_seq = connect.global_seq; + ceph_assert(state == STATE_ACCEPTING); + state = STATE_OPEN; + ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; + + // send READY reply + reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY); + reply.features = policy.features_supported; + reply.global_seq = msgr->get_global_seq(); + reply.connect_seq = connect_seq; + reply.flags = 0; + reply.authorizer_len = authorizer_reply.length(); + if (policy.lossy) + reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY; + + connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features); + ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl; + + session_security.reset( + get_auth_session_handler(msgr->cct, + connect.authorizer_protocol, + session_key, + connection_state->get_features())); + + // notify + msgr->dispatch_queue.queue_accept(connection_state.get()); + msgr->ms_deliver_handle_fast_accept(connection_state.get()); + + // ok! + if (msgr->dispatch_queue.stop) + goto shutting_down; + removed = msgr->accepting_pipes.erase(this); + ceph_assert(removed == 1); + register_pipe(); + msgr->lock.Unlock(); + pipe_lock.Unlock(); + + r = tcp_write((char*)&reply, sizeof(reply)); + if (r < 0) { + goto fail_registered; + } + + if (reply.authorizer_len) { + r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); + if (r < 0) { + goto fail_registered; + } + } + + if (reply_tag == CEPH_MSGR_TAG_SEQ) { + if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) { + ldout(msgr->cct,2) << "accept write error on in_seq" << dendl; + goto fail_registered; + } + if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) { + ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl; + goto fail_registered; + } + } + + pipe_lock.Lock(); + discard_requeued_up_to(newly_acked_seq); + if (state != STATE_CLOSED) { + ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl; + start_writer(); + } + ldout(msgr->cct,20) << "accept done" << dendl; + + maybe_start_delay_thread(); + + return 0; // success. + + fail_registered: + ldout(msgr->cct, 10) << "accept fault after register" << dendl; + + if (msgr->cct->_conf->ms_inject_internal_delays) { + ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + + fail_unlocked: + pipe_lock.Lock(); + if (state != STATE_CLOSED) { + bool queued = is_queued(); + ldout(msgr->cct, 10) << " queued = " << (int)queued << dendl; + if (queued) { + state = policy.server ? STATE_STANDBY : STATE_CONNECTING; + } else if (replaced) { + state = STATE_STANDBY; + } else { + state = STATE_CLOSED; + state_closed = true; + } + fault(); + if (queued || replaced) + start_writer(); + } + return -1; + + shutting_down: + msgr->lock.Unlock(); + shutting_down_msgr_unlocked: + ceph_assert(pipe_lock.is_locked()); + + if (msgr->cct->_conf->ms_inject_internal_delays) { + ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + + state = STATE_CLOSED; + state_closed = true; + fault(); + return -1; +} + +void Pipe::set_socket_options() +{ + // disable Nagle algorithm? + if (msgr->cct->_conf->ms_tcp_nodelay) { + int flag = 1; + int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); + if (r < 0) { + r = -errno; + ldout(msgr->cct,0) << "couldn't set TCP_NODELAY: " + << cpp_strerror(r) << dendl; + } + } + if (msgr->cct->_conf->ms_tcp_rcvbuf) { + int size = msgr->cct->_conf->ms_tcp_rcvbuf; + int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size)); + if (r < 0) { + r = -errno; + ldout(msgr->cct,0) << "couldn't set SO_RCVBUF to " << size + << ": " << cpp_strerror(r) << dendl; + } + } + + // block ESIGPIPE +#ifdef CEPH_USE_SO_NOSIGPIPE + int val = 1; + int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)); + if (r) { + r = -errno; + ldout(msgr->cct,0) << "couldn't set SO_NOSIGPIPE: " + << cpp_strerror(r) << dendl; + } +#endif + +#ifdef SO_PRIORITY + int prio = msgr->get_socket_priority(); + if (prio >= 0) { + int r = -1; +#ifdef IPTOS_CLASS_CS6 + int iptos = IPTOS_CLASS_CS6; + int addr_family = 0; + if (!peer_addr.is_blank_ip()) { + addr_family = peer_addr.get_family(); + } else { + addr_family = msgr->get_myaddr_legacy().get_family(); + } + switch (addr_family) { + case AF_INET: + r = ::setsockopt(sd, IPPROTO_IP, IP_TOS, &iptos, sizeof(iptos)); + break; + case AF_INET6: + r = ::setsockopt(sd, IPPROTO_IPV6, IPV6_TCLASS, &iptos, sizeof(iptos)); + break; + default: + lderr(msgr->cct) << "couldn't set ToS of unknown family (" + << addr_family << ")" + << " to " << iptos << dendl; + return; + } + if (r < 0) { + r = -errno; + ldout(msgr->cct,0) << "couldn't set TOS to " << iptos + << ": " << cpp_strerror(r) << dendl; + } +#endif + // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0. + // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT + // We need to call setsockopt(SO_PRIORITY) after it. + r = ::setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(prio)); + if (r < 0) { + r = -errno; + ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio + << ": " << cpp_strerror(r) << dendl; + } + } +#endif +} + +int Pipe::connect() +{ + ldout(msgr->cct,10) << "connect " << connect_seq << dendl; + ceph_assert(pipe_lock.is_locked()); + + __u32 cseq = connect_seq; + __u32 gseq = msgr->get_global_seq(); + + // stop reader thread + join_reader(); + + pipe_lock.Unlock(); + + char tag = -1; + int rc = -1; + struct msghdr msg; + struct iovec msgvec[2]; + int msglen; + char banner[strlen(CEPH_BANNER) + 1]; // extra byte makes coverity happy + entity_addr_t paddr; + entity_addr_t peer_addr_for_me, socket_addr; + AuthAuthorizer *authorizer = NULL; + bufferlist addrbl, myaddrbl; + const auto& conf = msgr->cct->_conf; + + // close old socket. this is safe because we stopped the reader thread above. + if (sd >= 0) + ::close(sd); + + // create socket? + sd = socket_cloexec(peer_addr.get_family(), SOCK_STREAM, 0); + if (sd < 0) { + int e = errno; + lderr(msgr->cct) << "connect couldn't create socket " << cpp_strerror(e) << dendl; + rc = -e; + goto fail; + } + + recv_reset(); + + set_socket_options(); + + { + entity_addr_t addr2bind = msgr->get_myaddr_legacy(); + if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) { + addr2bind.set_port(0); + int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len()); + if (r < 0) { + ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl; + goto fail; + } + } + } + + // connect! + ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl; + rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len()); + if (rc < 0) { + int stored_errno = errno; + ldout(msgr->cct,2) << "connect error " << peer_addr + << ", " << cpp_strerror(stored_errno) << dendl; + if (stored_errno == ECONNREFUSED) { + ldout(msgr->cct, 2) << "connection refused!" << dendl; + msgr->dispatch_queue.queue_refused(connection_state.get()); + } + goto fail; + } + + // verify banner + // FIXME: this should be non-blocking, or in some other way verify the banner as we get it. + rc = tcp_read((char*)&banner, strlen(CEPH_BANNER)); + if (rc < 0) { + ldout(msgr->cct,2) << "connect couldn't read banner, " << cpp_strerror(rc) << dendl; + goto fail; + } + if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { + ldout(msgr->cct,0) << "connect protocol error (bad banner) on peer " << peer_addr << dendl; + goto fail; + } + + memset(&msg, 0, sizeof(msg)); + msgvec[0].iov_base = banner; + msgvec[0].iov_len = strlen(CEPH_BANNER); + msg.msg_iov = msgvec; + msg.msg_iovlen = 1; + msglen = msgvec[0].iov_len; + rc = do_sendmsg(&msg, msglen); + if (rc < 0) { + ldout(msgr->cct,2) << "connect couldn't write my banner, " << cpp_strerror(rc) << dendl; + goto fail; + } + + // identify peer + { +#if defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__) + bufferptr p(sizeof(ceph_entity_addr) * 2); +#else + int wirelen = sizeof(__u32) * 2 + sizeof(ceph_sockaddr_storage); + bufferptr p(wirelen * 2); +#endif + addrbl.push_back(std::move(p)); + } + rc = tcp_read(addrbl.c_str(), addrbl.length()); + if (rc < 0) { + ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc) << dendl; + goto fail; + } + try { + auto p = addrbl.cbegin(); + decode(paddr, p); + decode(peer_addr_for_me, p); + } + catch (buffer::error& e) { + ldout(msgr->cct,2) << "connect couldn't decode peer addrs: " << e.what() + << dendl; + goto fail; + } + port = peer_addr_for_me.get_port(); + + ldout(msgr->cct,20) << "connect read peer addr " << paddr << " on socket " << sd << dendl; + if (peer_addr != paddr) { + if (paddr.is_blank_ip() && + peer_addr.get_port() == paddr.get_port() && + peer_addr.get_nonce() == paddr.get_nonce()) { + ldout(msgr->cct,0) << "connect claims to be " + << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl; + } else { + ldout(msgr->cct,10) << "connect claims to be " + << paddr << " not " << peer_addr << dendl; + goto fail; + } + } + + ldout(msgr->cct,20) << "connect peer addr for me is " << peer_addr_for_me << dendl; + + msgr->learned_addr(peer_addr_for_me); + + encode(msgr->my_addr, myaddrbl, 0); // legacy + + memset(&msg, 0, sizeof(msg)); + msgvec[0].iov_base = myaddrbl.c_str(); + msgvec[0].iov_len = myaddrbl.length(); + msg.msg_iov = msgvec; + msg.msg_iovlen = 1; + msglen = msgvec[0].iov_len; + rc = do_sendmsg(&msg, msglen); + if (rc < 0) { + ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl; + goto fail; + } + ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_addr << dendl; + + + while (1) { + if (!authorizer) { + authorizer = msgr->ms_deliver_get_authorizer(peer_type); + } + bufferlist authorizer_reply; + + ceph_msg_connect connect; + connect.features = policy.features_supported; + connect.host_type = msgr->get_myname().type(); + connect.global_seq = gseq; + connect.connect_seq = cseq; + connect.protocol_version = msgr->get_proto_version(peer_type, true); + connect.authorizer_protocol = authorizer ? authorizer->protocol : 0; + connect.authorizer_len = authorizer ? authorizer->bl.length() : 0; + if (authorizer) + ldout(msgr->cct,10) << "connect.authorizer_len=" << connect.authorizer_len + << " protocol=" << connect.authorizer_protocol << dendl; + connect.flags = 0; + if (policy.lossy) + connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides! + memset(&msg, 0, sizeof(msg)); + msgvec[0].iov_base = (char*)&connect; + msgvec[0].iov_len = sizeof(connect); + msg.msg_iov = msgvec; + msg.msg_iovlen = 1; + msglen = msgvec[0].iov_len; + if (authorizer) { + msgvec[1].iov_base = authorizer->bl.c_str(); + msgvec[1].iov_len = authorizer->bl.length(); + msg.msg_iovlen++; + msglen += msgvec[1].iov_len; + } + + ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq + << " proto=" << connect.protocol_version << dendl; + rc = do_sendmsg(&msg, msglen); + if (rc < 0) { + ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc) << dendl; + goto fail; + } + + ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl; + ceph_msg_connect_reply reply; + rc = tcp_read((char*)&reply, sizeof(reply)); + if (rc < 0) { + ldout(msgr->cct,2) << "connect read reply " << cpp_strerror(rc) << dendl; + goto fail; + } + + ldout(msgr->cct,20) << "connect got reply tag " << (int)reply.tag + << " connect_seq " << reply.connect_seq + << " global_seq " << reply.global_seq + << " proto " << reply.protocol_version + << " flags " << (int)reply.flags + << " features " << reply.features + << dendl; + + authorizer_reply.clear(); + + if (reply.authorizer_len) { + ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl; + bufferptr bp = buffer::create(reply.authorizer_len); + rc = tcp_read(bp.c_str(), reply.authorizer_len); + if (rc < 0) { + ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc) << dendl; + goto fail; + } + authorizer_reply.push_back(bp); + } + + if (reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { + authorizer->add_challenge(msgr->cct, authorizer_reply); + ldout(msgr->cct,10) << " got authorizer challenge, " << authorizer_reply.length() + << " bytes" << dendl; + continue; + } + + if (authorizer) { + auto iter = authorizer_reply.cbegin(); + if (!authorizer->verify_reply(iter, nullptr /* connection_secret */)) { + ldout(msgr->cct,0) << "failed verifying authorize reply" << dendl; + goto fail; + } + } + + if (conf->ms_inject_internal_delays) { + ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + + pipe_lock.Lock(); + if (state != STATE_CONNECTING) { + ldout(msgr->cct,0) << "connect got RESETSESSION but no longer connecting" << dendl; + goto stop_locked; + } + + if (reply.tag == CEPH_MSGR_TAG_FEATURES) { + ldout(msgr->cct,0) << "connect protocol feature mismatch, my " << std::hex + << connect.features << " < peer " << reply.features + << " missing " << (reply.features & ~policy.features_supported) + << std::dec << dendl; + goto fail_locked; + } + + if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) { + ldout(msgr->cct,0) << "connect protocol version mismatch, my " << connect.protocol_version + << " != " << reply.protocol_version << dendl; + goto fail_locked; + } + + if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) { + ldout(msgr->cct,0) << "connect got BADAUTHORIZER" << dendl; + goto fail_locked; + } + if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) { + ldout(msgr->cct,0) << "connect got RESETSESSION" << dendl; + was_session_reset(); + cseq = 0; + pipe_lock.Unlock(); + continue; + } + if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { + gseq = msgr->get_global_seq(reply.global_seq); + ldout(msgr->cct,10) << "connect got RETRY_GLOBAL " << reply.global_seq + << " chose new " << gseq << dendl; + pipe_lock.Unlock(); + continue; + } + if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { + ceph_assert(reply.connect_seq > connect_seq); + ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq + << " -> " << reply.connect_seq << dendl; + cseq = connect_seq = reply.connect_seq; + pipe_lock.Unlock(); + continue; + } + + if (reply.tag == CEPH_MSGR_TAG_WAIT) { + ldout(msgr->cct,3) << "connect got WAIT (connection race)" << dendl; + state = STATE_WAIT; + goto stop_locked; + } + + if (reply.tag == CEPH_MSGR_TAG_READY || + reply.tag == CEPH_MSGR_TAG_SEQ) { + uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features; + if (feat_missing) { + ldout(msgr->cct,1) << "missing required features " << std::hex << feat_missing << std::dec << dendl; + goto fail_locked; + } + + if (reply.tag == CEPH_MSGR_TAG_SEQ) { + ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl; + uint64_t newly_acked_seq = 0; + rc = tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)); + if (rc < 0) { + ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc) << dendl; + goto fail_locked; + } + ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq + << " vs out_seq " << out_seq << dendl; + while (newly_acked_seq > out_seq) { + Message *m = _get_next_outgoing(); + ceph_assert(m); + ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq() + << " " << *m << dendl; + ceph_assert(m->get_seq() <= newly_acked_seq); + m->put(); + ++out_seq; + } + if (tcp_write((char*)&in_seq, sizeof(in_seq)) < 0) { + ldout(msgr->cct,2) << "connect write error on in_seq" << dendl; + goto fail_locked; + } + } + + // hooray! + peer_global_seq = reply.global_seq; + policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY; + state = STATE_OPEN; + connect_seq = cseq + 1; + ceph_assert(connect_seq == reply.connect_seq); + backoff = utime_t(); + connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features); + ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy + << ", features " << connection_state->get_features() << dendl; + + + // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the + // connection. PLR + + if (authorizer != NULL) { + session_security.reset( + get_auth_session_handler( + msgr->cct, + authorizer->protocol, + authorizer->session_key, + connection_state->get_features())); + } else { + // We have no authorizer, so we shouldn't be applying security to messages in this pipe. PLR + session_security.reset(); + } + + msgr->dispatch_queue.queue_connect(connection_state.get()); + msgr->ms_deliver_handle_fast_connect(connection_state.get()); + + if (!reader_running) { + ldout(msgr->cct,20) << "connect starting reader" << dendl; + start_reader(); + } + maybe_start_delay_thread(); + delete authorizer; + return 0; + } + + // protocol error + ldout(msgr->cct,0) << "connect got bad tag " << (int)tag << dendl; + goto fail_locked; + } + + fail: + if (conf->ms_inject_internal_delays) { + ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + + pipe_lock.Lock(); + fail_locked: + if (state == STATE_CONNECTING) + fault(); + else + ldout(msgr->cct,3) << "connect fault, but state = " << get_state_name() + << " != connecting, stopping" << dendl; + + stop_locked: + delete authorizer; + return rc; +} + +void Pipe::register_pipe() +{ + ldout(msgr->cct,10) << "register_pipe" << dendl; + ceph_assert(msgr->lock.is_locked()); + Pipe *existing = msgr->_lookup_pipe(peer_addr); + ceph_assert(existing == NULL); + msgr->rank_pipe[peer_addr] = this; +} + +void Pipe::unregister_pipe() +{ + ceph_assert(msgr->lock.is_locked()); + ceph::unordered_map<entity_addr_t,Pipe*>::iterator p = msgr->rank_pipe.find(peer_addr); + if (p != msgr->rank_pipe.end() && p->second == this) { + ldout(msgr->cct,10) << "unregister_pipe" << dendl; + msgr->rank_pipe.erase(p); + } else { + ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl; + msgr->accepting_pipes.erase(this); // somewhat overkill, but safe. + } +} + +void Pipe::join() +{ + ldout(msgr->cct, 20) << "join" << dendl; + if (writer_thread.is_started()) + writer_thread.join(); + if (reader_thread.is_started()) + reader_thread.join(); + if (delay_thread) { + ldout(msgr->cct, 20) << "joining delay_thread" << dendl; + delay_thread->stop(); + delay_thread->join(); + } +} + +void Pipe::requeue_sent() +{ + if (sent.empty()) + return; + + list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; + while (!sent.empty()) { + Message *m = sent.back(); + sent.pop_back(); + ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq + << " (" << m->get_seq() << ")" << dendl; + rq.push_front(m); + out_seq--; + } +} + +void Pipe::discard_requeued_up_to(uint64_t seq) +{ + ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl; + if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) { + out_seq = seq; + return; + } + list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; + while (!rq.empty()) { + Message *m = rq.front(); + if (m->get_seq() == 0 || m->get_seq() > seq) + break; + ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq + << " <= " << seq << ", discarding" << dendl; + m->put(); + rq.pop_front(); + out_seq++; + } + if (rq.empty()) + out_q.erase(CEPH_MSG_PRIO_HIGHEST); +} + +/* + * Tears down the Pipe's message queues, and removes them from the DispatchQueue + * Must hold pipe_lock prior to calling. + */ +void Pipe::discard_out_queue() +{ + ldout(msgr->cct,10) << "discard_queue" << dendl; + + for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) { + ldout(msgr->cct,20) << " discard " << *p << dendl; + (*p)->put(); + } + sent.clear(); + for (map<int,list<Message*> >::iterator p = out_q.begin(); p != out_q.end(); ++p) + for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); ++r) { + ldout(msgr->cct,20) << " discard " << *r << dendl; + (*r)->put(); + } + out_q.clear(); +} + +void Pipe::fault(bool onread) +{ + const auto& conf = msgr->cct->_conf; + ceph_assert(pipe_lock.is_locked()); + cond.Signal(); + + if (onread && state == STATE_CONNECTING) { + ldout(msgr->cct,10) << "fault already connecting, reader shutting down" << dendl; + return; + } + + ldout(msgr->cct,2) << "fault " << cpp_strerror(errno) << dendl; + + if (state == STATE_CLOSED || + state == STATE_CLOSING) { + ldout(msgr->cct,10) << "fault already closed|closing" << dendl; + if (connection_state->clear_pipe(this)) + msgr->dispatch_queue.queue_reset(connection_state.get()); + return; + } + + shutdown_socket(); + + // lossy channel? + if (policy.lossy && state != STATE_CONNECTING) { + ldout(msgr->cct,10) << "fault on lossy channel, failing" << dendl; + + // disconnect from Connection, and mark it failed. future messages + // will be dropped. + ceph_assert(connection_state); + stop(); + bool cleared = connection_state->clear_pipe(this); + + // crib locks, blech. note that Pipe is now STATE_CLOSED and the + // rank_pipe entry is ignored by others. + pipe_lock.Unlock(); + + if (conf->ms_inject_internal_delays) { + ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + + msgr->lock.Lock(); + pipe_lock.Lock(); + unregister_pipe(); + msgr->lock.Unlock(); + + if (delay_thread) + delay_thread->discard(); + in_q->discard_queue(conn_id); + discard_out_queue(); + if (cleared) + msgr->dispatch_queue.queue_reset(connection_state.get()); + return; + } + + // queue delayed items immediately + if (delay_thread) + delay_thread->flush(); + + // requeue sent items + requeue_sent(); + + if (policy.standby && !is_queued()) { + ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl; + state = STATE_STANDBY; + return; + } + + if (state != STATE_CONNECTING) { + if (policy.server) { + ldout(msgr->cct,0) << "fault, server, going to standby" << dendl; + state = STATE_STANDBY; + } else { + ldout(msgr->cct,0) << "fault, initiating reconnect" << dendl; + connect_seq++; + state = STATE_CONNECTING; + } + backoff = utime_t(); + } else if (backoff == utime_t()) { + ldout(msgr->cct,0) << "fault" << dendl; + backoff.set_from_double(conf->ms_initial_backoff); + } else { + ldout(msgr->cct,10) << "fault waiting " << backoff << dendl; + cond.WaitInterval(pipe_lock, backoff); + backoff += backoff; + if (backoff > conf->ms_max_backoff) + backoff.set_from_double(conf->ms_max_backoff); + ldout(msgr->cct,10) << "fault done waiting or woke up" << dendl; + } +} + +void Pipe::randomize_out_seq() +{ + if (connection_state->get_features() & CEPH_FEATURE_MSG_AUTH) { + // Set out_seq to a random value, so CRC won't be predictable. + out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK); + lsubdout(msgr->cct, ms, 10) << "randomize_out_seq " << out_seq << dendl; + } else { + // previously, seq #'s always started at 0. + out_seq = 0; + } +} + +void Pipe::was_session_reset() +{ + ceph_assert(pipe_lock.is_locked()); + + ldout(msgr->cct,10) << "was_session_reset" << dendl; + in_q->discard_queue(conn_id); + if (delay_thread) + delay_thread->discard(); + discard_out_queue(); + + msgr->dispatch_queue.queue_remote_reset(connection_state.get()); + + randomize_out_seq(); + + in_seq = 0; + in_seq_acked = 0; + connect_seq = 0; +} + +void Pipe::stop() +{ + ldout(msgr->cct,10) << "stop" << dendl; + ceph_assert(pipe_lock.is_locked()); + state = STATE_CLOSED; + state_closed = true; + cond.Signal(); + shutdown_socket(); +} + +void Pipe::stop_and_wait() +{ + ceph_assert(pipe_lock.is_locked_by_me()); + if (state != STATE_CLOSED) + stop(); + + if (msgr->cct->_conf->ms_inject_internal_delays) { + ldout(msgr->cct, 10) << __func__ << " sleep for " + << msgr->cct->_conf->ms_inject_internal_delays + << dendl; + utime_t t; + t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + + if (delay_thread) { + pipe_lock.Unlock(); + delay_thread->stop_fast_dispatching(); + pipe_lock.Lock(); + } + while (reader_running && + reader_dispatching) + cond.Wait(pipe_lock); +} + +/* read msgs from socket. + * also, server. + */ +void Pipe::reader() +{ + pipe_lock.Lock(); + + if (state == STATE_ACCEPTING) { + accept(); + ceph_assert(pipe_lock.is_locked()); + } + + // loop. + while (state != STATE_CLOSED && + state != STATE_CONNECTING) { + ceph_assert(pipe_lock.is_locked()); + + // sleep if (re)connecting + if (state == STATE_STANDBY) { + ldout(msgr->cct,20) << "reader sleeping during reconnect|standby" << dendl; + cond.Wait(pipe_lock); + continue; + } + + // get a reference to the AuthSessionHandler while we have the pipe_lock + std::shared_ptr<AuthSessionHandler> auth_handler = session_security; + + pipe_lock.Unlock(); + + char tag = -1; + ldout(msgr->cct,20) << "reader reading tag..." << dendl; + if (tcp_read((char*)&tag, 1) < 0) { + pipe_lock.Lock(); + ldout(msgr->cct,2) << "reader couldn't read tag, " << cpp_strerror(errno) << dendl; + fault(true); + continue; + } + + if (tag == CEPH_MSGR_TAG_KEEPALIVE) { + ldout(msgr->cct,2) << "reader got KEEPALIVE" << dendl; + pipe_lock.Lock(); + connection_state->set_last_keepalive(ceph_clock_now()); + continue; + } + if (tag == CEPH_MSGR_TAG_KEEPALIVE2) { + ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl; + ceph_timespec t; + int rc = tcp_read((char*)&t, sizeof(t)); + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " + << cpp_strerror(errno) << dendl; + fault(true); + } else { + send_keepalive_ack = true; + keepalive_ack_stamp = utime_t(t); + ldout(msgr->cct,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp + << dendl; + connection_state->set_last_keepalive(ceph_clock_now()); + cond.Signal(); + } + continue; + } + if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { + ldout(msgr->cct,2) << "reader got KEEPALIVE_ACK" << dendl; + struct ceph_timespec t; + int rc = tcp_read((char*)&t, sizeof(t)); + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl; + fault(true); + } else { + connection_state->set_last_keepalive_ack(utime_t(t)); + } + continue; + } + + // open ... + if (tag == CEPH_MSGR_TAG_ACK) { + ldout(msgr->cct,20) << "reader got ACK" << dendl; + ceph_le64 seq; + int rc = tcp_read((char*)&seq, sizeof(seq)); + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,2) << "reader couldn't read ack seq, " << cpp_strerror(errno) << dendl; + fault(true); + } else if (state != STATE_CLOSED) { + handle_ack(seq); + } + continue; + } + + else if (tag == CEPH_MSGR_TAG_MSG) { + ldout(msgr->cct,20) << "reader got MSG" << dendl; + Message *m = 0; + int r = read_message(&m, auth_handler.get()); + + pipe_lock.Lock(); + + if (!m) { + if (r < 0) + fault(true); + continue; + } + + m->trace.event("pipe read message"); + + if (state == STATE_CLOSED || + state == STATE_CONNECTING) { + in_q->dispatch_throttle_release(m->get_dispatch_throttle_size()); + m->put(); + continue; + } + + // check received seq#. if it is old, drop the message. + // note that incoming messages may skip ahead. this is convenient for the client + // side queueing because messages can't be renumbered, but the (kernel) client will + // occasionally pull a message out of the sent queue to send elsewhere. in that case + // it doesn't matter if we "got" it or not. + if (m->get_seq() <= in_seq) { + ldout(msgr->cct,0) << "reader got old message " + << m->get_seq() << " <= " << in_seq << " " << m << " " << *m + << ", discarding" << dendl; + in_q->dispatch_throttle_release(m->get_dispatch_throttle_size()); + m->put(); + if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) && + msgr->cct->_conf->ms_die_on_old_message) + ceph_abort_msg("old msgs despite reconnect_seq feature"); + continue; + } + if (m->get_seq() > in_seq + 1) { + ldout(msgr->cct,0) << "reader missed message? skipped from seq " + << in_seq << " to " << m->get_seq() << dendl; + if (msgr->cct->_conf->ms_die_on_skipped_message) + ceph_abort_msg("skipped incoming seq"); + } + + m->set_connection(connection_state.get()); + + // note last received message. + in_seq = m->get_seq(); + + cond.Signal(); // wake up writer, to ack this + + ldout(msgr->cct,10) << "reader got message " + << m->get_seq() << " " << m << " " << *m + << dendl; + in_q->fast_preprocess(m); + + if (delay_thread) { + utime_t release; + if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { + release = m->get_recv_stamp(); + release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; + lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; + } + delay_thread->queue(release, m); + } else { + if (in_q->can_fast_dispatch(m)) { + reader_dispatching = true; + pipe_lock.Unlock(); + in_q->fast_dispatch(m); + pipe_lock.Lock(); + reader_dispatching = false; + if (state == STATE_CLOSED || + notify_on_dispatch_done) { // there might be somebody waiting + notify_on_dispatch_done = false; + cond.Signal(); + } + } else { + in_q->enqueue(m, m->get_priority(), conn_id); + } + } + } + + else if (tag == CEPH_MSGR_TAG_CLOSE) { + ldout(msgr->cct,20) << "reader got CLOSE" << dendl; + pipe_lock.Lock(); + if (state == STATE_CLOSING) { + state = STATE_CLOSED; + state_closed = true; + } else { + state = STATE_CLOSING; + } + cond.Signal(); + break; + } + else { + ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl; + pipe_lock.Lock(); + fault(true); + } + } + + + // reap? + reader_running = false; + reader_needs_join = true; + unlock_maybe_reap(); + ldout(msgr->cct,10) << "reader done" << dendl; +} + +/* write msgs to socket. + * also, client. + */ +void Pipe::writer() +{ + pipe_lock.Lock(); + while (state != STATE_CLOSED) {// && state != STATE_WAIT) { + ldout(msgr->cct,10) << "writer: state = " << get_state_name() + << " policy.server=" << policy.server << dendl; + + // standby? + if (is_queued() && state == STATE_STANDBY && !policy.server) + state = STATE_CONNECTING; + + // connect? + if (state == STATE_CONNECTING) { + ceph_assert(!policy.server); + connect(); + continue; + } + + if (state == STATE_CLOSING) { + // write close tag + ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl; + char tag = CEPH_MSGR_TAG_CLOSE; + state = STATE_CLOSED; + state_closed = true; + pipe_lock.Unlock(); + if (sd >= 0) { + // we can ignore return value, actually; we don't care if this succeeds. + int r = ::write(sd, &tag, 1); + (void)r; + } + pipe_lock.Lock(); + continue; + } + + if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY && + (is_queued() || in_seq > in_seq_acked)) { + + // keepalive? + if (send_keepalive) { + int rc; + if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { + pipe_lock.Unlock(); + rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2, + ceph_clock_now()); + } else { + pipe_lock.Unlock(); + rc = write_keepalive(); + } + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,2) << "writer couldn't write keepalive[2], " + << cpp_strerror(errno) << dendl; + fault(); + continue; + } + send_keepalive = false; + } + if (send_keepalive_ack) { + utime_t t = keepalive_ack_stamp; + pipe_lock.Unlock(); + int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t); + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno) << dendl; + fault(); + continue; + } + send_keepalive_ack = false; + } + + // send ack? + if (in_seq > in_seq_acked) { + uint64_t send_seq = in_seq; + pipe_lock.Unlock(); + int rc = write_ack(send_seq); + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,2) << "writer couldn't write ack, " << cpp_strerror(errno) << dendl; + fault(); + continue; + } + in_seq_acked = send_seq; + } + + // grab outgoing message + Message *m = _get_next_outgoing(); + if (m) { + m->set_seq(++out_seq); + if (!policy.lossy) { + // put on sent list + sent.push_back(m); + m->get(); + } + + // associate message with Connection (for benefit of encode_payload) + m->set_connection(connection_state.get()); + + uint64_t features = connection_state->get_features(); + + if (m->empty_payload()) + ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features + << " " << m << " " << *m << dendl; + else + ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features + << " " << m << " " << *m << dendl; + + // encode and copy out of *m + m->encode(features, msgr->crcflags); + + // prepare everything + const ceph_msg_header& header = m->get_header(); + const ceph_msg_footer& footer = m->get_footer(); + + // Now that we have all the crcs calculated, handle the + // digital signature for the message, if the pipe has session + // security set up. Some session security options do not + // actually calculate and check the signature, but they should + // handle the calls to sign_message and check_signature. PLR + if (session_security.get() == NULL) { + ldout(msgr->cct, 20) << "writer no session security" << dendl; + } else { + if (session_security->sign_message(m)) { + ldout(msgr->cct, 20) << "writer failed to sign seq # " << header.seq + << "): sig = " << footer.sig << dendl; + } else { + ldout(msgr->cct, 20) << "writer signed seq # " << header.seq + << "): sig = " << footer.sig << dendl; + } + } + + bufferlist blist = m->get_payload(); + blist.append(m->get_middle()); + blist.append(m->get_data()); + + pipe_lock.Unlock(); + + m->trace.event("pipe writing message"); + + ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl; + int rc = write_message(header, footer, blist); + + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,1) << "writer error sending " << m << ", " + << cpp_strerror(errno) << dendl; + fault(); + } + m->put(); + } + continue; + } + + // wait + ldout(msgr->cct,20) << "writer sleeping" << dendl; + cond.Wait(pipe_lock); + } + + ldout(msgr->cct,20) << "writer finishing" << dendl; + + // reap? + writer_running = false; + unlock_maybe_reap(); + ldout(msgr->cct,10) << "writer done" << dendl; +} + +void Pipe::unlock_maybe_reap() +{ + if (!reader_running && !writer_running) { + shutdown_socket(); + pipe_lock.Unlock(); + if (delay_thread && delay_thread->is_flushing()) { + delay_thread->wait_for_flush(); + } + msgr->queue_reap(this); + } else { + pipe_lock.Unlock(); + } +} + +static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) +{ + // create a buffer to read into that matches the data alignment + unsigned left = len; + if (off & ~CEPH_PAGE_MASK) { + // head + unsigned head = 0; + head = std::min<uint64_t>(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left); + data.push_back(buffer::create(head)); + left -= head; + } + unsigned middle = left & CEPH_PAGE_MASK; + if (middle > 0) { + data.push_back(buffer::create_small_page_aligned(middle)); + left -= middle; + } + if (left) { + data.push_back(buffer::create(left)); + } +} + +int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler) +{ + int ret = -1; + // envelope + //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl; + + ceph_msg_header header; + ceph_msg_footer footer; + __u32 header_crc = 0; + + if (tcp_read((char*)&header, sizeof(header)) < 0) + return -1; + if (msgr->crcflags & MSG_CRC_HEADER) { + header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); + } + + ldout(msgr->cct,20) << "reader got envelope type=" << header.type + << " src " << entity_name_t(header.src) + << " front=" << header.front_len + << " data=" << header.data_len + << " off " << header.data_off + << dendl; + + // verify header crc + if ((msgr->crcflags & MSG_CRC_HEADER) && header_crc != header.crc) { + ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl; + return -1; + } + + bufferlist front, middle, data; + int front_len, middle_len; + unsigned data_len, data_off; + int aborted; + Message *message; + utime_t recv_stamp = ceph_clock_now(); + + if (policy.throttler_messages) { + ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << dendl; + policy.throttler_messages->get(); + } + + uint64_t message_size = header.front_len + header.middle_len + header.data_len; + if (message_size) { + if (policy.throttler_bytes) { + ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << dendl; + policy.throttler_bytes->get(message_size); + } + + // throttle total bytes waiting for dispatch. do this _after_ the + // policy throttle, as this one does not deadlock (unless dispatch + // blocks indefinitely, which it shouldn't). in contrast, the + // policy throttle carries for the lifetime of the message. + ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler " + << in_q->dispatch_throttler.get_current() << "/" + << in_q->dispatch_throttler.get_max() << dendl; + in_q->dispatch_throttler.get(message_size); + } + + utime_t throttle_stamp = ceph_clock_now(); + + // read front + front_len = header.front_len; + if (front_len) { + bufferptr bp = buffer::create(front_len); + if (tcp_read(bp.c_str(), front_len) < 0) + goto out_dethrottle; + front.push_back(std::move(bp)); + ldout(msgr->cct,20) << "reader got front " << front.length() << dendl; + } + + // read middle + middle_len = header.middle_len; + if (middle_len) { + bufferptr bp = buffer::create(middle_len); + if (tcp_read(bp.c_str(), middle_len) < 0) + goto out_dethrottle; + middle.push_back(std::move(bp)); + ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl; + } + + + // read data + data_len = le32_to_cpu(header.data_len); + data_off = le32_to_cpu(header.data_off); + if (data_len) { + unsigned offset = 0; + unsigned left = data_len; + + bufferlist newbuf, rxbuf; + bufferlist::iterator blp; +// int rxbuf_version = 0; + + while (left > 0) { + // wait for data + if (tcp_read_wait() < 0) + goto out_dethrottle; + + // get a buffer +#if 0 + // The rx_buffers implementation is buggy: + // - see http://tracker.ceph.com/issues/22480 + // + // - From inspection, I think that we have problems if we read *part* + // of the message into an rx_buffer, then drop the lock, someone revokes, + // and then later try to read the rest. In that case our final bufferlist + // will have part of the original static_buffer from the first chunk and + // partly a piece that we allocated. I think that to make this correct, + // we need to keep the bufferlist we are reading into in Connection under + // the lock, and on revoke, if the data is partly read, rebuild() to copy + // into fresh buffers so that all references to our static buffer are + // cleared up. + // + // - Also... what happens if we fully read into the static + // buffer, then revoke? We still have some bufferlist out there + // in the process of getting dispatched back to objecter or + // librados that references the static buffer. + connection_state->lock.Lock(); + map<ceph_tid_t,pair<bufferlist,int> >::iterator p = connection_state->rx_buffers.find(header.tid); + if (p != connection_state->rx_buffers.end()) { + if (rxbuf.length() == 0 || p->second.second != rxbuf_version) { + ldout(msgr->cct,10) << "reader seleting rx buffer v " << p->second.second + << " at offset " << offset + << " len " << p->second.first.length() << dendl; + rxbuf = p->second.first; + rxbuf_version = p->second.second; + // make sure it's big enough + if (rxbuf.length() < data_len) + rxbuf.push_back(buffer::create(data_len - rxbuf.length())); + blp = p->second.first.begin(); + blp.advance(offset); + } + } else { + if (!newbuf.length()) { + ldout(msgr->cct,20) << "reader allocating new rx buffer at offset " << offset << dendl; + alloc_aligned_buffer(newbuf, data_len, data_off); + blp = newbuf.begin(); + blp.advance(offset); + } + } + bufferptr bp = blp.get_current_ptr(); + int read = std::min(bp.length(), left); + ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl; + ssize_t got = tcp_read_nonblocking(bp.c_str(), read); + ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl; + connection_state->lock.Unlock(); +#else + // rx_buffer-less implementation + if (!newbuf.length()) { + ldout(msgr->cct,20) << "reader allocating new rx buffer at offset " + << offset << dendl; + alloc_aligned_buffer(newbuf, data_len, data_off); + blp = newbuf.begin(); + blp.advance(offset); + } + bufferptr bp = blp.get_current_ptr(); + int read = std::min(bp.length(), left); + ldout(msgr->cct,20) << "reader reading nonblocking into " + << (void*)bp.c_str() << " len " << bp.length() + << dendl; + ssize_t got = tcp_read_nonblocking(bp.c_str(), read); + ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl; +#endif + if (got < 0) + goto out_dethrottle; + if (got > 0) { + blp.advance(static_cast<size_t>(got)); + data.append(bp, 0, got); + offset += got; + left -= got; + } // else we got a signal or something; just loop. + } + } + + // footer + if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) { + if (tcp_read((char*)&footer, sizeof(footer)) < 0) + goto out_dethrottle; + } else { + ceph_msg_footer_old old_footer; + if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0) + goto out_dethrottle; + footer.front_crc = old_footer.front_crc; + footer.middle_crc = old_footer.middle_crc; + footer.data_crc = old_footer.data_crc; + footer.sig = 0; + footer.flags = old_footer.flags; + } + + aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0; + ldout(msgr->cct,10) << "aborted = " << aborted << dendl; + if (aborted) { + ldout(msgr->cct,0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() + << " byte message.. ABORTED" << dendl; + ret = 0; + goto out_dethrottle; + } + + ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() + << " byte message" << dendl; + message = decode_message(msgr->cct, msgr->crcflags, header, footer, + front, middle, data, connection_state.get()); + if (!message) { + ret = -EINVAL; + goto out_dethrottle; + } + + // + // Check the signature if one should be present. A zero return indicates success. PLR + // + + if (auth_handler == NULL) { + ldout(msgr->cct, 10) << "No session security set" << dendl; + } else { + if (auth_handler->check_message_signature(message)) { + ldout(msgr->cct, 0) << "Signature check failed" << dendl; + message->put(); + ret = -EINVAL; + goto out_dethrottle; + } + } + + message->set_byte_throttler(policy.throttler_bytes); + message->set_message_throttler(policy.throttler_messages); + + // store reservation size in message, so we don't get confused + // by messages entering the dispatch queue through other paths. + message->set_dispatch_throttle_size(message_size); + + message->set_recv_stamp(recv_stamp); + message->set_throttle_stamp(throttle_stamp); + message->set_recv_complete_stamp(ceph_clock_now()); + + *pm = message; + return 0; + + out_dethrottle: + // release bytes reserved from the throttlers on failure + if (policy.throttler_messages) { + ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << dendl; + policy.throttler_messages->put(); + } + if (message_size) { + if (policy.throttler_bytes) { + ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << dendl; + policy.throttler_bytes->put(message_size); + } + + in_q->dispatch_throttle_release(message_size); + } + return ret; +} + +int Pipe::do_sendmsg(struct msghdr *msg, unsigned len, bool more) +{ + MSGR_SIGPIPE_STOPPER; + while (len > 0) { + int r; + r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); + if (r == 0) + ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl; + if (r < 0) { + r = -errno; + ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(r) << dendl; + return r; + } + if (state == STATE_CLOSED) { + ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl; + return -EINTR; // close enough + } + + len -= r; + if (len == 0) break; + + // hrmph. trim r bytes off the front of our message. + ldout(msgr->cct,20) << "do_sendmsg short write did " << r << ", still have " << len << dendl; + while (r > 0) { + if (msg->msg_iov[0].iov_len <= (size_t)r) { + // lose this whole item + //ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl; + r -= msg->msg_iov[0].iov_len; + msg->msg_iov++; + msg->msg_iovlen--; + } else { + // partial! + //ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl; + msg->msg_iov[0].iov_base = (char *)msg->msg_iov[0].iov_base + r; + msg->msg_iov[0].iov_len -= r; + break; + } + } + } + return 0; +} + + +int Pipe::write_ack(uint64_t seq) +{ + ldout(msgr->cct,10) << "write_ack " << seq << dendl; + + char c = CEPH_MSGR_TAG_ACK; + ceph_le64 s; + s = seq; + + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + struct iovec msgvec[2]; + msgvec[0].iov_base = &c; + msgvec[0].iov_len = 1; + msgvec[1].iov_base = &s; + msgvec[1].iov_len = sizeof(s); + msg.msg_iov = msgvec; + msg.msg_iovlen = 2; + + if (do_sendmsg(&msg, 1 + sizeof(s), true) < 0) + return -1; + return 0; +} + +int Pipe::write_keepalive() +{ + ldout(msgr->cct,10) << "write_keepalive" << dendl; + + char c = CEPH_MSGR_TAG_KEEPALIVE; + + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + struct iovec msgvec[2]; + msgvec[0].iov_base = &c; + msgvec[0].iov_len = 1; + msg.msg_iov = msgvec; + msg.msg_iovlen = 1; + + if (do_sendmsg(&msg, 1) < 0) + return -1; + return 0; +} + +int Pipe::write_keepalive2(char tag, const utime_t& t) +{ + ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl; + struct ceph_timespec ts; + t.encode_timeval(&ts); + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + struct iovec msgvec[2]; + msgvec[0].iov_base = &tag; + msgvec[0].iov_len = 1; + msgvec[1].iov_base = &ts; + msgvec[1].iov_len = sizeof(ts); + msg.msg_iov = msgvec; + msg.msg_iovlen = 2; + + if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0) + return -1; + return 0; +} + + +int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer, bufferlist& blist) +{ + int ret; + + // set up msghdr and iovecs + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = msgvec; + int msglen = 0; + + // send tag + char tag = CEPH_MSGR_TAG_MSG; + msgvec[msg.msg_iovlen].iov_base = &tag; + msgvec[msg.msg_iovlen].iov_len = 1; + msglen++; + msg.msg_iovlen++; + + // send envelope + msgvec[msg.msg_iovlen].iov_base = (char*)&header; + msgvec[msg.msg_iovlen].iov_len = sizeof(header); + msglen += sizeof(header); + msg.msg_iovlen++; + + // payload (front+data) + auto pb = std::cbegin(blist.buffers()); + unsigned b_off = 0; // carry-over buffer offset, if any + unsigned bl_pos = 0; // blist pos + unsigned left = blist.length(); + + while (left > 0) { + unsigned donow = std::min(left, pb->length()-b_off); + if (donow == 0) { + ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length() + << " b_off " << b_off << dendl; + } + ceph_assert(donow > 0); + ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off + << " leftinchunk " << left + << " buffer len " << pb->length() + << " writing " << donow + << dendl; + + if (msg.msg_iovlen >= SM_IOV_MAX-2) { + if (do_sendmsg(&msg, msglen, true)) + goto fail; + + // and restart the iov + msg.msg_iov = msgvec; + msg.msg_iovlen = 0; + msglen = 0; + } + + msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off); + msgvec[msg.msg_iovlen].iov_len = donow; + msglen += donow; + msg.msg_iovlen++; + + ceph_assert(left >= donow); + left -= donow; + b_off += donow; + bl_pos += donow; + if (left == 0) + break; + while (b_off == pb->length()) { + ++pb; + b_off = 0; + } + } + ceph_assert(left == 0); + + // send footer; if receiver doesn't support signatures, use the old footer format + + ceph_msg_footer_old old_footer; + if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) { + msgvec[msg.msg_iovlen].iov_base = (void*)&footer; + msgvec[msg.msg_iovlen].iov_len = sizeof(footer); + msglen += sizeof(footer); + msg.msg_iovlen++; + } else { + if (msgr->crcflags & MSG_CRC_HEADER) { + old_footer.front_crc = footer.front_crc; + old_footer.middle_crc = footer.middle_crc; + } else { + old_footer.front_crc = old_footer.middle_crc = 0; + } + old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0; + old_footer.flags = footer.flags; + msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer; + msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer); + msglen += sizeof(old_footer); + msg.msg_iovlen++; + } + + // send + if (do_sendmsg(&msg, msglen)) + goto fail; + + ret = 0; + + out: + return ret; + + fail: + ret = -1; + goto out; +} + + +int Pipe::tcp_read(char *buf, unsigned len) +{ + if (sd < 0) + return -EINVAL; + + while (len > 0) { + + if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) { + ldout(msgr->cct, 0) << "injecting socket failure" << dendl; + ::shutdown(sd, SHUT_RDWR); + } + } + + if (tcp_read_wait() < 0) + return -1; + + ssize_t got = tcp_read_nonblocking(buf, len); + + if (got < 0) + return -1; + + len -= got; + buf += got; + //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl; + } + return 0; +} + +int Pipe::tcp_read_wait() +{ + if (sd < 0) + return -EINVAL; + struct pollfd pfd; + short evmask; + pfd.fd = sd; + pfd.events = POLLIN; +#if defined(__linux__) + pfd.events |= POLLRDHUP; +#endif + + if (has_pending_data()) + return 0; + + int r = poll(&pfd, 1, msgr->timeout); + if (r < 0) + return -errno; + if (r == 0) + return -EAGAIN; + + evmask = POLLERR | POLLHUP | POLLNVAL; +#if defined(__linux__) + evmask |= POLLRDHUP; +#endif + if (pfd.revents & evmask) + return -1; + + if (!(pfd.revents & POLLIN)) + return -1; + + return 0; +} + +ssize_t Pipe::do_recv(char *buf, size_t len, int flags) +{ +again: + ssize_t got = ::recv( sd, buf, len, flags ); + if (got < 0) { + if (errno == EINTR) { + goto again; + } + ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned " + << got << " " << cpp_strerror(errno) << dendl; + return -1; + } + if (got == 0) { + return -1; + } + return got; +} + +ssize_t Pipe::buffered_recv(char *buf, size_t len, int flags) +{ + size_t left = len; + ssize_t total_recv = 0; + if (recv_len > recv_ofs) { + int to_read = std::min(recv_len - recv_ofs, left); + memcpy(buf, &recv_buf[recv_ofs], to_read); + recv_ofs += to_read; + left -= to_read; + if (left == 0) { + return to_read; + } + buf += to_read; + total_recv += to_read; + } + + /* nothing left in the prefetch buffer */ + + if (left > recv_max_prefetch) { + /* this was a large read, we don't prefetch for these */ + ssize_t ret = do_recv(buf, left, flags ); + if (ret < 0) { + if (total_recv > 0) + return total_recv; + return ret; + } + total_recv += ret; + return total_recv; + } + + + ssize_t got = do_recv(recv_buf, recv_max_prefetch, flags); + if (got < 0) { + if (total_recv > 0) + return total_recv; + + return got; + } + + recv_len = (size_t)got; + got = std::min(left, (size_t)got); + memcpy(buf, recv_buf, got); + recv_ofs = got; + total_recv += got; + return total_recv; +} + +ssize_t Pipe::tcp_read_nonblocking(char *buf, unsigned len) +{ + ssize_t got = buffered_recv(buf, len, MSG_DONTWAIT ); + if (got < 0) { + ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned " + << got << " " << cpp_strerror(errno) << dendl; + return -1; + } + if (got == 0) { + /* poll() said there was data, but we didn't read any - peer + * sent a FIN. Maybe POLLRDHUP signals this, but this is + * standard socket behavior as documented by Stevens. + */ + return -1; + } + return got; +} + +int Pipe::tcp_write(const char *buf, unsigned len) +{ + if (sd < 0) + return -1; + struct pollfd pfd; + pfd.fd = sd; + pfd.events = POLLOUT | POLLHUP | POLLNVAL | POLLERR; +#if defined(__linux__) + pfd.events |= POLLRDHUP; +#endif + + if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) { + ldout(msgr->cct, 0) << "injecting socket failure" << dendl; + ::shutdown(sd, SHUT_RDWR); + } + } + + if (poll(&pfd, 1, -1) < 0) + return -1; + + if (!(pfd.revents & POLLOUT)) + return -1; + + //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl; + ceph_assert(len > 0); + while (len > 0) { + MSGR_SIGPIPE_STOPPER; + int did = ::send( sd, buf, len, MSG_NOSIGNAL ); + if (did < 0) { + //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl; + //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl; + return did; + } + len -= did; + buf += did; + //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl; + } + return 0; +} diff --git a/src/msg/simple/Pipe.h b/src/msg/simple/Pipe.h new file mode 100644 index 00000000..81245198 --- /dev/null +++ b/src/msg/simple/Pipe.h @@ -0,0 +1,315 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSGR_PIPE_H +#define CEPH_MSGR_PIPE_H + +#include "auth/AuthSessionHandler.h" + +#include "msg/msg_types.h" +#include "msg/Messenger.h" +#include "PipeConnection.h" + + +class SimpleMessenger; +class DispatchQueue; + +static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX); + + /** + * The Pipe is the most complex SimpleMessenger component. It gets + * two threads, one each for reading and writing on a socket it's handed + * at creation time, and is responsible for everything that happens on + * that socket. Besides message transmission, it's responsible for + * propagating socket errors to the SimpleMessenger and then sticking + * around in a state where it can provide enough data for the SimpleMessenger + * to provide reliable Message delivery when it manages to reconnect. + */ + class Pipe : public RefCountedObject { + /** + * The Reader thread handles all reads off the socket -- not just + * Messages, but also acks and other protocol bits (excepting startup, + * when the Writer does a couple of reads). + * All the work is implemented in Pipe itself, of course. + */ + class Reader : public Thread { + Pipe *pipe; + public: + explicit Reader(Pipe *p) : pipe(p) {} + void *entry() override { pipe->reader(); return 0; } + } reader_thread; + + /** + * The Writer thread handles all writes to the socket (after startup). + * All the work is implemented in Pipe itself, of course. + */ + class Writer : public Thread { + Pipe *pipe; + public: + explicit Writer(Pipe *p) : pipe(p) {} + void *entry() override { pipe->writer(); return 0; } + } writer_thread; + + class DelayedDelivery; + DelayedDelivery *delay_thread; + public: + Pipe(SimpleMessenger *r, int st, PipeConnection *con); + ~Pipe() override; + + SimpleMessenger *msgr; + uint64_t conn_id; + ostream& _pipe_prefix(std::ostream &out) const; + + Pipe* get() { + return static_cast<Pipe*>(RefCountedObject::get()); + } + + bool is_connected() { + Mutex::Locker l(pipe_lock); + return state == STATE_OPEN; + } + + char *recv_buf; + size_t recv_max_prefetch; + size_t recv_ofs; + size_t recv_len; + + enum { + STATE_ACCEPTING, + STATE_CONNECTING, + STATE_OPEN, + STATE_STANDBY, + STATE_CLOSED, + STATE_CLOSING, + STATE_WAIT // just wait for racing connection + }; + + static const char *get_state_name(int s) { + switch (s) { + case STATE_ACCEPTING: return "accepting"; + case STATE_CONNECTING: return "connecting"; + case STATE_OPEN: return "open"; + case STATE_STANDBY: return "standby"; + case STATE_CLOSED: return "closed"; + case STATE_CLOSING: return "closing"; + case STATE_WAIT: return "wait"; + default: return "UNKNOWN"; + } + } + const char *get_state_name() { + return get_state_name(state); + } + + private: + int sd; + struct iovec msgvec[SM_IOV_MAX]; + + public: + int port; + int peer_type; + entity_addr_t peer_addr; + Messenger::Policy policy; + + Mutex pipe_lock; + int state; + std::atomic<bool> state_closed = { false }; // true iff state = STATE_CLOSED + + // session_security handles any signatures or encryptions required for this pipe's msgs. PLR + + std::shared_ptr<AuthSessionHandler> session_security; + + protected: + friend class SimpleMessenger; + PipeConnectionRef connection_state; + + utime_t backoff; // backoff time + + bool reader_running, reader_needs_join; + bool reader_dispatching; /// reader thread is dispatching without pipe_lock + bool notify_on_dispatch_done; /// something wants a signal when dispatch done + bool writer_running; + + map<int, list<Message*> > out_q; // priority queue for outbound msgs + DispatchQueue *in_q; + list<Message*> sent; + Cond cond; + bool send_keepalive; + bool send_keepalive_ack; + utime_t keepalive_ack_stamp; + bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it + + __u32 connect_seq, peer_global_seq; + uint64_t out_seq; + uint64_t in_seq, in_seq_acked; + + void set_socket_options(); + + int accept(); // server handshake + int connect(); // client handshake + void reader(); + void writer(); + void unlock_maybe_reap(); + + void randomize_out_seq(); + + int read_message(Message **pm, + AuthSessionHandler *session_security_copy); + int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body); + /** + * Write the given data (of length len) to the Pipe's socket. This function + * will loop until all passed data has been written out. + * If more is set, the function will optimize socket writes + * for additional data (by passing the MSG_MORE flag, aka TCP_CORK). + * + * @param msg The msghdr to write out + * @param len The length of the data in msg + * @param more Should be set true if this is one part of a larger message + * @return 0, or -1 on failure (unrecoverable -- close the socket). + */ + int do_sendmsg(struct msghdr *msg, unsigned len, bool more=false); + int write_ack(uint64_t s); + int write_keepalive(); + int write_keepalive2(char tag, const utime_t &t); + + void fault(bool reader=false); + + void was_session_reset(); + + /* Clean up sent list */ + void handle_ack(uint64_t seq); + + public: + Pipe(const Pipe& other); + const Pipe& operator=(const Pipe& other); + + void start_reader(); + void start_writer(); + void maybe_start_delay_thread(); + void join_reader(); + + // public constructors + static const Pipe& Server(int s); + static const Pipe& Client(const entity_addr_t& pi); + + uint64_t get_out_seq() { return out_seq; } + + bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; } + + entity_addr_t& get_peer_addr() { return peer_addr; } + + void set_peer_addr(const entity_addr_t& a) { + if (&peer_addr != &a) // shut up valgrind + peer_addr = a; + connection_state->set_peer_addr(a); + } + void set_peer_type(int t) { + peer_type = t; + connection_state->set_peer_type(t); + } + + void register_pipe(); + void unregister_pipe(); + void join(); + /// stop a Pipe by closing its socket and setting it to STATE_CLOSED + void stop(); + /// stop() a Pipe if not already done, and wait for it to finish any + /// fast_dispatch in progress. + void stop_and_wait(); + + void _send(Message *m) { + ceph_assert(pipe_lock.is_locked()); + out_q[m->get_priority()].push_back(m); + cond.Signal(); + } + void _send_keepalive() { + ceph_assert(pipe_lock.is_locked()); + send_keepalive = true; + cond.Signal(); + } + Message *_get_next_outgoing() { + ceph_assert(pipe_lock.is_locked()); + Message *m = 0; + while (!m && !out_q.empty()) { + map<int, list<Message*> >::reverse_iterator p = out_q.rbegin(); + if (!p->second.empty()) { + m = p->second.front(); + p->second.pop_front(); + } + if (p->second.empty()) + out_q.erase(p->first); + } + return m; + } + + /// move all messages in the sent list back into the queue at the highest priority. + void requeue_sent(); + /// discard messages requeued by requeued_sent() up to a given seq + void discard_requeued_up_to(uint64_t seq); + void discard_out_queue(); + + void shutdown_socket() { + recv_reset(); + if (sd >= 0) + ::shutdown(sd, SHUT_RDWR); + } + + void recv_reset() { + recv_len = 0; + recv_ofs = 0; + } + ssize_t do_recv(char *buf, size_t len, int flags); + ssize_t buffered_recv(char *buf, size_t len, int flags); + bool has_pending_data() { return recv_len > recv_ofs; } + + /** + * do a blocking read of len bytes from socket + * + * @param buf buffer to read into + * @param len exact number of bytes to read + * @return 0 for success, or -1 on error + */ + int tcp_read(char *buf, unsigned len); + + /** + * wait for bytes to become available on the socket + * + * @return 0 for success, or -1 on error + */ + int tcp_read_wait(); + + /** + * non-blocking read of available bytes on socket + * + * This is expected to be used after tcp_read_wait(), and will return + * an error if there is no data on the socket to consume. + * + * @param buf buffer to read into + * @param len maximum number of bytes to read + * @return bytes read, or -1 on error or when there is no data + */ + ssize_t tcp_read_nonblocking(char *buf, unsigned len); + + /** + * blocking write of bytes to socket + * + * @param buf buffer + * @param len number of bytes to write + * @return 0 for success, or -1 on error + */ + int tcp_write(const char *buf, unsigned len); + + }; + + +#endif diff --git a/src/msg/simple/PipeConnection.cc b/src/msg/simple/PipeConnection.cc new file mode 100644 index 00000000..faa1ea9e --- /dev/null +++ b/src/msg/simple/PipeConnection.cc @@ -0,0 +1,96 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "msg/Message.h" +#include "Pipe.h" +#include "SimpleMessenger.h" +#include "PipeConnection.h" + +PipeConnection::~PipeConnection() +{ + if (pipe) { + pipe->put(); + pipe = NULL; + } +} + +Pipe* PipeConnection::get_pipe() +{ + Mutex::Locker l(lock); + if (pipe) + return pipe->get(); + return NULL; +} + +bool PipeConnection::try_get_pipe(Pipe **p) +{ + Mutex::Locker l(lock); + if (failed) { + *p = NULL; + } else { + if (pipe) + *p = pipe->get(); + else + *p = NULL; + } + return !failed; +} + +bool PipeConnection::clear_pipe(Pipe *old_p) +{ + Mutex::Locker l(lock); + if (old_p == pipe) { + pipe->put(); + pipe = NULL; + failed = true; + return true; + } + return false; +} + +void PipeConnection::reset_pipe(Pipe *p) +{ + Mutex::Locker l(lock); + if (pipe) + pipe->put(); + pipe = p->get(); +} + +bool PipeConnection::is_connected() +{ + return static_cast<SimpleMessenger*>(msgr)->is_connected(this); +} + +int PipeConnection::send_message(Message *m) +{ + ceph_assert(msgr); + return static_cast<SimpleMessenger*>(msgr)->send_message(m, this); +} + +void PipeConnection::send_keepalive() +{ + static_cast<SimpleMessenger*>(msgr)->send_keepalive(this); +} + +void PipeConnection::mark_down() +{ + if (msgr) + static_cast<SimpleMessenger*>(msgr)->mark_down(this); +} + +void PipeConnection::mark_disposable() +{ + if (msgr) + static_cast<SimpleMessenger*>(msgr)->mark_disposable(this); +} diff --git a/src/msg/simple/PipeConnection.h b/src/msg/simple/PipeConnection.h new file mode 100644 index 00000000..e5460440 --- /dev/null +++ b/src/msg/simple/PipeConnection.h @@ -0,0 +1,59 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSG_PIPECONNECTION_H +#define CEPH_MSG_PIPECONNECTION_H + +#include "msg/Connection.h" + +class Pipe; + +class PipeConnection : public Connection { + Pipe* pipe; + + friend class boost::intrusive_ptr<PipeConnection>; + friend class Pipe; + +public: + + PipeConnection(CephContext *cct, Messenger *m) + : Connection(cct, m), + pipe(NULL) { } + + ~PipeConnection() override; + + Pipe* get_pipe(); + + bool try_get_pipe(Pipe** p); + + bool clear_pipe(Pipe* old_p); + + void reset_pipe(Pipe* p); + + bool is_connected() override; + + int send_message(Message *m) override; + void send_keepalive() override; + void mark_down() override; + void mark_disposable() override; + + entity_addr_t get_peer_socket_addr() const override { + return peer_addrs->front(); + } + +}; /* PipeConnection */ + +typedef boost::intrusive_ptr<PipeConnection> PipeConnectionRef; + +#endif diff --git a/src/msg/simple/SimpleMessenger.cc b/src/msg/simple/SimpleMessenger.cc new file mode 100644 index 00000000..09d1ab7b --- /dev/null +++ b/src/msg/simple/SimpleMessenger.cc @@ -0,0 +1,769 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <errno.h> +#include <iostream> +#include <fstream> + + +#include "SimpleMessenger.h" + +#include "common/config.h" +#include "common/Timer.h" +#include "common/errno.h" +#include "common/valgrind.h" +#include "auth/Crypto.h" +#include "include/spinlock.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) +static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) { + return *_dout << "-- " << msgr->get_myaddr_legacy() << " "; +} + + +/******************* + * SimpleMessenger + */ + +SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t _nonce) + : SimplePolicyMessenger(cct, name,mname, _nonce), + accepter(this, _nonce), + dispatch_queue(cct, this, mname), + reaper_thread(this), + nonce(_nonce), + lock("SimpleMessenger::lock"), need_addr(true), did_bind(false), + global_seq(0), + cluster_protocol(0), + reaper_started(false), reaper_stop(false), + timeout(0), + local_connection(new PipeConnection(cct, this)) +{ + ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout), + "SimpleMessenger read timeout"); + init_local_connection(); +} + +/** + * Destroy the SimpleMessenger. Pretty simple since all the work is done + * elsewhere. + */ +SimpleMessenger::~SimpleMessenger() +{ + ceph_assert(!did_bind); // either we didn't bind or we shut down the Accepter + ceph_assert(rank_pipe.empty()); // we don't have any running Pipes. + ceph_assert(!reaper_started); // the reaper thread is stopped +} + +void SimpleMessenger::ready() +{ + ldout(cct,10) << "ready " << get_myaddr_legacy() << dendl; + dispatch_queue.start(); + + lock.Lock(); + if (did_bind) + accepter.start(); + lock.Unlock(); +} + + +int SimpleMessenger::shutdown() +{ + ldout(cct,10) << "shutdown " << get_myaddr_legacy() << dendl; + mark_down_all(); + + // break ref cycles on the loopback connection + local_connection->set_priv(NULL); + + lock.Lock(); + stop_cond.Signal(); + stopped = true; + lock.Unlock(); + + return 0; +} + +int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest) +{ + // set envelope + m->get_header().src = get_myname(); + m->set_cct(cct); + + if (!m->get_priority()) m->set_priority(get_default_send_priority()); + + ldout(cct,1) <<"--> " << dest.name << " " + << dest.addr << " -- " << *m + << " -- ?+" << m->get_data().length() + << " " << m + << dendl; + + if (dest.addr == entity_addr_t()) { + ldout(cct,0) << "send_message message " << *m + << " with empty dest " << dest.addr << dendl; + m->put(); + return -EINVAL; + } + + lock.Lock(); + Pipe *pipe = _lookup_pipe(dest.addr); + submit_message(m, (pipe ? pipe->connection_state.get() : NULL), + dest.addr, dest.name.type(), true); + lock.Unlock(); + return 0; +} + +int SimpleMessenger::_send_message(Message *m, Connection *con) +{ + //set envelope + m->get_header().src = get_myname(); + + if (!m->get_priority()) m->set_priority(get_default_send_priority()); + + ldout(cct,1) << "--> " << con->get_peer_addr() + << " -- " << *m + << " -- ?+" << m->get_data().length() + << " " << m << " con " << con + << dendl; + + submit_message(m, static_cast<PipeConnection*>(con), + con->get_peer_addr(), con->get_peer_type(), false); + return 0; +} + +/** + * If my_inst.addr doesn't have an IP set, this function + * will fill it in from the passed addr. Otherwise it does nothing and returns. + */ +bool SimpleMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) +{ + bool ret = false; + auto addr = addrs.front(); + ceph_assert(my_addr == my_addrs->front()); + if (my_addr.is_blank_ip()) { + ldout(cct,1) << __func__ << " " << addr << dendl; + entity_addr_t t = my_addr; + int port = t.get_port(); + t.u = addr.u; + t.set_port(port); + set_addrs(entity_addrvec_t(t)); + init_local_connection(); + ret = true; + } else { + ldout(cct,1) << __func__ << " " << addr << " no-op" << dendl; + } + ceph_assert(my_addr == my_addrs->front()); + return ret; +} + +void SimpleMessenger::set_myaddrs(const entity_addrvec_t &av) +{ + my_addr = av.front(); + Messenger::set_myaddrs(av); +} + +void SimpleMessenger::set_addrs(const entity_addrvec_t &av) +{ + auto t = av; + for (auto& a : t.v) { + a.set_nonce(nonce); + } + set_myaddrs(t); + init_local_connection(); +} + +int SimpleMessenger::get_proto_version(int peer_type, bool connect) +{ + int my_type = my_name.type(); + + // set reply protocol version + if (peer_type == my_type) { + // internal + return cluster_protocol; + } else { + // public + if (connect) { + switch (peer_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + } + } else { + switch (my_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + } + } + } + return 0; +} + + + + + + + +/******************************************** + * SimpleMessenger + */ +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) + +void SimpleMessenger::reaper_entry() +{ + ldout(cct,10) << "reaper_entry start" << dendl; + lock.Lock(); + while (!reaper_stop) { + reaper(); // may drop and retake the lock + if (reaper_stop) + break; + reaper_cond.Wait(lock); + } + lock.Unlock(); + ldout(cct,10) << "reaper_entry done" << dendl; +} + +/* + * note: assumes lock is held + */ +void SimpleMessenger::reaper() +{ + ldout(cct,10) << "reaper" << dendl; + ceph_assert(lock.is_locked()); + + while (!pipe_reap_queue.empty()) { + Pipe *p = pipe_reap_queue.front(); + pipe_reap_queue.pop_front(); + ldout(cct,10) << "reaper reaping pipe " << p << " " << + p->get_peer_addr() << dendl; + p->pipe_lock.Lock(); + p->discard_out_queue(); + if (p->connection_state) { + // mark_down, mark_down_all, or fault() should have done this, + // or accept() may have switch the Connection to a different + // Pipe... but make sure! + bool cleared = p->connection_state->clear_pipe(p); + ceph_assert(!cleared); + } + p->pipe_lock.Unlock(); + p->unregister_pipe(); + ceph_assert(pipes.count(p)); + pipes.erase(p); + + // drop msgr lock while joining thread; the delay through could be + // trying to fast dispatch, preventing it from joining without + // blocking and deadlocking. + lock.Unlock(); + p->join(); + lock.Lock(); + + if (p->sd >= 0) + ::close(p->sd); + ldout(cct,10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl; + p->put(); + ldout(cct,10) << "reaper deleted pipe " << p << dendl; + } + ldout(cct,10) << "reaper done" << dendl; +} + +void SimpleMessenger::queue_reap(Pipe *pipe) +{ + ldout(cct,10) << "queue_reap " << pipe << dendl; + lock.Lock(); + pipe_reap_queue.push_back(pipe); + reaper_cond.Signal(); + lock.Unlock(); +} + +bool SimpleMessenger::is_connected(Connection *con) +{ + bool r = false; + if (con) { + Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe()); + if (p) { + ceph_assert(p->msgr == this); + r = p->is_connected(); + p->put(); + } + } + return r; +} + +int SimpleMessenger::bind(const entity_addr_t &bind_addr) +{ + lock.Lock(); + if (started) { + ldout(cct,10) << "rank.bind already started" << dendl; + lock.Unlock(); + return -1; + } + ldout(cct,10) << "rank.bind " << bind_addr << dendl; + lock.Unlock(); + + // bind to a socket + set<int> avoid_ports; + int r = accepter.bind(bind_addr, avoid_ports); + if (r >= 0) + did_bind = true; + return r; +} + +int SimpleMessenger::rebind(const set<int>& avoid_ports) +{ + ldout(cct,1) << "rebind avoid " << avoid_ports << dendl; + ceph_assert(did_bind); + accepter.stop(); + mark_down_all(); + return accepter.rebind(avoid_ports); +} + + +int SimpleMessenger::client_bind(const entity_addr_t &bind_addr) +{ + if (!cct->_conf->ms_bind_before_connect) + return 0; + Mutex::Locker l(lock); + if (did_bind) { + ceph_assert(*my_addrs == entity_addrvec_t(bind_addr)); + return 0; + } + if (started) { + ldout(cct,10) << "rank.bind already started" << dendl; + return -1; + } + ldout(cct,10) << "rank.bind " << bind_addr << dendl; + + set_myaddrs(entity_addrvec_t(bind_addr)); + return 0; +} + + +int SimpleMessenger::start() +{ + lock.Lock(); + ldout(cct,1) << "messenger.start" << dendl; + + // register at least one entity, first! + ceph_assert(my_name.type() >= 0); + + ceph_assert(!started); + started = true; + stopped = false; + + if (!did_bind) { + my_addr.nonce = nonce; + init_local_connection(); + } + + lock.Unlock(); + + reaper_started = true; + reaper_thread.create("ms_reaper"); + return 0; +} + +Pipe *SimpleMessenger::add_accept_pipe(int sd) +{ + lock.Lock(); + Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL); + p->sd = sd; + p->pipe_lock.Lock(); + p->start_reader(); + p->pipe_lock.Unlock(); + pipes.insert(p); + accepting_pipes.insert(p); + lock.Unlock(); + return p; +} + +/* connect_rank + * NOTE: assumes messenger.lock held. + */ +Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, + int type, + PipeConnection *con, + Message *first) +{ + ceph_assert(lock.is_locked()); + ceph_assert(addr != my_addr); + + ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; + + // create pipe + Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING, + static_cast<PipeConnection*>(con)); + pipe->pipe_lock.Lock(); + pipe->set_peer_type(type); + pipe->set_peer_addr(addr); + pipe->policy = get_policy(type); + pipe->start_writer(); + if (first) + pipe->_send(first); + pipe->pipe_lock.Unlock(); + pipe->register_pipe(); + pipes.insert(pipe); + + return pipe; +} + + + + + + +ConnectionRef SimpleMessenger::connect_to(int type, + const entity_addrvec_t& addrs) +{ + Mutex::Locker l(lock); + if (my_addr == addrs.front()) { + // local + return local_connection; + } + + // remote + while (true) { + Pipe *pipe = _lookup_pipe(addrs.legacy_addr()); + if (pipe) { + ldout(cct, 10) << "get_connection " << addrs << " existing " << pipe << dendl; + } else { + pipe = connect_rank(addrs.legacy_addr(), type, NULL, NULL); + ldout(cct, 10) << "get_connection " << addrs << " new " << pipe << dendl; + } + Mutex::Locker l(pipe->pipe_lock); + if (pipe->connection_state) + return pipe->connection_state; + // we failed too quickly! retry. FIXME. + } +} + +ConnectionRef SimpleMessenger::get_loopback_connection() +{ + return local_connection; +} + +void SimpleMessenger::submit_message(Message *m, PipeConnection *con, + const entity_addr_t& dest_addr, int dest_type, + bool already_locked) +{ + m->trace.event("simple submitting message"); + if (cct->_conf->ms_dump_on_send) { + m->encode(-1, true); + ldout(cct, 0) << "submit_message " << *m << "\n"; + m->get_payload().hexdump(*_dout); + if (m->get_data().length() > 0) { + *_dout << " data:\n"; + m->get_data().hexdump(*_dout); + } + *_dout << dendl; + m->clear_payload(); + } + + // existing connection? + if (con) { + Pipe *pipe = NULL; + bool ok = static_cast<PipeConnection*>(con)->try_get_pipe(&pipe); + if (!ok) { + ldout(cct,0) << "submit_message " << *m << " remote, " << dest_addr + << ", failed lossy con, dropping message " << m << dendl; + m->put(); + return; + } + while (pipe && ok) { + // we loop in case of a racing reconnect, either from us or them + pipe->pipe_lock.Lock(); // can't use a Locker because of the Pipe ref + if (pipe->state != Pipe::STATE_CLOSED) { + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl; + pipe->_send(m); + pipe->pipe_lock.Unlock(); + pipe->put(); + return; + } + Pipe *current_pipe; + ok = con->try_get_pipe(¤t_pipe); + pipe->pipe_lock.Unlock(); + if (current_pipe == pipe) { + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr + << ", had pipe " << pipe << ", but it closed." << dendl; + pipe->put(); + current_pipe->put(); + m->put(); + return; + } else { + pipe->put(); + pipe = current_pipe; + } + } + } + + // local? + if (my_addr == dest_addr) { + // local + ldout(cct,20) << "submit_message " << *m << " local" << dendl; + m->set_connection(local_connection.get()); + dispatch_queue.local_delivery(m, m->get_priority()); + return; + } + + // remote, no existing pipe. + const Policy& policy = get_policy(dest_type); + if (policy.server) { + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", lossy server for target type " + << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl; + m->put(); + } else { + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl; + if (!already_locked) { + /** We couldn't handle the Message without reference to global data, so + * grab the lock and do it again. If we got here, we know it's a non-lossy + * Connection, so we can use our existing pointer without doing another lookup. */ + Mutex::Locker l(lock); + submit_message(m, con, dest_addr, dest_type, true); + } else { + connect_rank(dest_addr, dest_type, static_cast<PipeConnection*>(con), m); + } + } +} + +int SimpleMessenger::send_keepalive(Connection *con) +{ + int ret = 0; + Pipe *pipe = static_cast<Pipe *>( + static_cast<PipeConnection*>(con)->get_pipe()); + if (pipe) { + ldout(cct,20) << "send_keepalive con " << con << ", have pipe." << dendl; + ceph_assert(pipe->msgr == this); + pipe->pipe_lock.Lock(); + pipe->_send_keepalive(); + pipe->pipe_lock.Unlock(); + pipe->put(); + } else { + ldout(cct,0) << "send_keepalive con " << con << ", no pipe." << dendl; + ret = -EPIPE; + } + return ret; +} + + + +void SimpleMessenger::wait() +{ + lock.Lock(); + if (!started) { + lock.Unlock(); + return; + } + if (!stopped) + stop_cond.Wait(lock); + + lock.Unlock(); + + // done! clean up. + if (did_bind) { + ldout(cct,20) << "wait: stopping accepter thread" << dendl; + accepter.stop(); + did_bind = false; + ldout(cct,20) << "wait: stopped accepter thread" << dendl; + } + + dispatch_queue.shutdown(); + if (dispatch_queue.is_started()) { + ldout(cct,10) << "wait: waiting for dispatch queue" << dendl; + dispatch_queue.wait(); + dispatch_queue.discard_local(); + ldout(cct,10) << "wait: dispatch queue is stopped" << dendl; + } + + if (reaper_started) { + ldout(cct,20) << "wait: stopping reaper thread" << dendl; + lock.Lock(); + reaper_cond.Signal(); + reaper_stop = true; + lock.Unlock(); + reaper_thread.join(); + reaper_started = false; + ldout(cct,20) << "wait: stopped reaper thread" << dendl; + } + + // close+reap all pipes + lock.Lock(); + { + ldout(cct,10) << "wait: closing pipes" << dendl; + + while (!rank_pipe.empty()) { + Pipe *p = rank_pipe.begin()->second; + p->unregister_pipe(); + p->pipe_lock.Lock(); + p->stop_and_wait(); + // don't generate an event here; we're shutting down anyway. + PipeConnectionRef con = p->connection_state; + if (con) + con->clear_pipe(p); + p->pipe_lock.Unlock(); + } + + reaper(); + ldout(cct,10) << "wait: waiting for pipes " << pipes << " to close" << dendl; + while (!pipes.empty()) { + reaper_cond.Wait(lock); + reaper(); + } + } + lock.Unlock(); + + ldout(cct,10) << "wait: done." << dendl; + ldout(cct,1) << "shutdown complete." << dendl; + started = false; +} + + +void SimpleMessenger::mark_down_all() +{ + ldout(cct,1) << "mark_down_all" << dendl; + lock.Lock(); + for (set<Pipe*>::iterator q = accepting_pipes.begin(); q != accepting_pipes.end(); ++q) { + Pipe *p = *q; + ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl; + p->pipe_lock.Lock(); + p->stop(); + PipeConnectionRef con = p->connection_state; + if (con && con->clear_pipe(p)) + dispatch_queue.queue_reset(con.get()); + p->pipe_lock.Unlock(); + } + accepting_pipes.clear(); + + while (!rank_pipe.empty()) { + ceph::unordered_map<entity_addr_t,Pipe*>::iterator it = rank_pipe.begin(); + Pipe *p = it->second; + ldout(cct,5) << "mark_down_all " << it->first << " " << p << dendl; + rank_pipe.erase(it); + p->unregister_pipe(); + p->pipe_lock.Lock(); + p->stop(); + PipeConnectionRef con = p->connection_state; + if (con && con->clear_pipe(p)) + dispatch_queue.queue_reset(con.get()); + p->pipe_lock.Unlock(); + } + lock.Unlock(); +} + +void SimpleMessenger::mark_down(const entity_addr_t& addr) +{ + lock.Lock(); + Pipe *p = _lookup_pipe(addr); + if (p) { + ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl; + p->unregister_pipe(); + p->pipe_lock.Lock(); + p->stop(); + if (p->connection_state) { + // generate a reset event for the caller in this case, even + // though they asked for it, since this is the addr-based (and + // not Connection* based) interface + PipeConnectionRef con = p->connection_state; + if (con && con->clear_pipe(p)) + dispatch_queue.queue_reset(con.get()); + } + p->pipe_lock.Unlock(); + } else { + ldout(cct,1) << "mark_down " << addr << " -- pipe dne" << dendl; + } + lock.Unlock(); +} + +void SimpleMessenger::mark_down(Connection *con) +{ + if (con == NULL) + return; + lock.Lock(); + Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe()); + if (p) { + ldout(cct,1) << "mark_down " << con << " -- " << p << dendl; + ceph_assert(p->msgr == this); + p->unregister_pipe(); + p->pipe_lock.Lock(); + p->stop(); + if (p->connection_state) { + // do not generate a reset event for the caller in this case, + // since they asked for it. + p->connection_state->clear_pipe(p); + } + p->pipe_lock.Unlock(); + p->put(); + } else { + ldout(cct,1) << "mark_down " << con << " -- pipe dne" << dendl; + } + lock.Unlock(); +} + +void SimpleMessenger::mark_disposable(Connection *con) +{ + lock.Lock(); + Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe()); + if (p) { + ldout(cct,1) << "mark_disposable " << con << " -- " << p << dendl; + ceph_assert(p->msgr == this); + p->pipe_lock.Lock(); + p->policy.lossy = true; + p->pipe_lock.Unlock(); + p->put(); + } else { + ldout(cct,1) << "mark_disposable " << con << " -- pipe dne" << dendl; + } + lock.Unlock(); +} + +void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +{ + // be careful here: multiple threads may block here, and readers of + // my_addr do NOT hold any lock. + + // this always goes from true -> false under the protection of the + // mutex. if it is already false, we need not retake the mutex at + // all. + if (!need_addr) + return; + + lock.Lock(); + if (need_addr && my_addr.is_blank_ip()) { + entity_addr_t t = peer_addr_for_me; + if (!did_bind) { + t.set_type(entity_addr_t::TYPE_ANY); + t.set_port(0); + } else { + t.set_type(entity_addr_t::TYPE_LEGACY); + t.set_port(my_addr.get_port()); + } + t.set_nonce(my_addr.get_nonce()); + ANNOTATE_BENIGN_RACE_SIZED(&my_addr, sizeof(my_addr), + "SimpleMessenger learned addr"); + set_myaddrs(entity_addrvec_t(t)); + ldout(cct,1) << "learned my addr " << my_addr << dendl; + need_addr = false; + init_local_connection(); + } + lock.Unlock(); +} + +void SimpleMessenger::init_local_connection() +{ + local_connection->peer_addrs = *my_addrs; + local_connection->peer_type = my_name.type(); + local_connection->set_features(CEPH_FEATURES_ALL); + ms_deliver_handle_fast_connect(local_connection.get()); +} diff --git a/src/msg/simple/SimpleMessenger.h b/src/msg/simple/SimpleMessenger.h new file mode 100644 index 00000000..b1aad539 --- /dev/null +++ b/src/msg/simple/SimpleMessenger.h @@ -0,0 +1,414 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_SIMPLEMESSENGER_H +#define CEPH_SIMPLEMESSENGER_H + +#include <list> +#include <map> + +#include "include/types.h" +#include "include/xlist.h" + +#include "include/unordered_map.h" +#include "include/unordered_set.h" + +#include "common/Mutex.h" +#include "common/Cond.h" +#include "common/Thread.h" +#include "common/Throttle.h" + +#include "include/spinlock.h" + +#include "msg/SimplePolicyMessenger.h" +#include "msg/Message.h" +#include "include/ceph_assert.h" + +#include "msg/DispatchQueue.h" +#include "Pipe.h" +#include "Accepter.h" + +/* + * This class handles transmission and reception of messages. Generally + * speaking, there are several major components: + * + * - Connection + * Each logical session is associated with a Connection. + * - Pipe + * Each network connection is handled through a pipe, which handles + * the input and output of each message. There is normally a 1:1 + * relationship between Pipe and Connection, but logical sessions may + * get handed off between Pipes when sockets reconnect or during + * connection races. + * - IncomingQueue + * Incoming messages are associated with an IncomingQueue, and there + * is one such queue associated with each Pipe. + * - DispatchQueue + * IncomingQueues get queued in the DIspatchQueue, which is responsible + * for doing a round-robin sweep and processing them via a worker thread. + * - SimpleMessenger + * It's the exterior class passed to the external message handler and + * most of the API details. + * + * Lock ordering: + * + * SimpleMessenger::lock + * Pipe::pipe_lock + * DispatchQueue::lock + * IncomingQueue::lock + */ + +class SimpleMessenger : public SimplePolicyMessenger { + // First we have the public Messenger interface implementation... +public: + /** + * Initialize the SimpleMessenger! + * + * @param cct The CephContext to use + * @param name The name to assign ourselves + * _nonce A unique ID to use for this SimpleMessenger. It should not + * be a value that will be repeated if the daemon restarts. + * features The local features bits for the local_connection + */ + SimpleMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t _nonce); + + /** + * Destroy the SimpleMessenger. Pretty simple since all the work is done + * elsewhere. + */ + ~SimpleMessenger() override; + + /** @defgroup Accessors + * @{ + */ + bool set_addr_unknowns(const entity_addrvec_t& addr) override; + void set_addrs(const entity_addrvec_t &addr) override; + void set_myaddrs(const entity_addrvec_t& a) override; + + int get_dispatch_queue_len() override { + return dispatch_queue.get_queue_len(); + } + + double get_dispatch_queue_max_age(utime_t now) override { + return dispatch_queue.get_max_age(now); + } + /** @} Accessors */ + + /** + * @defgroup Configuration functions + * @{ + */ + void set_cluster_protocol(int p) override { + ceph_assert(!started && !did_bind); + cluster_protocol = p; + } + + int bind(const entity_addr_t& bind_addr) override; + int rebind(const set<int>& avoid_ports) override; + int client_bind(const entity_addr_t& bind_addr) override; + + /** @} Configuration functions */ + + /** + * @defgroup Startup/Shutdown + * @{ + */ + int start() override; + void wait() override; + int shutdown() override; + + /** @} // Startup/Shutdown */ + + /** + * @defgroup Messaging + * @{ + */ + int send_to( + Message *m, + int type, + const entity_addrvec_t& addr) override { + // temporary + return _send_message(m, entity_inst_t(entity_name_t(type, -1), + addr.legacy_addr())); + } + + int send_message(Message *m, Connection *con) { + return _send_message(m, con); + } + + /** @} // Messaging */ + + /** + * @defgroup Connection Management + * @{ + */ + ConnectionRef connect_to(int type, const entity_addrvec_t& addrs) override; + ConnectionRef get_loopback_connection() override; + int send_keepalive(Connection *con); + void mark_down(const entity_addr_t& addr) override; + void mark_down(Connection *con); + void mark_disposable(Connection *con); + void mark_down_all() override; + /** @} // Connection Management */ +protected: + /** + * @defgroup Messenger Interfaces + * @{ + */ + /** + * Start up the DispatchQueue thread once we have somebody to dispatch to. + */ + void ready() override; + /** @} // Messenger Interfaces */ +private: + /** + * @defgroup Inner classes + * @{ + */ + +public: + Accepter accepter; + DispatchQueue dispatch_queue; + + friend class Accepter; + + /** + * Register a new pipe for accept + * + * @param sd socket + */ + Pipe *add_accept_pipe(int sd); + +private: + + /** + * A thread used to tear down Pipes when they're complete. + */ + class ReaperThread : public Thread { + SimpleMessenger *msgr; + public: + explicit ReaperThread(SimpleMessenger *m) : msgr(m) {} + void *entry() override { + msgr->reaper_entry(); + return 0; + } + } reaper_thread; + + /** + * @} // Inner classes + */ + + /** + * @defgroup Utility functions + * @{ + */ + + /** + * Create a Pipe associated with the given entity (of the given type). + * Initiate the connection. (This function returning does not guarantee + * connection success.) + * + * @param addr The address of the entity to connect to. + * @param type The peer type of the entity at the address. + * @param con An existing Connection to associate with the new Pipe. If + * NULL, it creates a new Connection. + * @param first an initial message to queue on the new pipe + * + * @return a pointer to the newly-created Pipe. Caller does not own a + * reference; take one if you need it. + */ + Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con, + Message *first); + /** + * Send a message, lazily or not. + * This just glues send_message together and passes + * the input on to submit_message. + */ + int _send_message(Message *m, const entity_inst_t& dest); + /** + * Same as above, but for the Connection-based variants. + */ + int _send_message(Message *m, Connection *con); + /** + * Queue up a Message for delivery to the entity specified + * by addr and dest_type. + * submit_message() is responsible for creating + * new Pipes (and closing old ones) as necessary. + * + * @param m The Message to queue up. This function eats a reference. + * @param con The existing Connection to use, or NULL if you don't know of one. + * @param addr The address to send the Message to. + * @param dest_type The peer type of the address we're sending to + * just drop silently under failure. + * @param already_locked If false, submit_message() will acquire the + * SimpleMessenger lock before accessing shared data structures; otherwise + * it will assume the lock is held. NOTE: if you are making a request + * without locking, you MUST have filled in the con with a valid pointer. + */ + void submit_message(Message *m, PipeConnection *con, + const entity_addr_t& addr, int dest_type, + bool already_locked); + /** + * Look through the pipes in the pipe_reap_queue and tear them down. + */ + void reaper(); + /** + * @} // Utility functions + */ + + // SimpleMessenger stuff + /// approximately unique ID set by the Constructor for use in entity_addr_t + uint64_t nonce; + /// overall lock used for SimpleMessenger data structures + Mutex lock; + /// true, specifying we haven't learned our addr; set false when we find it. + // maybe this should be protected by the lock? + bool need_addr; + +public: + bool get_need_addr() const { return need_addr; } + +private: + /** + * false; set to true if the SimpleMessenger bound to a specific address; + * and set false again by Accepter::stop(). This isn't lock-protected + * since you shouldn't be able to race the only writers. + */ + bool did_bind; + /// counter for the global seq our connection protocol uses + __u32 global_seq; + /// lock to protect the global_seq + ceph::spinlock global_seq_lock; + + entity_addr_t my_addr; + + /** + * hash map of addresses to Pipes + * + * NOTE: a Pipe* with state CLOSED may still be in the map but is considered + * invalid and can be replaced by anyone holding the msgr lock + */ + ceph::unordered_map<entity_addr_t, Pipe*> rank_pipe; + /** + * list of pipes are in the process of accepting + * + * These are not yet in the rank_pipe map. + */ + set<Pipe*> accepting_pipes; + /// a set of all the Pipes we have which are somehow active + set<Pipe*> pipes; + /// a list of Pipes we want to tear down + list<Pipe*> pipe_reap_queue; + + /// internal cluster protocol version, if any, for talking to entities of the same type. + int cluster_protocol; + + Cond stop_cond; + bool stopped = true; + + bool reaper_started, reaper_stop; + Cond reaper_cond; + + /// This Cond is slept on by wait() and signaled by dispatch_entry() + Cond wait_cond; + + friend class Pipe; + + Pipe *_lookup_pipe(const entity_addr_t& k) { + ceph::unordered_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(k); + if (p == rank_pipe.end()) + return NULL; + // see lock cribbing in Pipe::fault() + if (p->second->state_closed) + return NULL; + return p->second; + } + +public: + + int timeout; + + /// con used for sending messages to ourselves + ConnectionRef local_connection; + + /** + * @defgroup SimpleMessenger internals + * @{ + */ + + /** + * Increment the global sequence for this SimpleMessenger and return it. + * This is for the connect protocol, although it doesn't hurt if somebody + * else calls it. + * + * @return a global sequence ID that nobody else has seen. + */ + __u32 get_global_seq(__u32 old=0) { + std::lock_guard<decltype(global_seq_lock)> lg(global_seq_lock); + + if (old > global_seq) + global_seq = old; + __u32 ret = ++global_seq; + + return ret; + } + /** + * Get the protocol version we support for the given peer type: either + * a peer protocol (if it matches our own), the protocol version for the + * peer (if we're connecting), or our protocol version (if we're accepting). + */ + int get_proto_version(int peer_type, bool connect); + + /** + * Fill in the features, address and peer type for the local connection, which + * is used for delivering messages back to ourself. + */ + void init_local_connection(); + /** + * Tell the SimpleMessenger its full IP address. + * + * This is used by Pipes when connecting to other endpoints, and + * probably shouldn't be called by anybody else. + */ + void learned_addr(const entity_addr_t& peer_addr_for_me); + + /** + * This function is used by the reaper thread. As long as nobody + * has set reaper_stop, it calls the reaper function, then + * waits to be signaled when it needs to reap again (or when it needs + * to stop). + */ + void reaper_entry(); + /** + * Add a pipe to the pipe_reap_queue, to be torn down on + * the next call to reaper(). + * It should really only be the Pipe calling this, in our current + * implementation. + * + * @param pipe A Pipe which has stopped its threads and is + * ready to be torn down. + */ + void queue_reap(Pipe *pipe); + + /** + * Used to get whether this connection ready to send + */ + bool is_connected(Connection *con); + /** + * @} // SimpleMessenger Internals + */ +} ; + +#endif /* CEPH_SIMPLEMESSENGER_H */ |