summaryrefslogtreecommitdiffstats
path: root/src/msg/async/Event.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/msg/async/Event.h')
-rw-r--r--src/msg/async/Event.h266
1 files changed, 266 insertions, 0 deletions
diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h
new file mode 100644
index 00000000..6736060e
--- /dev/null
+++ b/src/msg/async/Event.h
@@ -0,0 +1,266 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_EVENT_H
+#define CEPH_MSG_EVENT_H
+
+#ifdef __APPLE__
+#include <AvailabilityMacros.h>
+#endif
+
+// We use epoll, kqueue, evport, select in descending order by performance.
+#if defined(__linux__)
+#define HAVE_EPOLL 1
+#endif
+
+#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
+#define HAVE_KQUEUE 1
+#endif
+
+#ifdef __sun
+#include <sys/feature_tests.h>
+#ifdef _DTRACE_VERSION
+#define HAVE_EVPORT 1
+#endif
+#endif
+
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+
+#include "common/ceph_time.h"
+#include "common/dout.h"
+#include "net_handler.h"
+
+#define EVENT_NONE 0
+#define EVENT_READABLE 1
+#define EVENT_WRITABLE 2
+
+class EventCenter;
+
+class EventCallback {
+
+ public:
+ virtual void do_request(uint64_t fd_or_id) = 0;
+ virtual ~EventCallback() {} // we want a virtual destructor!!!
+};
+
+typedef EventCallback* EventCallbackRef;
+
+struct FiredFileEvent {
+ int fd;
+ int mask;
+};
+
+/*
+ * EventDriver is a wrap of event mechanisms depends on different OS.
+ * For example, Linux will use epoll(2), BSD will use kqueue(2) and select will
+ * be used for worst condition.
+ */
+class EventDriver {
+ public:
+ virtual ~EventDriver() {} // we want a virtual destructor!!!
+ virtual int init(EventCenter *center, int nevent) = 0;
+ virtual int add_event(int fd, int cur_mask, int mask) = 0;
+ virtual int del_event(int fd, int cur_mask, int del_mask) = 0;
+ virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
+ virtual int resize_events(int newsize) = 0;
+ virtual bool need_wakeup() { return true; }
+};
+
+/*
+ * EventCenter maintain a set of file descriptor and handle registered events.
+ */
+class EventCenter {
+ public:
+ // should be enough;
+ static const int MAX_EVENTCENTER = 24;
+
+ private:
+ using clock_type = ceph::coarse_mono_clock;
+
+ struct AssociatedCenters {
+ EventCenter *centers[MAX_EVENTCENTER];
+ AssociatedCenters() {
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(centers, 0, MAX_EVENTCENTER * sizeof(EventCenter*));
+ }
+ };
+
+ struct FileEvent {
+ int mask;
+ EventCallbackRef read_cb;
+ EventCallbackRef write_cb;
+ FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
+ };
+
+ struct TimeEvent {
+ uint64_t id;
+ EventCallbackRef time_cb;
+
+ TimeEvent(): id(0), time_cb(NULL) {}
+ };
+
+ public:
+ /**
+ * A Poller object is invoked once each time through the dispatcher's
+ * inner polling loop.
+ */
+ class Poller {
+ public:
+ explicit Poller(EventCenter* center, const string& pollerName);
+ virtual ~Poller();
+
+ /**
+ * This method is defined by a subclass and invoked once by the
+ * center during each pass through its inner polling loop.
+ *
+ * \return
+ * 1 means that this poller did useful work during this call.
+ * 0 means that the poller found no work to do.
+ */
+ virtual int poll() = 0;
+
+ private:
+ /// The EventCenter object that owns this Poller. NULL means the
+ /// EventCenter has been deleted.
+ EventCenter* owner;
+
+ /// Human-readable string name given to the poller to make it
+ /// easy to identify for debugging. For most pollers just passing
+ /// in the subclass name probably makes sense.
+ string poller_name;
+
+ /// Index of this Poller in EventCenter::pollers. Allows deletion
+ /// without having to scan all the entries in pollers. -1 means
+ /// this poller isn't currently in EventCenter::pollers (happens
+ /// after EventCenter::reset).
+ int slot;
+ };
+
+ private:
+ CephContext *cct;
+ std::string type;
+ int nevent;
+ // Used only to external event
+ pthread_t owner = 0;
+ std::mutex external_lock;
+ std::atomic_ulong external_num_events;
+ deque<EventCallbackRef> external_events;
+ vector<FileEvent> file_events;
+ EventDriver *driver;
+ std::multimap<clock_type::time_point, TimeEvent> time_events;
+ // Keeps track of all of the pollers currently defined. We don't
+ // use an intrusive list here because it isn't reentrant: we need
+ // to add/remove elements while the center is traversing the list.
+ std::vector<Poller*> pollers;
+ std::map<uint64_t, std::multimap<clock_type::time_point, TimeEvent>::iterator> event_map;
+ uint64_t time_event_next_id;
+ int notify_receive_fd;
+ int notify_send_fd;
+ NetHandler net;
+ EventCallbackRef notify_handler;
+ unsigned idx;
+ AssociatedCenters *global_centers = nullptr;
+
+ int process_time_events();
+ FileEvent *_get_file_event(int fd) {
+ ceph_assert(fd < nevent);
+ return &file_events[fd];
+ }
+
+ public:
+ explicit EventCenter(CephContext *c):
+ cct(c), nevent(0),
+ external_num_events(0),
+ driver(NULL), time_event_next_id(1),
+ notify_receive_fd(-1), notify_send_fd(-1), net(c),
+ notify_handler(NULL), idx(0) { }
+ ~EventCenter();
+ ostream& _event_prefix(std::ostream *_dout);
+
+ int init(int nevent, unsigned idx, const std::string &t);
+ void set_owner();
+ pthread_t get_owner() const { return owner; }
+ unsigned get_id() const { return idx; }
+
+ EventDriver *get_driver() { return driver; }
+
+ // Used by internal thread
+ int create_file_event(int fd, int mask, EventCallbackRef ctxt);
+ uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
+ void delete_file_event(int fd, int mask);
+ void delete_time_event(uint64_t id);
+ int process_events(unsigned timeout_microseconds, ceph::timespan *working_dur = nullptr);
+ void wakeup();
+
+ // Used by external thread
+ void dispatch_event_external(EventCallbackRef e);
+ inline bool in_thread() const {
+ return pthread_equal(pthread_self(), owner);
+ }
+
+ private:
+ template <typename func>
+ class C_submit_event : public EventCallback {
+ std::mutex lock;
+ std::condition_variable cond;
+ bool done = false;
+ func f;
+ bool nonwait;
+ public:
+ C_submit_event(func &&_f, bool nw)
+ : f(std::move(_f)), nonwait(nw) {}
+ void do_request(uint64_t id) override {
+ f();
+ lock.lock();
+ cond.notify_all();
+ done = true;
+ bool del = nonwait;
+ lock.unlock();
+ if (del)
+ delete this;
+ }
+ void wait() {
+ ceph_assert(!nonwait);
+ std::unique_lock<std::mutex> l(lock);
+ while (!done)
+ cond.wait(l);
+ }
+ };
+
+ public:
+ template <typename func>
+ void submit_to(int i, func &&f, bool nowait = false) {
+ ceph_assert(i < MAX_EVENTCENTER && global_centers);
+ EventCenter *c = global_centers->centers[i];
+ ceph_assert(c);
+ if (!nowait && c->in_thread()) {
+ f();
+ return ;
+ }
+ if (nowait) {
+ C_submit_event<func> *event = new C_submit_event<func>(std::move(f), true);
+ c->dispatch_event_external(event);
+ } else {
+ C_submit_event<func> event(std::move(f), false);
+ c->dispatch_event_external(&event);
+ event.wait();
+ }
+ };
+};
+
+#endif