diff options
Diffstat (limited to 'src/msg/simple/SimpleMessenger.cc')
-rw-r--r-- | src/msg/simple/SimpleMessenger.cc | 769 |
1 files changed, 769 insertions, 0 deletions
diff --git a/src/msg/simple/SimpleMessenger.cc b/src/msg/simple/SimpleMessenger.cc new file mode 100644 index 00000000..09d1ab7b --- /dev/null +++ b/src/msg/simple/SimpleMessenger.cc @@ -0,0 +1,769 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <errno.h> +#include <iostream> +#include <fstream> + + +#include "SimpleMessenger.h" + +#include "common/config.h" +#include "common/Timer.h" +#include "common/errno.h" +#include "common/valgrind.h" +#include "auth/Crypto.h" +#include "include/spinlock.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) +static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) { + return *_dout << "-- " << msgr->get_myaddr_legacy() << " "; +} + + +/******************* + * SimpleMessenger + */ + +SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t _nonce) + : SimplePolicyMessenger(cct, name,mname, _nonce), + accepter(this, _nonce), + dispatch_queue(cct, this, mname), + reaper_thread(this), + nonce(_nonce), + lock("SimpleMessenger::lock"), need_addr(true), did_bind(false), + global_seq(0), + cluster_protocol(0), + reaper_started(false), reaper_stop(false), + timeout(0), + local_connection(new PipeConnection(cct, this)) +{ + ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout), + "SimpleMessenger read timeout"); + init_local_connection(); +} + +/** + * Destroy the SimpleMessenger. Pretty simple since all the work is done + * elsewhere. + */ +SimpleMessenger::~SimpleMessenger() +{ + ceph_assert(!did_bind); // either we didn't bind or we shut down the Accepter + ceph_assert(rank_pipe.empty()); // we don't have any running Pipes. + ceph_assert(!reaper_started); // the reaper thread is stopped +} + +void SimpleMessenger::ready() +{ + ldout(cct,10) << "ready " << get_myaddr_legacy() << dendl; + dispatch_queue.start(); + + lock.Lock(); + if (did_bind) + accepter.start(); + lock.Unlock(); +} + + +int SimpleMessenger::shutdown() +{ + ldout(cct,10) << "shutdown " << get_myaddr_legacy() << dendl; + mark_down_all(); + + // break ref cycles on the loopback connection + local_connection->set_priv(NULL); + + lock.Lock(); + stop_cond.Signal(); + stopped = true; + lock.Unlock(); + + return 0; +} + +int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest) +{ + // set envelope + m->get_header().src = get_myname(); + m->set_cct(cct); + + if (!m->get_priority()) m->set_priority(get_default_send_priority()); + + ldout(cct,1) <<"--> " << dest.name << " " + << dest.addr << " -- " << *m + << " -- ?+" << m->get_data().length() + << " " << m + << dendl; + + if (dest.addr == entity_addr_t()) { + ldout(cct,0) << "send_message message " << *m + << " with empty dest " << dest.addr << dendl; + m->put(); + return -EINVAL; + } + + lock.Lock(); + Pipe *pipe = _lookup_pipe(dest.addr); + submit_message(m, (pipe ? pipe->connection_state.get() : NULL), + dest.addr, dest.name.type(), true); + lock.Unlock(); + return 0; +} + +int SimpleMessenger::_send_message(Message *m, Connection *con) +{ + //set envelope + m->get_header().src = get_myname(); + + if (!m->get_priority()) m->set_priority(get_default_send_priority()); + + ldout(cct,1) << "--> " << con->get_peer_addr() + << " -- " << *m + << " -- ?+" << m->get_data().length() + << " " << m << " con " << con + << dendl; + + submit_message(m, static_cast<PipeConnection*>(con), + con->get_peer_addr(), con->get_peer_type(), false); + return 0; +} + +/** + * If my_inst.addr doesn't have an IP set, this function + * will fill it in from the passed addr. Otherwise it does nothing and returns. + */ +bool SimpleMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) +{ + bool ret = false; + auto addr = addrs.front(); + ceph_assert(my_addr == my_addrs->front()); + if (my_addr.is_blank_ip()) { + ldout(cct,1) << __func__ << " " << addr << dendl; + entity_addr_t t = my_addr; + int port = t.get_port(); + t.u = addr.u; + t.set_port(port); + set_addrs(entity_addrvec_t(t)); + init_local_connection(); + ret = true; + } else { + ldout(cct,1) << __func__ << " " << addr << " no-op" << dendl; + } + ceph_assert(my_addr == my_addrs->front()); + return ret; +} + +void SimpleMessenger::set_myaddrs(const entity_addrvec_t &av) +{ + my_addr = av.front(); + Messenger::set_myaddrs(av); +} + +void SimpleMessenger::set_addrs(const entity_addrvec_t &av) +{ + auto t = av; + for (auto& a : t.v) { + a.set_nonce(nonce); + } + set_myaddrs(t); + init_local_connection(); +} + +int SimpleMessenger::get_proto_version(int peer_type, bool connect) +{ + int my_type = my_name.type(); + + // set reply protocol version + if (peer_type == my_type) { + // internal + return cluster_protocol; + } else { + // public + if (connect) { + switch (peer_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + } + } else { + switch (my_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + } + } + } + return 0; +} + + + + + + + +/******************************************** + * SimpleMessenger + */ +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) + +void SimpleMessenger::reaper_entry() +{ + ldout(cct,10) << "reaper_entry start" << dendl; + lock.Lock(); + while (!reaper_stop) { + reaper(); // may drop and retake the lock + if (reaper_stop) + break; + reaper_cond.Wait(lock); + } + lock.Unlock(); + ldout(cct,10) << "reaper_entry done" << dendl; +} + +/* + * note: assumes lock is held + */ +void SimpleMessenger::reaper() +{ + ldout(cct,10) << "reaper" << dendl; + ceph_assert(lock.is_locked()); + + while (!pipe_reap_queue.empty()) { + Pipe *p = pipe_reap_queue.front(); + pipe_reap_queue.pop_front(); + ldout(cct,10) << "reaper reaping pipe " << p << " " << + p->get_peer_addr() << dendl; + p->pipe_lock.Lock(); + p->discard_out_queue(); + if (p->connection_state) { + // mark_down, mark_down_all, or fault() should have done this, + // or accept() may have switch the Connection to a different + // Pipe... but make sure! + bool cleared = p->connection_state->clear_pipe(p); + ceph_assert(!cleared); + } + p->pipe_lock.Unlock(); + p->unregister_pipe(); + ceph_assert(pipes.count(p)); + pipes.erase(p); + + // drop msgr lock while joining thread; the delay through could be + // trying to fast dispatch, preventing it from joining without + // blocking and deadlocking. + lock.Unlock(); + p->join(); + lock.Lock(); + + if (p->sd >= 0) + ::close(p->sd); + ldout(cct,10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl; + p->put(); + ldout(cct,10) << "reaper deleted pipe " << p << dendl; + } + ldout(cct,10) << "reaper done" << dendl; +} + +void SimpleMessenger::queue_reap(Pipe *pipe) +{ + ldout(cct,10) << "queue_reap " << pipe << dendl; + lock.Lock(); + pipe_reap_queue.push_back(pipe); + reaper_cond.Signal(); + lock.Unlock(); +} + +bool SimpleMessenger::is_connected(Connection *con) +{ + bool r = false; + if (con) { + Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe()); + if (p) { + ceph_assert(p->msgr == this); + r = p->is_connected(); + p->put(); + } + } + return r; +} + +int SimpleMessenger::bind(const entity_addr_t &bind_addr) +{ + lock.Lock(); + if (started) { + ldout(cct,10) << "rank.bind already started" << dendl; + lock.Unlock(); + return -1; + } + ldout(cct,10) << "rank.bind " << bind_addr << dendl; + lock.Unlock(); + + // bind to a socket + set<int> avoid_ports; + int r = accepter.bind(bind_addr, avoid_ports); + if (r >= 0) + did_bind = true; + return r; +} + +int SimpleMessenger::rebind(const set<int>& avoid_ports) +{ + ldout(cct,1) << "rebind avoid " << avoid_ports << dendl; + ceph_assert(did_bind); + accepter.stop(); + mark_down_all(); + return accepter.rebind(avoid_ports); +} + + +int SimpleMessenger::client_bind(const entity_addr_t &bind_addr) +{ + if (!cct->_conf->ms_bind_before_connect) + return 0; + Mutex::Locker l(lock); + if (did_bind) { + ceph_assert(*my_addrs == entity_addrvec_t(bind_addr)); + return 0; + } + if (started) { + ldout(cct,10) << "rank.bind already started" << dendl; + return -1; + } + ldout(cct,10) << "rank.bind " << bind_addr << dendl; + + set_myaddrs(entity_addrvec_t(bind_addr)); + return 0; +} + + +int SimpleMessenger::start() +{ + lock.Lock(); + ldout(cct,1) << "messenger.start" << dendl; + + // register at least one entity, first! + ceph_assert(my_name.type() >= 0); + + ceph_assert(!started); + started = true; + stopped = false; + + if (!did_bind) { + my_addr.nonce = nonce; + init_local_connection(); + } + + lock.Unlock(); + + reaper_started = true; + reaper_thread.create("ms_reaper"); + return 0; +} + +Pipe *SimpleMessenger::add_accept_pipe(int sd) +{ + lock.Lock(); + Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL); + p->sd = sd; + p->pipe_lock.Lock(); + p->start_reader(); + p->pipe_lock.Unlock(); + pipes.insert(p); + accepting_pipes.insert(p); + lock.Unlock(); + return p; +} + +/* connect_rank + * NOTE: assumes messenger.lock held. + */ +Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, + int type, + PipeConnection *con, + Message *first) +{ + ceph_assert(lock.is_locked()); + ceph_assert(addr != my_addr); + + ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; + + // create pipe + Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING, + static_cast<PipeConnection*>(con)); + pipe->pipe_lock.Lock(); + pipe->set_peer_type(type); + pipe->set_peer_addr(addr); + pipe->policy = get_policy(type); + pipe->start_writer(); + if (first) + pipe->_send(first); + pipe->pipe_lock.Unlock(); + pipe->register_pipe(); + pipes.insert(pipe); + + return pipe; +} + + + + + + +ConnectionRef SimpleMessenger::connect_to(int type, + const entity_addrvec_t& addrs) +{ + Mutex::Locker l(lock); + if (my_addr == addrs.front()) { + // local + return local_connection; + } + + // remote + while (true) { + Pipe *pipe = _lookup_pipe(addrs.legacy_addr()); + if (pipe) { + ldout(cct, 10) << "get_connection " << addrs << " existing " << pipe << dendl; + } else { + pipe = connect_rank(addrs.legacy_addr(), type, NULL, NULL); + ldout(cct, 10) << "get_connection " << addrs << " new " << pipe << dendl; + } + Mutex::Locker l(pipe->pipe_lock); + if (pipe->connection_state) + return pipe->connection_state; + // we failed too quickly! retry. FIXME. + } +} + +ConnectionRef SimpleMessenger::get_loopback_connection() +{ + return local_connection; +} + +void SimpleMessenger::submit_message(Message *m, PipeConnection *con, + const entity_addr_t& dest_addr, int dest_type, + bool already_locked) +{ + m->trace.event("simple submitting message"); + if (cct->_conf->ms_dump_on_send) { + m->encode(-1, true); + ldout(cct, 0) << "submit_message " << *m << "\n"; + m->get_payload().hexdump(*_dout); + if (m->get_data().length() > 0) { + *_dout << " data:\n"; + m->get_data().hexdump(*_dout); + } + *_dout << dendl; + m->clear_payload(); + } + + // existing connection? + if (con) { + Pipe *pipe = NULL; + bool ok = static_cast<PipeConnection*>(con)->try_get_pipe(&pipe); + if (!ok) { + ldout(cct,0) << "submit_message " << *m << " remote, " << dest_addr + << ", failed lossy con, dropping message " << m << dendl; + m->put(); + return; + } + while (pipe && ok) { + // we loop in case of a racing reconnect, either from us or them + pipe->pipe_lock.Lock(); // can't use a Locker because of the Pipe ref + if (pipe->state != Pipe::STATE_CLOSED) { + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl; + pipe->_send(m); + pipe->pipe_lock.Unlock(); + pipe->put(); + return; + } + Pipe *current_pipe; + ok = con->try_get_pipe(¤t_pipe); + pipe->pipe_lock.Unlock(); + if (current_pipe == pipe) { + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr + << ", had pipe " << pipe << ", but it closed." << dendl; + pipe->put(); + current_pipe->put(); + m->put(); + return; + } else { + pipe->put(); + pipe = current_pipe; + } + } + } + + // local? + if (my_addr == dest_addr) { + // local + ldout(cct,20) << "submit_message " << *m << " local" << dendl; + m->set_connection(local_connection.get()); + dispatch_queue.local_delivery(m, m->get_priority()); + return; + } + + // remote, no existing pipe. + const Policy& policy = get_policy(dest_type); + if (policy.server) { + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", lossy server for target type " + << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl; + m->put(); + } else { + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl; + if (!already_locked) { + /** We couldn't handle the Message without reference to global data, so + * grab the lock and do it again. If we got here, we know it's a non-lossy + * Connection, so we can use our existing pointer without doing another lookup. */ + Mutex::Locker l(lock); + submit_message(m, con, dest_addr, dest_type, true); + } else { + connect_rank(dest_addr, dest_type, static_cast<PipeConnection*>(con), m); + } + } +} + +int SimpleMessenger::send_keepalive(Connection *con) +{ + int ret = 0; + Pipe *pipe = static_cast<Pipe *>( + static_cast<PipeConnection*>(con)->get_pipe()); + if (pipe) { + ldout(cct,20) << "send_keepalive con " << con << ", have pipe." << dendl; + ceph_assert(pipe->msgr == this); + pipe->pipe_lock.Lock(); + pipe->_send_keepalive(); + pipe->pipe_lock.Unlock(); + pipe->put(); + } else { + ldout(cct,0) << "send_keepalive con " << con << ", no pipe." << dendl; + ret = -EPIPE; + } + return ret; +} + + + +void SimpleMessenger::wait() +{ + lock.Lock(); + if (!started) { + lock.Unlock(); + return; + } + if (!stopped) + stop_cond.Wait(lock); + + lock.Unlock(); + + // done! clean up. + if (did_bind) { + ldout(cct,20) << "wait: stopping accepter thread" << dendl; + accepter.stop(); + did_bind = false; + ldout(cct,20) << "wait: stopped accepter thread" << dendl; + } + + dispatch_queue.shutdown(); + if (dispatch_queue.is_started()) { + ldout(cct,10) << "wait: waiting for dispatch queue" << dendl; + dispatch_queue.wait(); + dispatch_queue.discard_local(); + ldout(cct,10) << "wait: dispatch queue is stopped" << dendl; + } + + if (reaper_started) { + ldout(cct,20) << "wait: stopping reaper thread" << dendl; + lock.Lock(); + reaper_cond.Signal(); + reaper_stop = true; + lock.Unlock(); + reaper_thread.join(); + reaper_started = false; + ldout(cct,20) << "wait: stopped reaper thread" << dendl; + } + + // close+reap all pipes + lock.Lock(); + { + ldout(cct,10) << "wait: closing pipes" << dendl; + + while (!rank_pipe.empty()) { + Pipe *p = rank_pipe.begin()->second; + p->unregister_pipe(); + p->pipe_lock.Lock(); + p->stop_and_wait(); + // don't generate an event here; we're shutting down anyway. + PipeConnectionRef con = p->connection_state; + if (con) + con->clear_pipe(p); + p->pipe_lock.Unlock(); + } + + reaper(); + ldout(cct,10) << "wait: waiting for pipes " << pipes << " to close" << dendl; + while (!pipes.empty()) { + reaper_cond.Wait(lock); + reaper(); + } + } + lock.Unlock(); + + ldout(cct,10) << "wait: done." << dendl; + ldout(cct,1) << "shutdown complete." << dendl; + started = false; +} + + +void SimpleMessenger::mark_down_all() +{ + ldout(cct,1) << "mark_down_all" << dendl; + lock.Lock(); + for (set<Pipe*>::iterator q = accepting_pipes.begin(); q != accepting_pipes.end(); ++q) { + Pipe *p = *q; + ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl; + p->pipe_lock.Lock(); + p->stop(); + PipeConnectionRef con = p->connection_state; + if (con && con->clear_pipe(p)) + dispatch_queue.queue_reset(con.get()); + p->pipe_lock.Unlock(); + } + accepting_pipes.clear(); + + while (!rank_pipe.empty()) { + ceph::unordered_map<entity_addr_t,Pipe*>::iterator it = rank_pipe.begin(); + Pipe *p = it->second; + ldout(cct,5) << "mark_down_all " << it->first << " " << p << dendl; + rank_pipe.erase(it); + p->unregister_pipe(); + p->pipe_lock.Lock(); + p->stop(); + PipeConnectionRef con = p->connection_state; + if (con && con->clear_pipe(p)) + dispatch_queue.queue_reset(con.get()); + p->pipe_lock.Unlock(); + } + lock.Unlock(); +} + +void SimpleMessenger::mark_down(const entity_addr_t& addr) +{ + lock.Lock(); + Pipe *p = _lookup_pipe(addr); + if (p) { + ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl; + p->unregister_pipe(); + p->pipe_lock.Lock(); + p->stop(); + if (p->connection_state) { + // generate a reset event for the caller in this case, even + // though they asked for it, since this is the addr-based (and + // not Connection* based) interface + PipeConnectionRef con = p->connection_state; + if (con && con->clear_pipe(p)) + dispatch_queue.queue_reset(con.get()); + } + p->pipe_lock.Unlock(); + } else { + ldout(cct,1) << "mark_down " << addr << " -- pipe dne" << dendl; + } + lock.Unlock(); +} + +void SimpleMessenger::mark_down(Connection *con) +{ + if (con == NULL) + return; + lock.Lock(); + Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe()); + if (p) { + ldout(cct,1) << "mark_down " << con << " -- " << p << dendl; + ceph_assert(p->msgr == this); + p->unregister_pipe(); + p->pipe_lock.Lock(); + p->stop(); + if (p->connection_state) { + // do not generate a reset event for the caller in this case, + // since they asked for it. + p->connection_state->clear_pipe(p); + } + p->pipe_lock.Unlock(); + p->put(); + } else { + ldout(cct,1) << "mark_down " << con << " -- pipe dne" << dendl; + } + lock.Unlock(); +} + +void SimpleMessenger::mark_disposable(Connection *con) +{ + lock.Lock(); + Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe()); + if (p) { + ldout(cct,1) << "mark_disposable " << con << " -- " << p << dendl; + ceph_assert(p->msgr == this); + p->pipe_lock.Lock(); + p->policy.lossy = true; + p->pipe_lock.Unlock(); + p->put(); + } else { + ldout(cct,1) << "mark_disposable " << con << " -- pipe dne" << dendl; + } + lock.Unlock(); +} + +void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +{ + // be careful here: multiple threads may block here, and readers of + // my_addr do NOT hold any lock. + + // this always goes from true -> false under the protection of the + // mutex. if it is already false, we need not retake the mutex at + // all. + if (!need_addr) + return; + + lock.Lock(); + if (need_addr && my_addr.is_blank_ip()) { + entity_addr_t t = peer_addr_for_me; + if (!did_bind) { + t.set_type(entity_addr_t::TYPE_ANY); + t.set_port(0); + } else { + t.set_type(entity_addr_t::TYPE_LEGACY); + t.set_port(my_addr.get_port()); + } + t.set_nonce(my_addr.get_nonce()); + ANNOTATE_BENIGN_RACE_SIZED(&my_addr, sizeof(my_addr), + "SimpleMessenger learned addr"); + set_myaddrs(entity_addrvec_t(t)); + ldout(cct,1) << "learned my addr " << my_addr << dendl; + need_addr = false; + init_local_connection(); + } + lock.Unlock(); +} + +void SimpleMessenger::init_local_connection() +{ + local_connection->peer_addrs = *my_addrs; + local_connection->peer_type = my_name.type(); + local_connection->set_features(CEPH_FEATURES_ALL); + ms_deliver_handle_fast_connect(local_connection.get()); +} |