// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Portions Copyright (C) 2013 CohortFS, LLC
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef XIO_PORTAL_H
#define XIO_PORTAL_H

#include <string>

extern "C" {
#include "libxio.h"
}
#include "XioInSeq.h"
#include <boost/lexical_cast.hpp>
#include "msg/SimplePolicyMessenger.h"
#include "XioConnection.h"
#include "XioMsg.h"

#include "include/spinlock.h"

#include "include/ceph_assert.h"
#include "common/dout.h"

#ifndef CACHE_LINE_SIZE
#define CACHE_LINE_SIZE 64 /* XXX arch-specific define */
#endif
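/* pad each Lane to a full cache line so the per-lane spinlock and size
 * counter do not false-share with the adjacent lane */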
#define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE]

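/*
 * An XioPortal couples one Accelio context/event loop with a multi-lane
 * submit queue and is serviced by a dedicated thread (entry() below).
 * Other threads enqueue outgoing messages and message-release requests;
 * the portal thread drains them and drives the Accelio event loop.
 */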
class XioPortal : public Thread
{
private:

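  /* Multi-lane submit queue: producers hash their XioConnection onto one
   * of nlanes independently-locked lanes to reduce contention; the single
   * portal thread drains the lanes round-robin in deq(). */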
  struct SubmitQueue
  {
    const static int nlanes = 7;

    struct Lane
    {
      uint32_t size;
      XioSubmit::Queue q;
      ceph::spinlock sp;
      CACHE_PAD(0);
    };

    Lane qlane[nlanes];

    int ix; /* dequeue rotor; only the portal thread touches it, so no locking needed */

    SubmitQueue() : ix(0)
      {
	for (int lane_ix = 0; lane_ix < nlanes; ++lane_ix) {
	  qlane[lane_ix].size = 0;
	}
      }

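    /* hash the connection pointer onto a lane; dividing by 16 discards
     * low-order alignment bits before taking the modulus */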
    inline Lane* get_lane(XioConnection *xcon)
      {
	return &qlane[(((uint64_t) xcon) / 16) % nlanes];
      }

    void enq(XioConnection *xcon, XioSubmit* xs)
      {
	Lane* lane = get_lane(xcon);
	std::lock_guard<decltype(lane->sp)> lg(lane->sp);
	lane->q.push_back(*xs);
	++(lane->size);
      }

    void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q)
      {
	int size = requeue_q.size();
	Lane* lane = get_lane(xcon);
	std::lock_guard<decltype(lane->sp)> lg(lane->sp);
	XioSubmit::Queue::const_iterator i1 = lane->q.end();
	lane->q.splice(i1, requeue_q);
	lane->size += size;
      }

    void deq(XioSubmit::Queue& send_q)
      {
	Lane* lane;
	int cnt;

	for (cnt = 0; cnt < nlanes; ++cnt, ++ix, ix = ix % nlanes) {
	  lane = &qlane[ix];
	  std::lock_guard<decltype(lane->sp)> lg(lane->sp);
	  if (lane->size > 0) {
	    XioSubmit::Queue::const_iterator i1 = send_q.end();
	    send_q.splice(i1, lane->q);
	    lane->size = 0;
	    ++ix, ix = ix % nlanes;
	    break;
	  }
	}
      }

  }; /* SubmitQueue */

  Messenger *msgr;
  struct xio_context *ctx;
  struct xio_server *server;
  SubmitQueue submit_q;
  ceph::spinlock sp;
  void *ev_loop;
  string xio_uri;
  char *portal_id;
  bool _shutdown;
  bool drained;
  uint32_t magic;
  uint32_t special_handling;

  friend class XioPortals;
  friend class XioMessenger;

public:
  explicit XioPortal(Messenger *_msgr, int max_conns) :
    msgr(_msgr), ctx(NULL), server(NULL), submit_q(), xio_uri(""),
    portal_id(NULL), _shutdown(false), drained(false),
    magic(0),
    special_handling(0)
  {
    struct xio_context_params ctx_params;
    memset(&ctx_params, 0, sizeof(ctx_params));
    ctx_params.user_context = this;
    /*
     * hint to Accelio the total number of connections that will share
     * this context's resources: internal primary task pool...
     */
    ctx_params.max_conns_per_ctx = max_conns;

    /* a portal is an xio_context and event loop */
    ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */);
    ceph_assert(ctx && "Whoops, failed to create portal/ctx");
  }

  int bind(struct xio_session_ops *ops, const string &base_uri,
	   uint16_t port, uint16_t *assigned_port);

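  /* return a possibly-chained sequence of received xio_msg to Accelio;
   * chain links are carried in each msg's user_context */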
  inline void release_xio_msg(XioCompletion* xcmp) {
    struct xio_msg *msg = xcmp->dequeue();
    struct xio_msg *next_msg = NULL;
    int code;
    if (unlikely(!xcmp->xcon->conn)) {
      // NOTE: msg is not safe to dereference if the connection was torn down
      xcmp->xcon->msg_release_fail(msg, ENOTCONN);
    }
    else while (msg) {
      next_msg = static_cast<struct xio_msg *>(msg->user_context);
      code = xio_release_msg(msg);
      if (unlikely(code)) /* very unlikely, so log it */
	xcmp->xcon->msg_release_fail(msg, code);
      msg = next_msg;
    }
    xcmp->trace.event("xio_release_msg");
    xcmp->finalize(); /* unconditional finalize */
  }

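  /* producer-side entry point: queue a submission for this portal and
   * interrupt the event loop so the portal thread picks it up promptly;
   * once shutdown has begun, submissions are disposed of immediately */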
  void enqueue(XioConnection *xcon, XioSubmit *xs)
    {
      if (! _shutdown) {
	submit_q.enq(xcon, xs);
	xio_context_stop_loop(ctx);
	return;
      }

      /* dispose xs */
      switch(xs->type) {
      case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
      {
	XioSend* xsend = static_cast<XioSend*>(xs);
	xs->xcon->msg_send_fail(xsend, -EINVAL);
      }
	break;
      default:
	/* INCOMING_MSG_RELEASE */
	release_xio_msg(static_cast<XioCompletion*>(xs));
	break;
      };
    }

  void requeue(XioConnection* xcon, XioSubmit::Queue& send_q) {
    submit_q.enq(xcon, send_q);
  }

  void requeue_all_xcon(XioConnection* xcon,
			XioSubmit::Queue::iterator& q_iter,
			XioSubmit::Queue& send_q) {
    // XXX gather all already-dequeued outgoing messages for xcon
    // and push them in FIFO order to front of the input queue,
    // and mark the connection as flow-controlled
    XioSubmit::Queue requeue_q;

    while (q_iter != send_q.end()) {
      XioSubmit *xs = &(*q_iter);
      // skip anything that does not belong to this connection
      if (xs->xcon != xcon) {
	q_iter++;
	continue;
      }
      q_iter = send_q.erase(q_iter);
      requeue_q.push_back(*xs);
    }
    std::lock_guard<decltype(xcon->sp)> lg(xcon->sp);
    XioSubmit::Queue::const_iterator i1 = xcon->outgoing.requeue.begin();
    xcon->outgoing.requeue.splice(i1, requeue_q);
    xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED);
  }

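  /* portal thread main loop: drain the submit queue, hand outgoing
   * messages to Accelio (respecting the connection's queue-depth high
   * mark), release consumed incoming messages, then run the Accelio
   * event loop until the next wakeup or timeout */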
  void *entry()
    {
      int size, code = 0;
      uint32_t xio_qdepth_high;
      XioSubmit::Queue send_q;
      XioSubmit::Queue::iterator q_iter;
      struct xio_msg *msg = NULL;
      XioConnection *xcon;
      XioSubmit *xs;
      XioSend *xsend;

      do {
	submit_q.deq(send_q);

	/* shutdown() barrier */
	std::lock_guard<decltype(sp)> lg(sp);

      restart:
	size = send_q.size();

	if (_shutdown) {
	  // XXX XioSend queues for flow-controlled connections may require
	  // cleanup
	  drained = true;
	}

	if (size > 0) {
	  q_iter = send_q.begin();
	  while (q_iter != send_q.end()) {
	    xs = &(*q_iter);
	    xcon = xs->xcon;

	    switch (xs->type) {
	    case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
	      xsend = static_cast<XioSend*>(xs);
	      if (unlikely(!xcon->conn || !xcon->is_connected()))
		code = ENOTCONN;
	      else {
		/* XXX guard the Accelio send queue (it should be safe to rely
		 * on Accelio's own check below, but this ensures that all
		 * chained xio_msg are accounted for) */
		xio_qdepth_high = xcon->xio_qdepth_high_mark();
		if (unlikely((xcon->send_ctr + xsend->get_msg_count()) >
			     xio_qdepth_high)) {
		  requeue_all_xcon(xcon, q_iter, send_q);
		  goto restart;
		}

		xs->trace.event("xio_send_msg");
		msg = xsend->get_xio_msg();
		code = xio_send_msg(xcon->conn, msg);
		/* header trace moved here to capture xio serial# */
		if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
		  xsend->print_debug(msgr->cct, "xio_send_msg");
		}
		/* get the right Accelio's errno code */
		if (unlikely(code)) {
		  if ((code == -1) && (xio_errno() == -1)) {
		    /* In case XIO does not have any credits to send,
		     * it would still queue up the message(s) for transmission,
		     * but would return -1 and errno would also be set to -1.
		     * This needs to be treated as a success.
		     */
		    code = 0;
		  }
		  else {
		    code = xio_errno();
		  }
		}
	      } /* !ENOTCONN */
	      if (unlikely(code)) {
		switch (code) {
		case XIO_E_TX_QUEUE_OVERFLOW:
		{
		  requeue_all_xcon(xcon, q_iter, send_q);
		  goto restart;
		}
		  break;
		default:
		  q_iter = send_q.erase(q_iter);
		  xcon->msg_send_fail(xsend, code);
		  continue;
		  break;
		};
	      } else {
		xcon->send.set(msg->timestamp); // need atomic?
		xcon->send_ctr += xsend->get_msg_count(); // only inc if cb promised
	      }
	      break;
	    default:
	      /* INCOMING_MSG_RELEASE */
	      q_iter = send_q.erase(q_iter);
	      release_xio_msg(static_cast<XioCompletion*>(xs));
	      continue;
	    } /* switch (xs->type) */
	    q_iter = send_q.erase(q_iter);
	  } /* while */
	} /* size > 0 */

	xio_context_run_loop(ctx, 300);

      } while ((!_shutdown) || (!drained));

      /* shutting down */
      if (server) {
	xio_unbind(server);
      }
      xio_context_destroy(ctx);
      return NULL;
    }

  void shutdown()
    {
      std::lock_guard<decltype(sp)> lg(sp);
      _shutdown = true;
    }
};

class XioPortals
{
private:
  vector<XioPortal*> portals;
  char **p_vec;
  int n;
  int last_unused;

public:
  XioPortals(Messenger *msgr, int _n, int nconns) : p_vec(NULL), last_unused(0)
  {
    n = max(_n, 1);

    portals.resize(n);
    for (int i = 0; i < n; i++) {
      if (!portals[i]) {
        portals[i] = new XioPortal(msgr, nconns);
        ceph_assert(portals[i] != nullptr);
      }
    }
  }

  vector<XioPortal*>& get() { return portals; }

  const char **get_vec()
  {
    return (const char **) p_vec;
  }

  int get_portals_len()
  {
    return n;
  }

  int get_last_unused()
  {
    int pix = last_unused;
    if (++last_unused >= get_portals_len())
      last_unused = 0;
    return pix;
  }

  XioPortal* get_next_portal()
  {
    int pix = get_last_unused();
    return portals[pix];
  }

  int bind(struct xio_session_ops *ops, const string& base_uri,
	   uint16_t port, uint16_t *port0);

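  /* choose a portal for a new session: if the rotor lands on portal 0,
   * accept on the incoming portal without redirection; otherwise pass
   * back the selected portal's URI so Accelio redirects the client there */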
  int accept(struct xio_session *session,
	     struct xio_new_session_req *req,
	     void *cb_user_context)
  {
    const char **portals_vec = get_vec();
    int pix = get_last_unused();

    if (pix == 0) {
      return xio_accept(session, NULL, 0, NULL, 0);
    } else {
      return xio_accept(session,
			(const char **)&(portals_vec[pix]),
			1, NULL, 0);
    }
  }

  void start()
  {
    XioPortal *portal;
    int p_ix, nportals = portals.size();

    p_vec = new char*[nportals];
    for (p_ix = 0; p_ix < nportals; ++p_ix) {
      portal = portals[p_ix];
      p_vec[p_ix] = (char*) /* portal->xio_uri.c_str() */
			portal->portal_id;
    }

    for (p_ix = 0; p_ix < nportals; ++p_ix) {
      string thread_name = "ms_xio_";
      thread_name.append(std::to_string(p_ix));
      portal = portals[p_ix];
      portal->create(thread_name.c_str());
    }
  }

  void shutdown()
  {
    int nportals = portals.size();
    for (int p_ix = 0; p_ix < nportals; ++p_ix) {
      XioPortal *portal = portals[p_ix];
      portal->shutdown();
    }
  }

  void join()
  {
    int nportals = portals.size();
    for (int p_ix = 0; p_ix < nportals; ++p_ix) {
      XioPortal *portal = portals[p_ix];
      portal->join();
    }
  }

  ~XioPortals()
  {
    int nportals = portals.size();
    for (int ix = 0; ix < nportals; ++ix)
      delete(portals[ix]);
    portals.clear();
    if (p_vec)
      delete[] p_vec;
  }
};

#endif /* XIO_PORTAL_H */