Diffstat (limited to 'src/msg/xio/XioPortal.h')
-rw-r--r--  src/msg/xio/XioPortal.h  458
1 file changed, 458 insertions, 0 deletions
diff --git a/src/msg/xio/XioPortal.h b/src/msg/xio/XioPortal.h
new file mode 100644
index 00000000..7a0afee4
--- /dev/null
+++ b/src/msg/xio/XioPortal.h
@@ -0,0 +1,458 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_PORTAL_H
+#define XIO_PORTAL_H
+
+#include <string>
+
+extern "C" {
+#include "libxio.h"
+}
+#include "XioInSeq.h"
+#include <boost/lexical_cast.hpp>
+#include "msg/SimplePolicyMessenger.h"
+#include "XioConnection.h"
+#include "XioMsg.h"
+
+#include "include/spinlock.h"
+
+#include "include/ceph_assert.h"
+#include "common/dout.h"
+
+#ifndef CACHE_LINE_SIZE
+#define CACHE_LINE_SIZE 64 /* XXX arch-specific define */
+#endif
+#define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE]
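+/* CACHE_PAD(n) declares a cache-line-sized filler member; it is used below so
+ * that each SubmitQueue lane (queue, counter and spinlock) sits on its own
+ * cache line, avoiding false sharing between threads touching adjacent lanes. */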
+
+class XioPortal : public Thread
+{
+private:
+
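+  /* SubmitQueue shards pending submissions across a small fixed number of
+   * lanes, each with its own spinlock and cache-line padding, so senders on
+   * different connections rarely contend on the same lock; the portal thread
+   * drains the lanes via deq(). */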
+ struct SubmitQueue
+ {
+ const static int nlanes = 7;
+
+ struct Lane
+ {
+ uint32_t size;
+ XioSubmit::Queue q;
+ ceph::spinlock sp;
+ CACHE_PAD(0);
+ };
+
+ Lane qlane[nlanes];
+
+    int ix; /* only accessed by the portal thread, so no locking is needed */
+
+ SubmitQueue() : ix(0)
+ {
+ int ix;
+ Lane* lane;
+
+ for (ix = 0; ix < nlanes; ++ix) {
+ lane = &qlane[ix];
+ lane->size = 0;
+ }
+ }
+
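+    /* map a connection to a lane by hashing its pointer value; dividing by
+     * 16 discards low-order alignment bits before taking the modulus */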
+ inline Lane* get_lane(XioConnection *xcon)
+ {
+ return &qlane[(((uint64_t) xcon) / 16) % nlanes];
+ }
+
+ void enq(XioConnection *xcon, XioSubmit* xs)
+ {
+ Lane* lane = get_lane(xcon);
+ std::lock_guard<decltype(lane->sp)> lg(lane->sp);
+ lane->q.push_back(*xs);
+ ++(lane->size);
+ }
+
+ void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q)
+ {
+ int size = requeue_q.size();
+ Lane* lane = get_lane(xcon);
+ std::lock_guard<decltype(lane->sp)> lg(lane->sp);
+ XioSubmit::Queue::const_iterator i1 = lane->q.end();
+ lane->q.splice(i1, requeue_q);
+ lane->size += size;
+ }
+
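+    /* splice the contents of at most one non-empty lane into send_q,
+     * scanning round-robin from the last position; the portal thread calls
+     * this repeatedly, so all lanes are serviced over time */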
+ void deq(XioSubmit::Queue& send_q)
+ {
+ Lane* lane;
+ int cnt;
+
+ for (cnt = 0; cnt < nlanes; ++cnt, ++ix, ix = ix % nlanes) {
+        lane = &qlane[ix];
+        std::lock_guard<decltype(lane->sp)> lg(lane->sp);
+ if (lane->size > 0) {
+ XioSubmit::Queue::const_iterator i1 = send_q.end();
+ send_q.splice(i1, lane->q);
+ lane->size = 0;
+ ++ix, ix = ix % nlanes;
+ break;
+ }
+ }
+ }
+
+ }; /* SubmitQueue */
+
+ Messenger *msgr;
+ struct xio_context *ctx;
+ struct xio_server *server;
+ SubmitQueue submit_q;
+ ceph::spinlock sp;
+ void *ev_loop;
+ string xio_uri;
+ char *portal_id;
+ bool _shutdown;
+ bool drained;
+ uint32_t magic;
+ uint32_t special_handling;
+
+ friend class XioPortals;
+ friend class XioMessenger;
+
+public:
+ explicit XioPortal(Messenger *_msgr, int max_conns) :
+  msgr(_msgr), ctx(NULL), server(NULL), submit_q(), ev_loop(NULL),
+  xio_uri(""), portal_id(NULL), _shutdown(false), drained(false),
+  magic(0),
+  special_handling(0)
+ {
+ struct xio_context_params ctx_params;
+ memset(&ctx_params, 0, sizeof(ctx_params));
+ ctx_params.user_context = this;
+ /*
+ * hint to Accelio the total number of connections that will share
+ * this context's resources: internal primary task pool...
+ */
+ ctx_params.max_conns_per_ctx = max_conns;
+
+ /* a portal is an xio_context and event loop */
+ ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */);
+ ceph_assert(ctx && "Whoops, failed to create portal/ctx");
+ }
+
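+  /* bind this portal's xio server to an address derived from base_uri and
+   * port (implemented out of line); if non-NULL, assigned_port is presumably
+   * filled with the port actually bound, e.g. when port is 0 */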
+ int bind(struct xio_session_ops *ops, const string &base_uri,
+ uint16_t port, uint16_t *assigned_port);
+
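+  /* hand a received (possibly multi-part) message back to Accelio: walk the
+   * chain linked through user_context, xio_release_msg() each part, and
+   * finalize the completion; if the connection is already torn down the
+   * release is reported via msg_release_fail() instead */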
+ inline void release_xio_msg(XioCompletion* xcmp) {
+ struct xio_msg *msg = xcmp->dequeue();
+ struct xio_msg *next_msg = NULL;
+ int code;
+ if (unlikely(!xcmp->xcon->conn)) {
+ // NOTE: msg is not safe to dereference if the connection was torn down
+ xcmp->xcon->msg_release_fail(msg, ENOTCONN);
+ }
+ else while (msg) {
+ next_msg = static_cast<struct xio_msg *>(msg->user_context);
+ code = xio_release_msg(msg);
+ if (unlikely(code)) /* very unlikely, so log it */
+ xcmp->xcon->msg_release_fail(msg, code);
+ msg = next_msg;
+ }
+ xcmp->trace.event("xio_release_msg");
+ xcmp->finalize(); /* unconditional finalize */
+ }
+
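+  /* queue a submission for the portal thread and kick its event loop; once
+   * shutdown has begun the submission is disposed of instead: outgoing sends
+   * are failed with -EINVAL, incoming releases are completed */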
+ void enqueue(XioConnection *xcon, XioSubmit *xs)
+ {
+ if (! _shutdown) {
+ submit_q.enq(xcon, xs);
+ xio_context_stop_loop(ctx);
+ return;
+ }
+
+ /* dispose xs */
+ switch(xs->type) {
+ case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
+ {
+ XioSend* xsend = static_cast<XioSend*>(xs);
+ xs->xcon->msg_send_fail(xsend, -EINVAL);
+ }
+ break;
+ default:
+ /* INCOMING_MSG_RELEASE */
+ release_xio_msg(static_cast<XioCompletion*>(xs));
+ break;
+ };
+ }
+
+ void requeue(XioConnection* xcon, XioSubmit::Queue& send_q) {
+ submit_q.enq(xcon, send_q);
+ }
+
+ void requeue_all_xcon(XioConnection* xcon,
+ XioSubmit::Queue::iterator& q_iter,
+ XioSubmit::Queue& send_q) {
+ // XXX gather all already-dequeued outgoing messages for xcon
+ // and push them in FIFO order to front of the input queue,
+ // and mark the connection as flow-controlled
+ XioSubmit::Queue requeue_q;
+
+ while (q_iter != send_q.end()) {
+ XioSubmit *xs = &(*q_iter);
+      // skip anything that belongs to other connections
+ if (xs->xcon != xcon) {
+ q_iter++;
+ continue;
+ }
+ q_iter = send_q.erase(q_iter);
+ requeue_q.push_back(*xs);
+ }
+ std::lock_guard<decltype(xcon->sp)> lg(xcon->sp);
+ XioSubmit::Queue::const_iterator i1 = xcon->outgoing.requeue.begin();
+ xcon->outgoing.requeue.splice(i1, requeue_q);
+ xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED);
+ }
+
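+  /* portal thread main loop: drain a batch from submit_q, push outgoing
+   * messages to Accelio (requeueing and flow-controlling a connection when
+   * its xio queue-depth high-water mark would be exceeded), release incoming
+   * messages, then run the xio event loop; the loop exits once shutdown()
+   * has been called and the submit queues have drained */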
+ void *entry()
+ {
+ int size, code = 0;
+ uint32_t xio_qdepth_high;
+ XioSubmit::Queue send_q;
+ XioSubmit::Queue::iterator q_iter;
+ struct xio_msg *msg = NULL;
+ XioConnection *xcon;
+ XioSubmit *xs;
+ XioSend *xsend;
+
+ do {
+ submit_q.deq(send_q);
+
+ /* shutdown() barrier */
+ std::lock_guard<decltype(sp)> lg(sp);
+
+ restart:
+ size = send_q.size();
+
+ if (_shutdown) {
+ // XXX XioSend queues for flow-controlled connections may require
+ // cleanup
+ drained = true;
+ }
+
+ if (size > 0) {
+ q_iter = send_q.begin();
+ while (q_iter != send_q.end()) {
+ xs = &(*q_iter);
+ xcon = xs->xcon;
+
+ switch (xs->type) {
+ case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
+ xsend = static_cast<XioSend*>(xs);
+ if (unlikely(!xcon->conn || !xcon->is_connected()))
+ code = ENOTCONN;
+ else {
+              /* XXX guard the Accelio send queue (it should be safe to rely
+               * on Accelio's own check below, but this ensures that all
+               * chained xio_msg are accounted for) */
+ xio_qdepth_high = xcon->xio_qdepth_high_mark();
+ if (unlikely((xcon->send_ctr + xsend->get_msg_count()) >
+ xio_qdepth_high)) {
+ requeue_all_xcon(xcon, q_iter, send_q);
+ goto restart;
+ }
+
+ xs->trace.event("xio_send_msg");
+ msg = xsend->get_xio_msg();
+ code = xio_send_msg(xcon->conn, msg);
+ /* header trace moved here to capture xio serial# */
+ if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
+ xsend->print_debug(msgr->cct, "xio_send_msg");
+ }
+ /* get the right Accelio's errno code */
+ if (unlikely(code)) {
+ if ((code == -1) && (xio_errno() == -1)) {
+ /* In case XIO does not have any credits to send,
+ * it would still queue up the message(s) for transmission,
+ * but would return -1 and errno would also be set to -1.
+ * This needs to be treated as a success.
+ */
+ code = 0;
+ }
+ else {
+ code = xio_errno();
+ }
+ }
+ } /* !ENOTCONN */
+ if (unlikely(code)) {
+ switch (code) {
+ case XIO_E_TX_QUEUE_OVERFLOW:
+ {
+ requeue_all_xcon(xcon, q_iter, send_q);
+ goto restart;
+ }
+ break;
+ default:
+ q_iter = send_q.erase(q_iter);
+ xcon->msg_send_fail(xsend, code);
+ continue;
+ break;
+ };
+ } else {
+ xcon->send.set(msg->timestamp); // need atomic?
+ xcon->send_ctr += xsend->get_msg_count(); // only inc if cb promised
+ }
+ break;
+ default:
+ /* INCOMING_MSG_RELEASE */
+ q_iter = send_q.erase(q_iter);
+ release_xio_msg(static_cast<XioCompletion*>(xs));
+ continue;
+ } /* switch (xs->type) */
+ q_iter = send_q.erase(q_iter);
+ } /* while */
+ } /* size > 0 */
+
+ xio_context_run_loop(ctx, 300);
+
+ } while ((!_shutdown) || (!drained));
+
+ /* shutting down */
+ if (server) {
+ xio_unbind(server);
+ }
+ xio_context_destroy(ctx);
+ return NULL;
+ }
+
+ void shutdown()
+ {
+ std::lock_guard<decltype(sp)> lg(sp);
+ _shutdown = true;
+ }
+};
+
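+/* XioPortals owns the set of portals (each a Thread with its own xio_context)
+ * used by the messenger and distributes new sessions across them
+ * round-robin. */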
+class XioPortals
+{
+private:
+ vector<XioPortal*> portals;
+ char **p_vec;
+ int n;
+ int last_unused;
+
+public:
+ XioPortals(Messenger *msgr, int _n, int nconns) : p_vec(NULL), last_unused(0)
+ {
+ n = max(_n, 1);
+
+ portals.resize(n);
+ for (int i = 0; i < n; i++) {
+ if (!portals[i]) {
+ portals[i] = new XioPortal(msgr, nconns);
+ ceph_assert(portals[i] != nullptr);
+ }
+ }
+ }
+
+ vector<XioPortal*>& get() { return portals; }
+
+ const char **get_vec()
+ {
+ return (const char **) p_vec;
+ }
+
+ int get_portals_len()
+ {
+ return n;
+ }
+
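+  /* round-robin portal selection: return the current index and advance,
+   * wrapping around at the number of portals */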
+ int get_last_unused()
+ {
+ int pix = last_unused;
+ if (++last_unused >= get_portals_len())
+ last_unused = 0;
+ return pix;
+ }
+
+ XioPortal* get_next_portal()
+ {
+ int pix = get_last_unused();
+ return portals[pix];
+ }
+
+ int bind(struct xio_session_ops *ops, const string& base_uri,
+ uint16_t port, uint16_t *port0);
+
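+  /* accept a new session, steering it to the next portal in round-robin
+   * order; index 0 is the default portal, for which xio_accept() is called
+   * without a redirect URI */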
+ int accept(struct xio_session *session,
+ struct xio_new_session_req *req,
+ void *cb_user_context)
+ {
+ const char **portals_vec = get_vec();
+ int pix = get_last_unused();
+
+ if (pix == 0) {
+ return xio_accept(session, NULL, 0, NULL, 0);
+ } else {
+ return xio_accept(session,
+ (const char **)&(portals_vec[pix]),
+ 1, NULL, 0);
+ }
+ }
+
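+  /* publish each portal's id into p_vec (consumed by accept() above) and
+   * spawn one "ms_xio_<index>" thread per portal */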
+ void start()
+ {
+ XioPortal *portal;
+ int p_ix, nportals = portals.size();
+
+ p_vec = new char*[nportals];
+ for (p_ix = 0; p_ix < nportals; ++p_ix) {
+ portal = portals[p_ix];
+ p_vec[p_ix] = (char*) /* portal->xio_uri.c_str() */
+ portal->portal_id;
+ }
+
+ for (p_ix = 0; p_ix < nportals; ++p_ix) {
+ string thread_name = "ms_xio_";
+ thread_name.append(std::to_string(p_ix));
+ portal = portals[p_ix];
+ portal->create(thread_name.c_str());
+ }
+ }
+
+ void shutdown()
+ {
+ int nportals = portals.size();
+ for (int p_ix = 0; p_ix < nportals; ++p_ix) {
+ XioPortal *portal = portals[p_ix];
+ portal->shutdown();
+ }
+ }
+
+ void join()
+ {
+ int nportals = portals.size();
+ for (int p_ix = 0; p_ix < nportals; ++p_ix) {
+ XioPortal *portal = portals[p_ix];
+ portal->join();
+ }
+ }
+
+ ~XioPortals()
+ {
+ int nportals = portals.size();
+ for (int ix = 0; ix < nportals; ++ix)
+ delete(portals[ix]);
+ portals.clear();
+ if (p_vec)
+ delete[] p_vec;
+ }
+};
+
+#endif /* XIO_PORTAL_H */