summaryrefslogtreecommitdiffstats
path: root/src/msg/DispatchQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/msg/DispatchQueue.h')
-rw-r--r--src/msg/DispatchQueue.h243
1 files changed, 243 insertions, 0 deletions
diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h
new file mode 100644
index 00000000..2d90d82c
--- /dev/null
+++ b/src/msg/DispatchQueue.h
@@ -0,0 +1,243 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_DISPATCHQUEUE_H
+#define CEPH_DISPATCHQUEUE_H
+
+#include <atomic>
+#include <map>
+#include <queue>
+#include <boost/intrusive_ptr.hpp>
+#include "include/ceph_assert.h"
+#include "common/Throttle.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+#include "common/PrioritizedQueue.h"
+
+#include "Message.h"
+
+class CephContext;
+class Messenger;
+struct Connection;
+
+/**
+ * The DispatchQueue contains all the connections which have Messages
+ * they want to be dispatched, carefully organized by Message priority
+ * and permitted to deliver in a round-robin fashion.
+ * See Messenger::dispatch_entry for details.
+ */
+class DispatchQueue {
+ class QueueItem {
+ int type;
+ ConnectionRef con;
+ Message::ref m;
+ public:
+ explicit QueueItem(const Message::ref& m) : type(-1), con(0), m(m) {}
+ QueueItem(int type, Connection *con) : type(type), con(con), m(0) {}
+ bool is_code() const {
+ return type != -1;
+ }
+ int get_code () const {
+ ceph_assert(is_code());
+ return type;
+ }
+ const Message::ref& get_message() {
+ ceph_assert(!is_code());
+ return m;
+ }
+ Connection *get_connection() {
+ ceph_assert(is_code());
+ return con.get();
+ }
+ };
+
+ CephContext *cct;
+ Messenger *msgr;
+ mutable Mutex lock;
+ Cond cond;
+
+ PrioritizedQueue<QueueItem, uint64_t> mqueue;
+
+ std::set<pair<double, Message::ref>> marrival;
+ map<Message::ref, decltype(marrival)::iterator> marrival_map;
+ void add_arrival(const Message::ref& m) {
+ marrival_map.insert(
+ make_pair(
+ m,
+ marrival.insert(make_pair(m->get_recv_stamp(), m)).first
+ )
+ );
+ }
+ void remove_arrival(const Message::ref& m) {
+ auto it = marrival_map.find(m);
+ ceph_assert(it != marrival_map.end());
+ marrival.erase(it->second);
+ marrival_map.erase(it);
+ }
+
+ std::atomic<uint64_t> next_id;
+
+ enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES };
+
+ /**
+ * The DispatchThread runs dispatch_entry to empty out the dispatch_queue.
+ */
+ class DispatchThread : public Thread {
+ DispatchQueue *dq;
+ public:
+ explicit DispatchThread(DispatchQueue *dq) : dq(dq) {}
+ void *entry() override {
+ dq->entry();
+ return 0;
+ }
+ } dispatch_thread;
+
+ Mutex local_delivery_lock;
+ Cond local_delivery_cond;
+ bool stop_local_delivery;
+ std::queue<pair<Message::ref, int>> local_messages;
+ class LocalDeliveryThread : public Thread {
+ DispatchQueue *dq;
+ public:
+ explicit LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {}
+ void *entry() override {
+ dq->run_local_delivery();
+ return 0;
+ }
+ } local_delivery_thread;
+
+ uint64_t pre_dispatch(const Message::ref& m);
+ void post_dispatch(const Message::ref& m, uint64_t msize);
+
+ public:
+
+ /// Throttle preventing us from building up a big backlog waiting for dispatch
+ Throttle dispatch_throttler;
+
+ bool stop;
+ void local_delivery(const Message::ref& m, int priority);
+ void local_delivery(Message* m, int priority) {
+ return local_delivery(Message::ref(m, false), priority); /* consume ref */
+ }
+ void run_local_delivery();
+
+ double get_max_age(utime_t now) const;
+
+ int get_queue_len() const {
+ Mutex::Locker l(lock);
+ return mqueue.length();
+ }
+
+ /**
+ * Release memory accounting back to the dispatch throttler.
+ *
+ * @param msize The amount of memory to release.
+ */
+ void dispatch_throttle_release(uint64_t msize);
+
+ void queue_connect(Connection *con) {
+ Mutex::Locker l(lock);
+ if (stop)
+ return;
+ mqueue.enqueue_strict(
+ 0,
+ CEPH_MSG_PRIO_HIGHEST,
+ QueueItem(D_CONNECT, con));
+ cond.Signal();
+ }
+ void queue_accept(Connection *con) {
+ Mutex::Locker l(lock);
+ if (stop)
+ return;
+ mqueue.enqueue_strict(
+ 0,
+ CEPH_MSG_PRIO_HIGHEST,
+ QueueItem(D_ACCEPT, con));
+ cond.Signal();
+ }
+ void queue_remote_reset(Connection *con) {
+ Mutex::Locker l(lock);
+ if (stop)
+ return;
+ mqueue.enqueue_strict(
+ 0,
+ CEPH_MSG_PRIO_HIGHEST,
+ QueueItem(D_BAD_REMOTE_RESET, con));
+ cond.Signal();
+ }
+ void queue_reset(Connection *con) {
+ Mutex::Locker l(lock);
+ if (stop)
+ return;
+ mqueue.enqueue_strict(
+ 0,
+ CEPH_MSG_PRIO_HIGHEST,
+ QueueItem(D_BAD_RESET, con));
+ cond.Signal();
+ }
+ void queue_refused(Connection *con) {
+ Mutex::Locker l(lock);
+ if (stop)
+ return;
+ mqueue.enqueue_strict(
+ 0,
+ CEPH_MSG_PRIO_HIGHEST,
+ QueueItem(D_CONN_REFUSED, con));
+ cond.Signal();
+ }
+
+ bool can_fast_dispatch(const Message::const_ref &m) const;
+ void fast_dispatch(const Message::ref& m);
+ void fast_dispatch(Message* m) {
+ return fast_dispatch(Message::ref(m, false)); /* consume ref */
+ }
+ void fast_preprocess(const Message::ref& m);
+ void enqueue(const Message::ref& m, int priority, uint64_t id);
+ void enqueue(Message* m, int priority, uint64_t id) {
+ return enqueue(Message::ref(m, false), priority, id); /* consume ref */
+ }
+ void discard_queue(uint64_t id);
+ void discard_local();
+ uint64_t get_id() {
+ return next_id++;
+ }
+ void start();
+ void entry();
+ void wait();
+ void shutdown();
+ bool is_started() const {return dispatch_thread.is_started();}
+
+ DispatchQueue(CephContext *cct, Messenger *msgr, string &name)
+ : cct(cct), msgr(msgr),
+ lock("Messenger::DispatchQueue::lock" + name),
+ mqueue(cct->_conf->ms_pq_max_tokens_per_priority,
+ cct->_conf->ms_pq_min_cost),
+ next_id(1),
+ dispatch_thread(this),
+ local_delivery_lock("Messenger::DispatchQueue::local_delivery_lock" + name),
+ stop_local_delivery(false),
+ local_delivery_thread(this),
+ dispatch_throttler(cct, string("msgr_dispatch_throttler-") + name,
+ cct->_conf->ms_dispatch_throttle_bytes),
+ stop(false)
+ {}
+ ~DispatchQueue() {
+ ceph_assert(mqueue.empty());
+ ceph_assert(marrival.empty());
+ ceph_assert(local_messages.empty());
+ }
+};
+
+#endif